[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user asfgit closed the pull request at: https://github.com/apache/incubator-rya/pull/177 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r130935217

--- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage;
+
+import java.util.Objects;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Metadata for a given PeriodicQueryStorage table.
+ */
+public class PeriodicQueryStorageMetadata {
+
+    private String sparql;
+    private VariableOrder varOrder;
+
+    /**
+     * Create a PeriodicQueryStorageMetadat object
--- End diff --

done
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user amihalik commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r129669294

--- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java ---
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage;
+
+import java.util.Objects;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Metadata for a given PeriodicQueryStorage table.
+ */
+public class PeriodicQueryStorageMetadata {
+
+    private String sparql;
+    private VariableOrder varOrder;
+
+    /**
+     * Create a PeriodicQueryStorageMetadat object
--- End diff --

typo: ...Metadat**a**
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user amihalik commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r129670570

--- Diff: extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPeriodicQueryResultStorageIT.java ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo.accumulo;
--- End diff --

accumulo.accumulo?
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user amihalik commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r129668480

--- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PeriodicQueryTableNameFactory.java ---
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+
+/**
+ * Class for creating the names of {@link PeriodicQueryResultStorage} tables.
+ *
+ */
+public class PeriodicQueryTableNameFactory {
+
+    public static final String PeriodicTableSuffix = "PERIODIC_QUERY_";
--- End diff --

"midfix" :)
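The "midfix" remark suggests the constant ends up in the middle of the table name rather than at the end. The following is a minimal sketch of that idea, assuming a name layout of instance name, then the marker, then the query id. The class `PeriodicTableNameSketch`, its method names, and the layout are hypothetical and are not taken from the pull request.

```java
import java.util.Objects;

// Hypothetical sketch: class name, method names, and the name layout are assumptions.
final class PeriodicTableNameSketch {

    // The marker sits between the instance name and the query id,
    // so "infix" describes it better than "suffix".
    static final String PERIODIC_TABLE_INFIX = "PERIODIC_QUERY_";

    private PeriodicTableNameSketch() { }

    // Builds a table name as <instanceName><infix><queryId>.
    static String makeTableName(String instanceName, String queryId) {
        Objects.requireNonNull(instanceName);
        Objects.requireNonNull(queryId);
        return instanceName + PERIODIC_TABLE_INFIX + queryId;
    }

    // Recovers the query id by cutting everything up to and including the infix.
    static String extractQueryId(String tableName) {
        Objects.requireNonNull(tableName);
        int idx = tableName.indexOf(PERIODIC_TABLE_INFIX);
        if (idx < 0) {
            throw new IllegalArgumentException("Not a periodic query table: " + tableName);
        }
        return tableName.substring(idx + PERIODIC_TABLE_INFIX.length());
    }

    public static void main(String[] args) {
        String name = makeTableName("rya_", "abc123");
        System.out.println(name + " -> " + extractQueryId(name));
    }
}
```

Naming the constant after its position makes the round trip between table name and query id easier to follow.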
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128063437

--- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java ---
@@ -36,10 +36,11 @@
  * Incrementally exports SPARQL query results to Kafka topics.
  */
 public class KafkaBindingSetExporter implements IncrementalBindingSetExporter {
+
--- End diff --

I meant the file had no changes in it, so you could revert it and remove it from the PR.
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128063325

--- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java ---
@@ -0,0 +1,128 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Iterator;
+import java.util.Optional;
+
+import org.apache.fluo.api.client.TransactionBase;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.RowColumn;
+import org.apache.fluo.api.data.Span;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class processes {@link SpanBatchDeleteInformation} objects by
+ * deleting the entries in the Fluo Column corresponding to the {@link Span}
+ * of the BatchInformation object. This class will delete entries until the
+ * batch size is met, and then create a new SpanBatchDeleteInformation object
+ * with an updated Span whose starting point is the stopping point of this
+ * batch. If the batch limit is not met, then a new batch is not created and
+ * the task is complete.
+ *
+ */
+public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater {
+
+    private static final Logger log = Logger.getLogger(SpanBatchBindingSetUpdater.class);
+
+    /**
+     * Process SpanBatchDeleteInformation objects by deleting all entries indicated
+     * by Span until batch limit is met.
+     * @param tx - Fluo Transaction
+     * @param row - Byte row identifying BatchInformation
+     * @param batch - SpanBatchDeleteInformation object to be processed
+     */
+    @Override
+    public void processBatch(TransactionBase tx, Bytes row, BatchInformation batch) throws Exception {
+        super.processBatch(tx, row, batch);
+        Preconditions.checkArgument(batch instanceof SpanBatchDeleteInformation);
+        SpanBatchDeleteInformation spanBatch = (SpanBatchDeleteInformation) batch;
+        Task task = spanBatch.getTask();
+        int batchSize = spanBatch.getBatchSize();
+        Span span = spanBatch.getSpan();
+        Column column = batch.getColumn();
+        Optional rowCol = Optional.empty();
+
+        switch (task) {
+        case Add:
+            log.trace("The Task Add is not supported for SpanBatchBindingSetUpdater. Batch " + batch + " will not be processed.");
--- End diff --

fair enough
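The Javadoc in this diff describes a batched delete: remove entries within a span until the batch size is reached, then hand back a new starting point for the next batch. A minimal sketch of that pattern follows, assuming a `TreeMap` as a stand-in for the Fluo table so that no Fluo API is involved. `SpanBatchDeleteSketch` and `deleteBatch` are hypothetical names. Unlike the Javadoc, this sketch only reports a continuation point when entries actually remain.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical sketch: a sorted map stands in for the Fluo table; no Fluo API is used.
final class SpanBatchDeleteSketch {

    private SpanBatchDeleteSketch() { }

    /**
     * Deletes at most batchSize keys in [start, end).
     * Returns the first key that was left undeleted, which becomes the
     * start of the next batch, or empty if the span is exhausted.
     */
    static Optional<String> deleteBatch(NavigableMap<String, String> table,
                                        String start, String end, int batchSize) {
        // Copy the keys first so the map can be modified while iterating.
        List<String> keys = new ArrayList<>(table.subMap(start, true, end, false).keySet());
        int deleted = 0;
        for (String key : keys) {
            if (deleted == batchSize) {
                return Optional.of(key);
            }
            table.remove(key);
            deleted++;
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        NavigableMap<String, String> table = new TreeMap<>();
        for (String k : new String[] {"a", "b", "c", "d", "e"}) {
            table.put(k, "value");
        }
        // Keep scheduling follow-up batches until no continuation point is returned.
        Optional<String> next = Optional.of("a");
        while (next.isPresent()) {
            next = deleteBatch(table, next.get(), "z", 2);
            System.out.println("remaining=" + table.keySet() + " next=" + next);
        }
    }
}
```

Each call does a bounded amount of work, which is the point of splitting a large delete into batches.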
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128043592

--- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.pruner;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.NodeBin;
+
+import jline.internal.Preconditions;
+
+/**
+ * Implementation of {@link BinPruner} that deletes old, already processed
+ * Periodic Query results from Fluo and the PCJ table to which the Fluo results
+ * are exported.
+ *
+ */
+public class PeriodicQueryPruner implements BinPruner, Runnable {
+
+    private static final Logger log = Logger.getLogger(PeriodicQueryPruner.class);
+    private FluoClient client;
+    private AccumuloBinPruner accPruner;
+    private FluoBinPruner fluoPruner;
+    private BlockingQueue bins;
+    private AtomicBoolean closed = new AtomicBoolean(false);
+    private int threadNumber;
+
+    public PeriodicQueryPruner(FluoBinPruner fluoPruner, AccumuloBinPruner accPruner, FluoClient client, BlockingQueue bins, int threadNumber) {
+        Preconditions.checkNotNull(fluoPruner);
+        Preconditions.checkNotNull(accPruner);
+        Preconditions.checkNotNull(client);
+        this.client = client;
--- End diff --

done
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128043280

--- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.processor;
+
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.api.NotificationProcessor;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+import org.apache.rya.periodic.notification.exporter.KafkaPeriodicBindingSetExporter;
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation of {@link NotificationProcessor} that uses the id indicated by
+ * the {@link TimestampedNotification} to obtain results from the
+ * {@link PeriodicQueryResultStorage} layer containing the results of the
+ * Periodic Query. The TimestampedNotificationProcessor then parses the results
+ * and adds them to work queues to be processed by the {@link BinPruner} and the
+ * {@link KafkaPeriodicBindingSetExporter}.
+ *
+ */
+public class TimestampedNotificationProcessor implements NotificationProcessor, Runnable {
+
+    private static final Logger log = Logger.getLogger(TimestampedNotificationProcessor.class);
+    private PeriodicQueryResultStorage periodicStorage;
+    private BlockingQueue notifications; // notifications
+                                         // to process
+    private BlockingQueue bins; // entries to delete from Fluo
+    private BlockingQueue bindingSets; // query results to export
+    private AtomicBoolean closed = new AtomicBoolean(false);
+    private int threadNumber;
+
+
+    public TimestampedNotificationProcessor(PeriodicQueryResultStorage periodicStorage,
+            BlockingQueue notifications, BlockingQueue bins, BlockingQueue bindingSets,
+            int threadNumber) {
+        Preconditions.checkNotNull(notifications);
+        Preconditions.checkNotNull(bins);
+        Preconditions.checkNotNull(bindingSets);
--- End diff --

done
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128042903

--- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java ---
@@ -0,0 +1,117 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */package org.apache.rya.periodic.notification.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.periodic.notification.api.LifeCycle;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Executor service that runs {@link TimestampedNotificationProcessor}s with basic
+ * functionality for starting, stopping, and determining whether notification processors are
+ * being executed.
+ *
+ */
+public class NotificationProcessorExecutor implements LifeCycle {
+
+    private static final Logger log = Logger.getLogger(TimestampedNotificationProcessor.class);
+    private BlockingQueue notifications; // notifications
+    private BlockingQueue bins; // entries to delete from Fluo
+    private BlockingQueue bindingSets; // query results to
+                                       // export
+    private PeriodicQueryResultStorage periodicStorage;
+    private List processors;
+    private int numberThreads;
+    private ExecutorService executor;
+    private boolean running = false;
+
+    /**
+     * Creates NotificationProcessorExecutor.
+     * @param periodicStorage - storage layer that periodic results are read from
+     * @param notifications - notifications are pulled from this queue, and the timestamp indicates which bin of results to query for
+     * @param bins - after notifications are processed, they are added to the bin to be deleted
+     * @param bindingSets - results read from the storage layer to be exported
+     * @param numberThreads - number of threads used for processing
+     */
+    public NotificationProcessorExecutor(PeriodicQueryResultStorage periodicStorage, BlockingQueue notifications,
+            BlockingQueue bins, BlockingQueue bindingSets, int numberThreads) {
+        Preconditions.checkNotNull(notifications);
+        Preconditions.checkNotNull(bins);
+        Preconditions.checkNotNull(bindingSets);
+        this.notifications = notifications;
--- End diff --

done
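The class in this diff is described as an executor that starts and stops worker threads which pull notifications from one queue and push results onto others. The sketch below illustrates that lifecycle using only `java.util.concurrent`. It assumes string payloads, a single output queue, and a 100 ms poll interval. `QueueWorkerExecutorSketch` and all of its members are hypothetical and do not reproduce the Rya classes.

```java
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a start/stop lifecycle around queue-draining workers.
final class QueueWorkerExecutorSketch {

    private final BlockingQueue<String> input;
    private final BlockingQueue<String> output;
    private final int numberThreads;
    private ExecutorService executor;
    private volatile boolean running = false;

    QueueWorkerExecutorSketch(BlockingQueue<String> input, BlockingQueue<String> output, int numberThreads) {
        this.input = Objects.requireNonNull(input);
        this.output = Objects.requireNonNull(output);
        if (numberThreads <= 0) {
            throw new IllegalArgumentException("numberThreads must be positive");
        }
        this.numberThreads = numberThreads;
    }

    synchronized void start() {
        if (running) {
            return;
        }
        executor = Executors.newFixedThreadPool(numberThreads);
        running = true;
        for (int i = 0; i < numberThreads; i++) {
            executor.submit(this::drain);
        }
    }

    // Worker loop: poll with a timeout so the running flag is rechecked regularly.
    private void drain() {
        try {
            while (running) {
                String item = input.poll(100, TimeUnit.MILLISECONDS);
                if (item != null) {
                    output.put("processed:" + item);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    synchronized void stop() {
        if (!running) {
            return;
        }
        running = false;
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    boolean currentlyRunning() {
        return running;
    }

    // Convenience demo: start workers, push one item through, and stop again.
    static String roundTrip(String item) {
        BlockingQueue<String> in = new LinkedBlockingQueue<>();
        BlockingQueue<String> out = new LinkedBlockingQueue<>();
        QueueWorkerExecutorSketch exec = new QueueWorkerExecutorSketch(in, out, 2);
        exec.start();
        try {
            in.put(item);
            return out.poll(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            exec.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("notification-1"));
    }
}
```

Polling with a timeout, rather than blocking on `take()`, lets each worker notice a stop request without needing an interrupt.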
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r128042579 --- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.notification; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import org.apache.rya.periodic.notification.api.Notification; + +import com.google.common.base.Preconditions; + +/** + * Notification Object used by the Periodic Query Service to inform workers to + * process results for a given Periodic Query with the indicated id. + * Additionally, this Object contains a period that indicates a frequency at + * which regular updates are generated. + * + */ +public class PeriodicNotification implements Notification { + +private String id; +private long period; +private TimeUnit periodTimeUnit; +private long initialDelay; + +/** + * Creates a PeriodicNotification. 
+ * @param id - Fluo Query Id that this notification is associated with + * @param period - period at which notifications are generated + * @param periodTimeUnit - time unit associated with the period and delay + * @param initialDelay - amount of time to wait before generating the first notification + */ +public PeriodicNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) { +Preconditions.checkNotNull(id); +Preconditions.checkNotNull(periodTimeUnit); +Preconditions.checkArgument(period > 0 && initialDelay >= 0); +this.id = id; +this.period = period; +this.periodTimeUnit = periodTimeUnit; +this.initialDelay = initialDelay; +} + + +/** + * Create a PeriodicNotification + * @param other - other PeriodicNotification used in copy constructor + */ +public PeriodicNotification(PeriodicNotification other) { +this(other.id, other.period, other.periodTimeUnit, other.initialDelay); +} + +public String getId() { +return id; +} + +/** + * @return - period at which regular notifications are generated + */ +public long getPeriod() { +return period; +} + +/** + * @return time unit of period and initial delay + */ +public TimeUnit getTimeUnit() { +return periodTimeUnit; +} + +/** + * @return amount of time to delay before beginning to generate notifications + */ +public long getInitialDelay() { +return initialDelay; +} + +@Override +public String toString() { +StringBuilder builder = new StringBuilder(); +String delim = "="; +String delim2 = ";"; +return builder.append("id").append(delim).append(id).append(delim2).append("period").append(delim).append(period).append(delim2) + .append("periodTimeUnit").append(delim).append(periodTimeUnit).append(delim2).append("initialDelay").append(delim) +.append(initialDelay).toString(); +} + +@Override +public boolean equals(Object other) { +if (this == other) { +return true; +} + +if (!(other instanceof PeriodicNotification)) { +return false; +} + +PeriodicNotification notification = (PeriodicNotification) other; 
+return Objects.equals(this.id, notification.id) && (this.period == notification.period) +&& Objects.equals(this.periodTimeUnit, notification.periodTimeUnit) && (this.initialDelay == notification.initialDelay); +
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r128042239

--- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Notification Object used by the Periodic Query Service to inform workers to
+ * process results for a given Periodic Query with the indicated id.
+ * Additionally, this Object contains a period that indicates a frequency at
+ * which regular updates are generated.
+ *
+ */
+public class PeriodicNotification implements Notification {
+
+    private String id;
+    private long period;
+    private TimeUnit periodTimeUnit;
+    private long initialDelay;
+
+    /**
+     * Creates a PeriodicNotification.
+     * @param id - Fluo Query Id that this notification is associated with
+     * @param period - period at which notifications are generated
+     * @param periodTimeUnit - time unit associated with the period and delay
+     * @param initialDelay - amount of time to wait before generating the first notification
+     */
+    public PeriodicNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) {
+        Preconditions.checkNotNull(id);
+        Preconditions.checkNotNull(periodTimeUnit);
+        Preconditions.checkArgument(period > 0 && initialDelay >= 0);
--- End diff --

done
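The comment is anchored on the combined `checkArgument(period > 0 && initialDelay >= 0)` call, and the reply does not say what was changed. Purely as an illustration, the sketch below shows one way such a check can be split so that each violated constraint produces its own message. It uses plain JDK calls instead of Guava, and `PeriodicNotificationArgsSketch` and its `rejects` helper are hypothetical.

```java
import java.util.Objects;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: separate checks with descriptive messages, JDK only.
final class PeriodicNotificationArgsSketch {

    final String id;
    final long period;
    final TimeUnit periodTimeUnit;
    final long initialDelay;

    PeriodicNotificationArgsSketch(String id, long period, TimeUnit periodTimeUnit, long initialDelay) {
        this.id = Objects.requireNonNull(id, "id");
        this.periodTimeUnit = Objects.requireNonNull(periodTimeUnit, "periodTimeUnit");
        // One check per constraint, so the message names the offending argument.
        if (period <= 0) {
            throw new IllegalArgumentException("period must be positive, was " + period);
        }
        if (initialDelay < 0) {
            throw new IllegalArgumentException("initialDelay must be non-negative, was " + initialDelay);
        }
        this.period = period;
        this.initialDelay = initialDelay;
    }

    // Helper for demonstration: reports whether the constructor rejects the values.
    static boolean rejects(long period, long initialDelay) {
        try {
            new PeriodicNotificationArgsSketch("query-id", period, TimeUnit.SECONDS, initialDelay);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("period=0 rejected: " + rejects(0, 0));
        System.out.println("delay=-1 rejected: " + rejects(1, -1));
        System.out.println("valid rejected: " + rejects(1, 0));
    }
}
```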
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r128041892 --- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.notification; + +import org.apache.rya.periodic.notification.api.Notification; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +/** + * This Object contains a Notification Object used by the Periodic Query Service + * to inform workers to process results for a given Periodic Query with the + * indicated id. Additionally, the CommandNotification contains a + * {@link Command} about which action the + * {@link NotificationCoordinatorExecutor} should take (adding or deleting). + * CommandNotifications are meant to be added to an external work queue (such as + * Kafka) to be processed by the NotificationCoordinatorExecutor. 
+ * + */ +public class CommandNotification implements Notification { + +private Notification notification; +private Command command; + +public enum Command { +ADD, DELETE +}; + +/** + * Creates a new CommandNotification + * @param command - the command associated with this notification (either add or delete) + * @param notification - the underlying notification associated with this command + */ +public CommandNotification(Command command, Notification notification) { +Preconditions.checkNotNull(notification); +Preconditions.checkNotNull(command); +this.command = command; +this.notification = notification; +} + +@Override +public String getId() { +return notification.getId(); +} + +/** + * Returns {@link Notification} contained by this CommandNotification. + * @return - Notification contained by this Object + */ +public Notification getNotification() { +return this.notification; +} + +/** + * @return Command contained by this Object (either add or delete) + */ +public Command getCommand() { +return this.command; +} + +@Override +public boolean equals(Object other) { +if (this == other) { +return true; +} +if (other instanceof CommandNotification) { +CommandNotification cn = (CommandNotification) other; +return Objects.equal(this.command, cn.command) && Objects.equal(this.notification, cn.notification); +} else { +return false; +} +} + +@Override +public int hashCode() { +int result = 17; +result = 31 * result + Objects.hashCode(command); --- End diff -- done
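The comment above is anchored on the hand-rolled hashCode. The thread does not show what the "done" reply changed, so the following is only a sketch of one common alternative: deriving equals and hashCode from the same fields with `java.util.Objects`. The class `CommandSketch` is hypothetical, and its `String` payload stands in for the wrapped Notification.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for a command-plus-payload value class. */
final class CommandSketch {
    enum Command { ADD, DELETE }

    private final Command command;
    private final String notificationId;

    CommandSketch(Command command, String notificationId) {
        this.command = Objects.requireNonNull(command, "command");
        this.notificationId = Objects.requireNonNull(notificationId, "notificationId");
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof CommandSketch)) {
            return false;
        }
        CommandSketch that = (CommandSketch) other;
        // Compare exactly the fields that hashCode combines.
        return command == that.command && notificationId.equals(that.notificationId);
    }

    @Override
    public int hashCode() {
        // Objects.hash combines the fields in order, replacing the manual 17/31 arithmetic.
        return Objects.hash(command, notificationId);
    }
}
```

Keeping both methods over the same field list is what lets equal instances collapse to one entry in hash-based collections.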
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r128041556 --- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.notification; + +import org.apache.rya.periodic.notification.api.Notification; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +/** + * This Object contains a Notification Object used by the Periodic Query Service + * to inform workers to process results for a given Periodic Query with the + * indicated id. Additionally, the CommandNotification contains a + * {@link Command} about which action the + * {@link NotificationCoordinatorExecutor} should take (adding or deleting). + * CommandNotifications are meant to be added to an external work queue (such as + * Kafka) to be processed by the NotificationCoordinatorExecutor. 
+ * + */ +public class CommandNotification implements Notification { + +private Notification notification; +private Command command; + +public enum Command { +ADD, DELETE +}; + +/** + * Creates a new CommandNotification + * @param command - the command associated with this notification (either add or delete) + * @param notification - the underlying notification associated with this command + */ +public CommandNotification(Command command, Notification notification) { +Preconditions.checkNotNull(notification); +Preconditions.checkNotNull(command); --- End diff -- done
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r128041076 --- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java --- @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.notification; + +import org.apache.rya.periodic.notification.api.Notification; + +import com.google.common.base.Objects; + +/** + * Notification Object used by the Periodic Query Service + * to inform workers to process results for a given Periodic + * Query with the indicated id. 
+ * + */ +public class BasicNotification implements Notification { + +private String id; + +/** + * Creates a BasicNotification + * @param id - Fluo query id associated with this Notification + */ +public BasicNotification(String id) { +this.id = id; +} + +/** + * @return the Fluo Query Id that this notification will generate results for + */ +@Override +public String getId() { +return id; +} + +@Override +public boolean equals(Object other) { +if (this == other) { +return true; +} + +if (other instanceof BasicNotification) { +BasicNotification not = (BasicNotification) other; +return Objects.equal(this.id, not.id); +} + +return false; +} + +@Override +public int hashCode() { +int result = 17; --- End diff -- done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r128001523 --- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.application; + +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; +import org.apache.rya.periodic.notification.api.BinPruner; +import org.apache.rya.periodic.notification.api.LifeCycle; +import org.apache.rya.periodic.notification.api.NodeBin; +import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.exporter.BindingSetRecord; +import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor; +import org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor; +import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor; +import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider; +import org.openrdf.query.algebra.evaluation.function.Function; + +import com.google.common.base.Preconditions; + +/** + * The PeriodicNotificationApplication runs the key components of the Periodic + * Query Service. It consists of a {@link KafkaNotificationProvider}, a + * {@link NotificationCoordinatorExecutor}, a + * {@link NotificationProcessorExecutor}, a {@link KafkaExporterExecutor}, and a + * {@link PeriodicQueryPrunerExecutor}. These services run in coordination with + * one another to perform the following tasks in the indicated order: + * Retrieve new requests to generate periodic notifications from Kafka + * Register them with the {@link NotificationCoordinatorExecutor} to + * generate the periodic notifications + * As notifications are generated, they are added to a work queue that is + * monitored by the {@link NotificationProcessorExecutor}. + * The processor processes the notifications by reading all of the query + * results corresponding to the bin and query id indicated by the notification. 
+ * After reading the results, the processor adds a {@link BindingSetRecord} + * to a work queue monitored by the {@link KafkaExporterExecutor}. + * The processor then adds a {@link NodeBin} to a work queue monitored by the + * {@link BinPruner} + * The exporter processes the BindingSetRecord by exporting the result to + * Kafka + * The BinPruner processes the NodeBin by cleaning up the results for the + * indicated bin and query in Accumulo and Fluo. + * + * The purpose of this Periodic Query Service is to facilitate the ability to + * answer Periodic Queries using the Rya Fluo application, where a Periodic + * Query is any query requesting periodic updates about events that occurred + * within a given window of time of this instant. This is also known as a + * rolling window query. Periodic Queries can be expressed using SPARQL by + * including the {@link Function} indicated by the URI + * {@link PeriodicQueryUtil#PeriodicQueryURI}. The user must provide this + * Function with the following arguments: the temporal variable in the query + * that will be filtered on, the window of time that events must occur within, + * the period at which the user wants to receive updates, and the time unit. The + * following query requests all observations that occurred within the last + * minute and requests updates every 15 seconds. It also performs a count on + * those observations. + * + * prefix function: http://org.apache.rya/function# + * prefix time: http://www.w3.org/2006/time# + * select (count(?obs) as ?total) where { + * Filter(function:periodic(?time, 1, .25, time:minutes)) + * ?obs uri:hasTime ?time. + * ?obs u
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r128001253 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.batch.serializer; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Optional; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task; +import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation; +import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.junit.Test; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +public class BatchInformationSerializerTest { + +@Test +public void testSpanBatchInformationSerialization() { + +SpanBatchDeleteInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(1000) + .setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix"))).build(); +System.out.println(batch); +byte[] batchBytes = BatchInformationSerializer.toBytes(batch); +Optional decodedBatch = BatchInformationSerializer.fromBytes(batchBytes); +System.out.println(decodedBatch); +assertEquals(batch, decodedBatch.get()); +} + +@Test +public void testJoinBatchInformationSerialization() { + +QueryBindingSet bs = new QueryBindingSet(); +bs.addBinding("a", new URIImpl("urn:123")); +bs.addBinding("b", new URIImpl("urn:456")); +VisibilityBindingSet vBis = new VisibilityBindingSet(bs, "FOUO"); + +JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1000).setTask(Task.Update) + 
.setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix346"))) + .setJoinType(JoinType.LEFT_OUTER_JOIN).setSide(Side.RIGHT).setVarOrder(new VariableOrder(Arrays.asList("a", "b"))) +.setBs(vBis).build(); + +System.out.println(batch); --- End diff -- deleted
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r128000960 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; +import org.openrdf.query.algebra.QueryModelVisitor; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.UnaryTupleOperator; +import org.openrdf.query.algebra.evaluation.function.Function; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkArgument; + +/** + * This is a {@link UnaryTupleOperator} that gets placed in the parsed query + * {@link TupleExpr} when a {@link Filter} is encountered in the SPARQL String that + * contains the Periodic {@link Function} {@link PeriodicQueryUtil#PeriodicQueryURI}. 
+ * The PeriodicQueryNode is created from the arguments passed to the Periodic Function, + * which consist of a time unit, a temporal period, a temporal window of time, and the + * temporal variable in the query, which assumes a value indicated by the + * Time ontology: http://www.w3.org/2006/time. The purpose of the PeriodicQueryNode + * is to filter out all events that did not occur within the specified window of time + * of this instant and to generate notifications at a regular interval indicated by the period. + * + */ +public class PeriodicQueryNode extends UnaryTupleOperator { + +private TimeUnit unit; +private long windowDuration; +private long periodDuration; +private String temporalVar; + +/** + * Creates a PeriodicQueryNode from the specified values. + * @param window - specifies the window of time that events must occur within from this instant + * @param period - regular interval at which notifications are generated (must be leq window). + * @param unit - time unit of the period and window + * @param temporalVar - temporal variable in query used for filtering + * @param arg - child of PeriodicQueryNode in parsed query + */ +public PeriodicQueryNode(long window, long period, TimeUnit unit, String temporalVar, TupleExpr arg) { +super(arg); +checkArgument(period <= window); +checkNotNull(temporalVar); +checkNotNull(arg); +checkNotNull(unit); +this.windowDuration = window; --- End diff -- done
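The relationship between window and period described in the Javadoc above can be made concrete with a little arithmetic. The sketch below assumes that notifications fall on multiples of the period and that each one covers the half-open interval (end - window, end]. Both conventions are assumptions made for illustration, and `RollingWindowSketch` and `windowsContaining` are hypothetical names that do not appear in the pull request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of how one event lands in several rolling windows. */
final class RollingWindowSketch {

    /**
     * Returns the notification times (multiples of the period, in milliseconds)
     * whose window (end - window, end] contains the given event time.
     */
    static List<Long> windowsContaining(long eventMillis, long window, long period, TimeUnit unit) {
        // Same constraint as the constructor in the diff, plus a positive period.
        if (period <= 0 || period > window) {
            throw new IllegalArgumentException("require 0 < period <= window");
        }
        long w = unit.toMillis(window);
        long p = unit.toMillis(period);
        if (p <= 0) {
            throw new IllegalArgumentException("period must be at least one millisecond");
        }
        List<Long> ends = new ArrayList<>();
        // First period boundary at or after the event, i.e. ceil(event / p) * p.
        long first = Math.floorDiv(eventMillis + p - 1, p) * p;
        // Keep stepping while the window that ends here still starts before the event.
        for (long end = first; end - w < eventMillis; end += p) {
            ends.add(end);
        }
        return ends;
    }
}
```

With a 60 second window and a 15 second period, an event at 20 seconds is reported by the notifications at 30, 45, 60 and 75 seconds, which is window / period = 4 consecutive updates. Requiring period <= window guarantees that every event is covered by at least one notification.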
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r128000243 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java --- @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +/** + * Metadata that is required for periodic queries in the Rya Fluo Application. + * If a periodic query is registered with the Rya Fluo application, the BindingSets + * are placed into temporal bins according to whether they occur within the window of + * a period's ending time. This Metadata is used to create a Bin Id, which is equivalent + * to the period's ending time, to be inserted into each BindingSet that occurs within that + * bin. This is to allow the AggregationUpdater to aggregate the bins by grouping on the + * Bin Id. 
+ * + */ +public class PeriodicQueryMetadata extends CommonNodeMetadata { + +private String parentNodeId; +private String childNodeId; +private long windowSize; +private long period; +private TimeUnit unit; +private String temporalVariable; + +/** + * Constructs an instance of PeriodicQueryMetadata + * @param nodeId - id of periodic query node + * @param varOrder - variable order indicating the order the BindingSet results are written in + * @param parentNodeId - id of parent node + * @param childNodeId - id of child node + * @param windowSize - size of window used for filtering + * @param period - period size that indicates frequency of notifications + * @param unit - TimeUnit corresponding to window and period + * @param temporalVariable - temporal variable that periodic conditions are applied to + */ +public PeriodicQueryMetadata(String nodeId, VariableOrder varOrder, String parentNodeId, String childNodeId, long windowSize, long period, +TimeUnit unit, String temporalVariable) { +super(nodeId, varOrder); +Preconditions.checkNotNull(parentNodeId); +Preconditions.checkNotNull(childNodeId); +Preconditions.checkNotNull(temporalVariable); +Preconditions.checkNotNull(unit); +Preconditions.checkNotNull(period > 0); --- End diff -- done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
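The last precondition in the diff above passes a boolean expression to checkNotNull. A null check on a primitive boolean can never fail, because the value is autoboxed to a non-null Boolean before the check runs. The sketch below reproduces that behaviour with the JDK's `Objects.requireNonNull` as a stand-in for Guava's `Preconditions.checkNotNull`, and contrasts it with an explicit argument check of the kind `Preconditions.checkArgument` performs. The class and method names are hypothetical, and the thread does not show which fix the "done" reply applied.

```java
import java.util.Objects;

/** Hypothetical illustration of a null check applied to a boolean condition. */
final class PreconditionPitfallSketch {

    /** Mirrors the pattern in the diff: the condition is evaluated but never enforced. */
    static boolean nullCheckOnBoolean(long period) {
        // (period > 0) is boxed to Boolean.TRUE or Boolean.FALSE, both non-null,
        // so this call returns normally even when the condition is false.
        return Objects.requireNonNull(period > 0);
    }

    /** States the intended constraint as an argument check that can actually fail. */
    static long requirePositive(long period) {
        if (period <= 0) {
            throw new IllegalArgumentException("period must be > 0, got " + period);
        }
        return period;
    }
}
```

Calling `nullCheckOnBoolean(-5)` simply returns false, while `requirePositive(-5)` throws IllegalArgumentException, which is the behaviour the constructor presumably wants for a non-positive period.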
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127999634 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.observers; + +import static java.util.Objects.requireNonNull; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; + --- End diff -- done
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127997700 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java --- @@ -36,10 +36,11 @@ * Incrementally exports SPARQL query results to Kafka topics. */ public class KafkaBindingSetExporter implements IncrementalBindingSetExporter { + --- End diff -- Okay. Well that's good. Can't tell if you're being sarcastic or something. No need to leave a comment if nothing needs to be changed.
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127997246 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java --- @@ -0,0 +1,92 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; + +/** + * This class represents a batch order to delete all entries in the Fluo table indicated + * by the given Span and Column. These batch orders are processed by the {@link BatchObserver}, + * which uses this batch information along with the nodeId passed into the Observer to perform + * batch deletes. 
+ * + */ +public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation { + +private static final BatchBindingSetUpdater updater = new SpanBatchBindingSetUpdater(); + +public SpanBatchDeleteInformation(int batchSize, Column column, Span span) { +super(batchSize, Task.Delete, column, span); +} + +/** + * @return Updater that applies the {@link Task} to the given {@link Span} and {@link Column} + */ +@Override +public BatchBindingSetUpdater getBatchUpdater() { +return updater; +} + + +public static Builder builder() { +return new Builder(); +} + +public static class Builder { + +private int batchSize = DEFAULT_BATCH_SIZE; +private Column column; +private Span span; + +/** + * @param batchSize - {@link Task}s are applied in batches of this size + */ +public Builder setBatchSize(int batchSize) { +this.batchSize = batchSize; +return this; +} + +/** + * Sets column to apply batch {@link Task} to + * @param column - column batch Task will be applied to + * @return + */ +public Builder setColumn(Column column) { +this.column = column; +return this; +} + +/** + * @param span - span that batch {@link Task} will be applied to + * + */ +public Builder setSpan(Span span) { +this.span = span; +return this; +} + + +public SpanBatchDeleteInformation build() { --- End diff -- done --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127997010 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java --- @@ -0,0 +1,92 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; + +/** + * This class represents a batch order to delete all entries in the Fluo table indicated + * by the given Span and Column. These batch orders are processed by the {@link BatchObserver}, + * which uses this batch information along with the nodeId passed into the Observer to perform + * batch deletes. 
+ * + */ +public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation { + +private static final BatchBindingSetUpdater updater = new SpanBatchBindingSetUpdater(); + +public SpanBatchDeleteInformation(int batchSize, Column column, Span span) { +super(batchSize, Task.Delete, column, span); +} + +/** + * @return Updater that applies the {@link Task} to the given {@link Span} and {@link Column} + */ +@Override +public BatchBindingSetUpdater getBatchUpdater() { +return updater; +} + + +public static Builder builder() { --- End diff -- I think this one is pretty clear. Not providing class docs here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127996635 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java --- @@ -0,0 +1,128 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.Iterator; +import java.util.Optional; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.RowColumn; +import org.apache.fluo.api.data.Span; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task; + +import com.google.common.base.Preconditions; + +/** + * This class processes {@link SpanBatchDeleteInformation} objects by + * deleting the entries in the Fluo Column corresponding to the {@link Span} + * of the BatchInformation object. 
This class will delete entries until the + * batch size is met, and then create a new SpanBatchDeleteInformation object + * with an updated Span whose starting point is the stopping point of this + * batch. If the batch limit is not met, then a new batch is not created and + * the task is complete. + * + */ +public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater { + +private static final Logger log = Logger.getLogger(SpanBatchBindingSetUpdater.class); + +/** + * Process SpanBatchDeleteInformation objects by deleting all entries indicated + * by Span until batch limit is met. + * @param tx - Fluo Transaction + * @param row - Byte row identifying BatchInformation + * @param batch - SpanBatchDeleteInformation object to be processed + */ +@Override +public void processBatch(TransactionBase tx, Bytes row, BatchInformation batch) throws Exception { +super.processBatch(tx, row, batch); +Preconditions.checkArgument(batch instanceof SpanBatchDeleteInformation); +SpanBatchDeleteInformation spanBatch = (SpanBatchDeleteInformation) batch; +Task task = spanBatch.getTask(); +int batchSize = spanBatch.getBatchSize(); +Span span = spanBatch.getSpan(); +Column column = batch.getColumn(); +Optional<RowColumn> rowCol = Optional.empty(); + +switch (task) { +case Add: +log.trace("The Task Add is not supported for SpanBatchBindingSetUpdater. Batch " + batch + " will not be processed."); --- End diff -- I don't think you want to kill the whole application here. If an invalid batch request is made, the application simply logs that it can't process it, then moves on.
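The behaviour described in this comment and in the class Javadoc above can be sketched roughly as follows. This is a minimal illustration, not the code from the pull request: `SpanDeleteSketch` and its `processBatch` signature are hypothetical, a `TreeMap` stands in for the Fluo table, a plain `String` row stands in for the `Span` start, and `java.util.logging` replaces log4j. An unsupported task is logged and skipped rather than thrown, and a delete stops at the batch size and reports where the next batch should resume.

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.TreeMap;
import java.util.logging.Logger;

/** Hypothetical stand-in for a span batch updater; the TreeMap plays the role of the Fluo table. */
final class SpanDeleteSketch {

    enum Task { Add, Delete, Update }

    private static final Logger log = Logger.getLogger(SpanDeleteSketch.class.getName());

    /**
     * Deletes up to batchSize rows at or after startRow.
     *
     * @return the row at which a follow-up batch should start, or empty if
     *         nothing is left to delete or the task is not supported
     */
    static Optional<String> processBatch(TreeMap<String, String> table, Task task,
                                         String startRow, int batchSize) {
        switch (task) {
        case Delete:
            // The tail view is backed by the table, so iterator removals write through.
            Iterator<String> rows = table.tailMap(startRow, true).keySet().iterator();
            int deleted = 0;
            while (rows.hasNext()) {
                String row = rows.next();
                if (deleted == batchSize) {
                    // Batch limit reached: the next batch resumes at this row.
                    return Optional.of(row);
                }
                rows.remove();
                deleted++;
            }
            return Optional.empty();
        default:
            // Invalid request: log it and move on instead of failing the application.
            log.fine("Task " + task + " is not supported; batch will not be processed.");
            return Optional.empty();
        }
    }
}
```

Returning an `Optional` keeps the "is there another batch?" decision with the caller, which in the real application would correspond to writing a new batch order with an updated Span.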
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127996045 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java --- @@ -0,0 +1,262 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.Objects; + +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.query.Binding; + +import jline.internal.Preconditions; + +/** + * This class updates join results based on parameters specified for the join's + * children. The join has two children, and for one child a VisibilityBindingSet + * is specified along with the Side of that child. This BindingSet represents an + * update to that join child. For the other child, a Span, Column and + * VariableOrder are specified. 
This is so that the sibling node (the node that + * wasn't updated) can be scanned to obtain results that can be joined with the + * VisibilityBindingSet. The assumption here is that the Span is derived from + * the {@link Binding}s of common variables between the join children, with + * Values ordered according to the indicated {@link VariableOrder}. This class + * represents a batch order to perform a given task on join BindingSet results. + * The {@link Task} is to Add, Delete, or Update. This batch order is processed + * by the {@link BatchObserver} and used with the nodeId provided to the + * Observer to process the Task specified by the batch order. If the Task is to + * add, the BatchBindingSetUpdater returned by + * {@link JoinBatchInformation#getBatchUpdater()} will scan the join's child for + * results using the indicated Span and Column. These results are joined with + * the indicated VisibilityBindingSet, and the results are added to the parent + * join. The other Tasks are performed analogously. 
+ * + */ +public class JoinBatchInformation extends AbstractSpanBatchInformation { + +private static final BatchBindingSetUpdater updater = new JoinBatchBindingSetUpdater(); +private VisibilityBindingSet bs; //update for join child indicated by side +private VariableOrder varOrder; //variable order for child indicated by Span +private Side side; //join child that was updated by bs +private JoinType join; +/** + * @param batchSize - batch size that Tasks are performed in + * @param task - Add, Delete, or Update + * @param column - Column of join child to be scanned + * @param span - span of join child to be scanned (derived from common variables of left and right join children) + * @param bs - BindingSet to be joined with results of child scan + * @param varOrder - VariableOrder used to form join (order for join child corresponding to Span) + * @param side - The side of the child that the VisibilityBindingSet update occurred at + * @param join - JoinType (left, right, natural inner) + */ +public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) { +super(batchSize, task, column, span); +Preconditions.checkNotNull(bs); +Preconditions.checkNotNull(varOrder); +Preconditions.checkNotNull(side); +Preconditions.checkNotNull(join); +this.bs = bs; +this.varOrder = varOrder; +this.join = join; +this.side = side; +} + +public JoinBatchInformation(Task task, Colum
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127995755 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java --- @@ -0,0 +1,262 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.Objects; + +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.query.Binding; + +import jline.internal.Preconditions; + +/** + * This class updates join results based on parameters specified for the join's + * children. The join has two children, and for one child a VisibilityBindingSet + * is specified along with the Side of that child. This BindingSet represents an + * update to that join child. For the other child, a Span, Column and + * VariableOrder are specified. 
This is so that the sibling node (the node that + * wasn't updated) can be scanned to obtain results that can be joined with the + * VisibilityBindingSet. The assumption here is that the Span is derived from + * the {@link Binding}s of common variables between the join children, with + * Values ordered according to the indicated {@link VariableOrder}. This class + * represents a batch order to perform a given task on join BindingSet results. + * The {@link Task} is to Add, Delete, or Update. This batch order is processed + * by the {@link BatchObserver} and used with the nodeId provided to the + * Observer to process the Task specified by the batch order. If the Task is to + * add, the BatchBindingSetUpdater returned by + * {@link JoinBatchInformation#getBatchUpdater()} will scan the join's child for + * results using the indicated Span and Column. These results are joined with + * the indicated VisibilityBindingSet, and the results are added to the parent + * join. The other Tasks are performed analogously. 
+ * + */ +public class JoinBatchInformation extends AbstractSpanBatchInformation { + +private static final BatchBindingSetUpdater updater = new JoinBatchBindingSetUpdater(); +private VisibilityBindingSet bs; //update for join child indicated by side +private VariableOrder varOrder; //variable order for child indicated by Span +private Side side; //join child that was updated by bs +private JoinType join; +/** + * @param batchSize - batch size that Tasks are performed in + * @param task - Add, Delete, or Update + * @param column - Column of join child to be scanned + * @param span - span of join child to be scanned (derived from common variables of left and right join children) + * @param bs - BindingSet to be joined with results of child scan + * @param varOrder - VariableOrder used to form join (order for join child corresponding to Span) + * @param side - The side of the child that the VisibilityBindingSet update occurred at + * @param join - JoinType (left, right, natural inner) + */ +public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) { +super(batchSize, task, column, span); +Preconditions.checkNotNull(bs); +Preconditions.checkNotNull(varOrder); +Preconditions.checkNotNull(side); +Preconditions.checkNotNull(join); +this.bs = bs; +this.varOrder = varOrder; +this.join = join; +this.side = side; +} + +public JoinBatchInformation(Task task, Colum
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127995235 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java --- @@ -0,0 +1,262 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.Objects; + +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.query.Binding; + +import jline.internal.Preconditions; + +/** + * This class updates join results based on parameters specified for the join's + * children. The join has two children, and for one child a VisibilityBindingSet + * is specified along with the Side of that child. This BindingSet represents an + * update to that join child. For the other child, a Span, Column and + * VariableOrder are specified. 
This is so that the sibling node (the node that + * wasn't updated) can be scanned to obtain results that can be joined with the + * VisibilityBindingSet. The assumption here is that the Span is derived from + * the {@link Binding}s of common variables between the join children, with + * Values ordered according to the indicated {@link VariableOrder}. This class + * represents a batch order to perform a given task on join BindingSet results. + * The {@link Task} is to Add, Delete, or Update. This batch order is processed + * by the {@link BatchObserver} and used with the nodeId provided to the + * Observer to process the Task specified by the batch order. If the Task is to + * add, the BatchBindingSetUpdater returned by + * {@link JoinBatchInformation#getBatchUpdater()} will scan the join's child for + * results using the indicated Span and Column. These results are joined with + * the indicated VisibilityBindingSet, and the results are added to the parent + * join. The other Tasks are performed analogously. 
+ * + */ +public class JoinBatchInformation extends AbstractSpanBatchInformation { + +private static final BatchBindingSetUpdater updater = new JoinBatchBindingSetUpdater(); +private VisibilityBindingSet bs; //update for join child indicated by side +private VariableOrder varOrder; //variable order for child indicated by Span +private Side side; //join child that was updated by bs +private JoinType join; +/** + * @param batchSize - batch size that Tasks are performed in + * @param task - Add, Delete, or Update + * @param column - Column of join child to be scanned + * @param span - span of join child to be scanned (derived from common variables of left and right join children) + * @param bs - BindingSet to be joined with results of child scan + * @param varOrder - VariableOrder used to form join (order for join child corresponding to Span) + * @param side - The side of the child that the VisibilityBindingSet update occurred at + * @param join - JoinType (left, right, natural inner) + */ +public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) { +super(batchSize, task, column, span); +Preconditions.checkNotNull(bs); +Preconditions.checkNotNull(varOrder); +Preconditions.checkNotNull(side); +Preconditions.checkNotNull(join); +this.bs = bs; --- End diff -- done
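The join described in the JoinBatchInformation Javadoc (an updated binding set on one side, joined with results scanned from the sibling node) can be illustrated with a small sketch. Everything here is assumed for illustration: `JoinSketch` is a hypothetical name, binding sets are modelled as `Map<String, String>`, and the sibling results are filtered in memory, whereas the real code narrows the scan with a Span derived from the common variables.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of joining one updated binding set with sibling scan results. */
final class JoinSketch {

    /** Joins the update with every sibling result that agrees on all shared variables. */
    static List<Map<String, String>> naturalJoin(Map<String, String> update,
                                                 List<Map<String, String>> siblingResults) {
        List<Map<String, String>> joined = new ArrayList<>();
        for (Map<String, String> sibling : siblingResults) {
            if (compatible(update, sibling)) {
                Map<String, String> merged = new HashMap<>(sibling);
                merged.putAll(update);
                joined.add(merged);
            }
        }
        return joined;
    }

    /** Two binding sets are compatible when no shared variable is bound to different values. */
    private static boolean compatible(Map<String, String> a, Map<String, String> b) {
        for (Map.Entry<String, String> entry : a.entrySet()) {
            String other = b.get(entry.getKey());
            if (other != null && !other.equals(entry.getValue())) {
                return false;
            }
        }
        return true;
    }
}
```

For an Add task the merged results would be written to the parent join; Delete and Update would apply the same matching and then remove or replace the corresponding entries.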
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127995043 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BasicBatchInformation.java --- @@ -0,0 +1,82 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.apache.fluo.api.data.Column; + +import com.google.common.base.Preconditions; + +/** + * This class contains all of the common info contained in other implementations + * of BatchInformation. + * + */ +public abstract class BasicBatchInformation implements BatchInformation { + +private int batchSize; +private Task task; +private Column column; + +/** + * Create BasicBatchInformation object + * @param batchSize - size of batch to be processed + * @param task - task to be processed + * @param column - Column in which data is processed + */ +public BasicBatchInformation(int batchSize, Task task, Column column ) { +Preconditions.checkNotNull(task); --- End diff -- done
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127994577 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.batch; + +import java.util.Objects; + +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; + +import jline.internal.Preconditions; + +/** + * Abstract class for generating span based notifications. A spanned notification + * uses a {@link Span} to begin processing a Fluo Column at the position designated by the Span. 
+ * + */ +public abstract class AbstractSpanBatchInformation extends BasicBatchInformation { + +private Span span; + +/** + * Create AbstractSpanBatchInformation + * @param batchSize - size of batch to be processed + * @param task - type of task processed (Add, Delete, Update) + * @param column - Column that Span notification is applied to + * @param span - span used to indicate where processing should begin + */ +public AbstractSpanBatchInformation(int batchSize, Task task, Column column, Span span) { +super(batchSize, task, column); +Preconditions.checkNotNull(span); +this.span = span; +} + +public AbstractSpanBatchInformation(Task task, Column column, Span span) { +this(DEFAULT_BATCH_SIZE, task, column, span); +} + +/** + * @return Span that batch Task will be applied to + */ +public Span getSpan() { +return span; +} + +/** + * Sets span to which batch Task will be applied + * @param span + */ +public void setSpan(Span span) { +this.span = span; +} + +@Override +public String toString() { +return new StringBuilder() +.append("Span Batch Information {\n") +.append("Span: " + span + "\n") +.append("Batch Size: " + super.getBatchSize() + "\n") +.append("Task: " + super.getTask() + "\n") +.append("Column: " + super.getColumn() + "\n") +.append("}") +.toString(); +} + +@Override +public boolean equals(Object other) { +if (this == other) { +return true; +} + +if (!(other instanceof AbstractSpanBatchInformation)) { +return false; +} + +AbstractSpanBatchInformation batch = (AbstractSpanBatchInformation) other; +return (super.getBatchSize() == batch.getBatchSize()) && Objects.equals(super.getColumn(), batch.getColumn()) && Objects.equals(this.span, batch.span) +&& Objects.equals(super.getTask(), batch.getTask()); +} + +@Override +public int hashCode() { +int result = 17; --- End diff -- done
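The diff above is cut off at the start of `hashCode()`, so what the pull request ended up with is not shown here. As a hedged sketch only, one common way to keep `hashCode()` consistent with the `equals()` quoted above (which compares batch size, column, span and task) is `java.util.Objects.hash` over the same four fields. The class `SpanBatchKeySketch` is hypothetical and uses `String` fields in place of the Fluo `Column` and `Span` types.

```java
import java.util.Objects;

/** Hypothetical value class; Strings stand in for the Task, Column and Span types. */
final class SpanBatchKeySketch {
    private final int batchSize;
    private final String task;
    private final String column;
    private final String span;

    SpanBatchKeySketch(int batchSize, String task, String column, String span) {
        this.batchSize = batchSize;
        this.task = Objects.requireNonNull(task);
        this.column = Objects.requireNonNull(column);
        this.span = Objects.requireNonNull(span);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof SpanBatchKeySketch)) {
            return false;
        }
        SpanBatchKeySketch batch = (SpanBatchKeySketch) other;
        return batchSize == batch.batchSize
                && Objects.equals(task, batch.task)
                && Objects.equals(column, batch.column)
                && Objects.equals(span, batch.span);
    }

    @Override
    public int hashCode() {
        // Hash exactly the fields compared in equals() so equal objects share a hash code.
        return Objects.hash(batchSize, task, column, span);
    }
}
```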
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127993938 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.batch; + +import java.util.Objects; + +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; + +import jline.internal.Preconditions; + +/** + * Abstract class for generating span based notifications. A spanned notification + * uses a {@link Span} to begin processing a Fluo Column at the position designated by the Span. 
+ * + */ +public abstract class AbstractSpanBatchInformation extends BasicBatchInformation { + +private Span span; + +/** + * Create AbstractSpanBatchInformation + * @param batchSize - size of batch to be processed + * @param task - type of task processed (Add, Delete, Update) + * @param column - Column that Span notification is applied to + * @param span - span used to indicate where processing should begin + */ +public AbstractSpanBatchInformation(int batchSize, Task task, Column column, Span span) { +super(batchSize, task, column); +Preconditions.checkNotNull(span); --- End diff -- done
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127993723 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.batch; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.RowColumn; +import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; + +public abstract class AbstractBatchBindingSetUpdater implements BatchBindingSetUpdater { --- End diff -- done
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127993037 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java --- @@ -43,6 +43,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; --- End diff -- yep
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127992511 --- Diff: extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java --- @@ -67,7 +68,8 @@ /** * Constructs an instance of {@link DeletePcj}. * - * @param batchSize - The number of entries that will be deleted at a time. (> 0) + * @param batchSize + *- The number of entries that will be deleted at a time. (> 0) --- End diff -- done
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127990899 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDe.java --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import org.apache.fluo.api.data.Bytes; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Serializes and deserializes a {@link VisibilityBindingSet} to and from {@link Bytes} objects. + */ +@DefaultAnnotation(NonNull.class) +public class VisibilityBindingSetSerDe { --- End diff -- This is actually a duplicate class. I think I needed to migrate the original to this project, but forgot to delete the original. Deleted the original. Not my class, so I didn't decide on the naming conventions here. Leaving as is. 
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127988392 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PeriodicQueryTableNameFactory.java --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static java.util.Objects.requireNonNull; + +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; + +/** + * Class for creating the names of {@link PeriodicQueryResultStorage} tables. + * + */ +public class PeriodicQueryTableNameFactory { + +public static final String PeriodicTableSuffix = "PERIODIC_QUERY_"; --- End diff -- It actually is a suffix. This follows the rya prefix in the table name. A UUID is appended after this in the table name, which is why the underscore is added.
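The naming scheme described in the comment above (Rya prefix, then "PERIODIC_QUERY_", then a UUID) can be sketched roughly as follows. The class name PeriodicTableNameSketch, both method names, and the example prefix "rya_" are hypothetical. Stripping the dashes from the UUID is also an assumption made here, not something stated in the thread.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical sketch of the table-name layout described in the review comment. */
final class PeriodicTableNameSketch {
    // Same literal as the constant in the diff; the trailing underscore separates it from the id.
    static final String PERIODIC_TABLE_SUFFIX = "PERIODIC_QUERY_";

    /** Builds "<rya prefix>PERIODIC_QUERY_<query id>". */
    static String makeTableName(String ryaPrefix, String queryId) {
        Objects.requireNonNull(ryaPrefix);
        Objects.requireNonNull(queryId);
        return ryaPrefix + PERIODIC_TABLE_SUFFIX + queryId;
    }

    /** Assumption: a random UUID with the dashes removed serves as the query id. */
    static String newQueryId() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    public static void main(String[] args) {
        System.out.println(makeTableName("rya_", newQueryId()));
    }
}
```

With the prefix "rya_" and the id "abc123", makeTableName returns "rya_PERIODIC_QUERY_abc123". The constant is a suffix relative to the Rya prefix even though it precedes the id.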
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127987608 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java --- @@ -412,9 +413,9 @@ private void writeResults( // Row ID = binding set values, Column Family = variable order of the binding set. final Mutation addResult = new Mutation(rowKey); final String visibility = result.getVisibility(); -addResult.put(varOrder.toString(), "", new ColumnVisibility(visibility), ""); +addResult.put(varOrder.toString(), "", new ColumnVisibility(visibility), new Value(bsSerDe.serialize(result).toArray())); mutations.add(addResult); -} catch(final BindingSetConversionException e) { +} catch(Exception e) { --- End diff -- Just converted back to the original Exception.
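The hunk above had widened the catch clause from BindingSetConversionException to Exception, and the reply says it was changed back. Below is a rough, self-contained sketch of why the narrower catch is usually preferable. ConversionException, toRowKey, and convertAll are hypothetical stand-ins for illustration and do not reproduce the PcjTables code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: catch only the checked exception the conversion can throw. */
final class NarrowCatchSketch {
    /** Hypothetical checked exception standing in for BindingSetConversionException. */
    static final class ConversionException extends Exception {
        private static final long serialVersionUID = 1L;

        ConversionException(String message) {
            super(message);
        }
    }

    /** Hypothetical conversion step that rejects empty values. */
    static byte[] toRowKey(String value) throws ConversionException {
        if (value.isEmpty()) {
            throw new ConversionException("Cannot build a row key from an empty value.");
        }
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Converts every value, wrapping only the expected conversion failure. */
    static List<byte[]> convertAll(List<String> values) {
        List<byte[]> rowKeys = new ArrayList<>();
        for (String value : values) {
            try {
                rowKeys.add(toRowKey(value));
            } catch (ConversionException e) {
                // Unrelated runtime failures (for example a NullPointerException) are
                // not caught here, so they surface unchanged instead of being relabeled.
                throw new IllegalStateException("Could not convert value: '" + value + "'", e);
            }
        }
        return rowKeys;
    }
}
```

A catch on Exception would also swallow programming errors such as a NullPointerException and report them as conversion failures, which makes them harder to diagnose.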
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127986867 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java --- @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; +import org.apache.rya.indexing.pcj.storage.PCJIdFactory; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.AggregateOperatorBase; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.base.Preconditions; + +/** + * This class is the Accumulo implementation of {@link PeriodicQueryResultStorage} for + * creating, deleting, and interacting with tables where PeriodicQuery results are 
stored. + */ +public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultStorage { + +private String ryaInstance; +private Connector accumuloConn; +private Authorizations auths; +private final PCJIdFactory pcjIdFactory = new PCJIdFactory(); +private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); +private static final PcjTables pcjTables = new PcjTables(); +private static final PeriodicQueryTableNameFactory tableNameFactory = new PeriodicQueryTableNameFactory(); + +/** + * Creates a AccumuloPeriodicQueryResultStorage Object. + * @param accumuloConn - Accumulo Connector for connecting to an Accumulo instance + * @param ryaInstance - Rya Instance name for connecting to Rya + */ +public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, String ryaInstance) { +this.accumuloConn = accumuloConn; +this.ryaInstance = ryaInstance; +String user = accumuloConn.whoami(); +try { +this.auths = accumuloConn.securityOperations().getUserAuthorizations(user); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new RuntimeException("Unable access user: " + user + "authorizations."); +} +} + +@Override +public String createPeriodicQuery(String sparql) throws PeriodicQueryStorageException { +Preconditions.checkNotNull(sparql); +String queryId = pcjIdFactory.nextId(); +r
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127986607 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java --- @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; +import org.apache.rya.indexing.pcj.storage.PCJIdFactory; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.AggregateOperatorBase; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.base.Preconditions; + +/** + * This class is the Accumulo implementation of {@link PeriodicQueryResultStorage} for + * creating, deleting, and interacting with tables where PeriodicQuery results are 
stored. + */ +public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultStorage { + +private String ryaInstance; +private Connector accumuloConn; +private Authorizations auths; +private final PCJIdFactory pcjIdFactory = new PCJIdFactory(); +private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); +private static final PcjTables pcjTables = new PcjTables(); +private static final PeriodicQueryTableNameFactory tableNameFactory = new PeriodicQueryTableNameFactory(); + +/** + * Creates a AccumuloPeriodicQueryResultStorage Object. + * @param accumuloConn - Accumulo Connector for connecting to an Accumulo instance + * @param ryaInstance - Rya Instance name for connecting to Rya + */ +public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, String ryaInstance) { +this.accumuloConn = accumuloConn; +this.ryaInstance = ryaInstance; +String user = accumuloConn.whoami(); +try { +this.auths = accumuloConn.securityOperations().getUserAuthorizations(user); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new RuntimeException("Unable access user: " + user + "authorizations."); +} +} + +@Override +public String createPeriodicQuery(String sparql) throws PeriodicQueryStorageException { +Preconditions.checkNotNull(sparql); +String queryId = pcjIdFactory.nextId(); +r
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127984352 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java --- @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; +import org.apache.rya.indexing.pcj.storage.PCJIdFactory; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.AggregateOperatorBase; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.base.Preconditions; + +/** + * This class is the Accumulo implementation of {@link PeriodicQueryResultStorage} for + * creating, deleting, and interacting with tables where PeriodicQuery results are 
stored. + */ +public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultStorage { + +private String ryaInstance; +private Connector accumuloConn; +private Authorizations auths; +private final PCJIdFactory pcjIdFactory = new PCJIdFactory(); +private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); +private static final PcjTables pcjTables = new PcjTables(); +private static final PeriodicQueryTableNameFactory tableNameFactory = new PeriodicQueryTableNameFactory(); + +/** + * Creates a AccumuloPeriodicQueryResultStorage Object. + * @param accumuloConn - Accumulo Connector for connecting to an Accumulo instance + * @param ryaInstance - Rya Instance name for connecting to Rya + */ +public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, String ryaInstance) { +this.accumuloConn = accumuloConn; +this.ryaInstance = ryaInstance; +String user = accumuloConn.whoami(); +try { +this.auths = accumuloConn.securityOperations().getUserAuthorizations(user); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new RuntimeException("Unable access user: " + user + "authorizations."); +} +} + +@Override +public String createPeriodicQuery(String sparql) throws PeriodicQueryStorageException { +Preconditions.checkNotNull(sparql); +String queryId = pcjIdFactory.nextId(); +r
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127981764 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java --- @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; +import org.apache.rya.indexing.pcj.storage.PCJIdFactory; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.AggregateOperatorBase; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.base.Preconditions; + +/** + * This class is the Accumulo implementation of {@link PeriodicQueryResultStorage} for + * creating, deleting, and interacting with tables where PeriodicQuery results are 
stored. + */ +public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultStorage { + +private String ryaInstance; +private Connector accumuloConn; +private Authorizations auths; +private final PCJIdFactory pcjIdFactory = new PCJIdFactory(); +private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); +private static final PcjTables pcjTables = new PcjTables(); +private static final PeriodicQueryTableNameFactory tableNameFactory = new PeriodicQueryTableNameFactory(); + +/** + * Creates a AccumuloPeriodicQueryResultStorage Object. + * @param accumuloConn - Accumulo Connector for connecting to an Accumulo instance + * @param ryaInstance - Rya Instance name for connecting to Rya + */ +public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, String ryaInstance) { +this.accumuloConn = accumuloConn; --- End diff -- done
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127981484 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage; + +import java.util.Objects; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; + +import com.google.common.base.Preconditions; + +/** + * Metadata for a given PeriodicQueryStorage table. + */ +public class PeriodicQueryStorageMetadata { + +private String sparql; +private VariableOrder varOrder; + +/** + * Create a PeriodicQueryStorageMetadat object + * @param sparql - SPARQL query whose results are stored in table + * @param varOrder - order that BindingSet values are written in in table + */ +public PeriodicQueryStorageMetadata(String sparql, VariableOrder varOrder) { +Preconditions.checkNotNull(sparql); --- End diff -- done
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127981314 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageException.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage; + +public class PeriodicQueryStorageException extends Exception { --- End diff -- done
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127736780 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java --- @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; +import org.apache.rya.indexing.pcj.storage.PCJIdFactory; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.AggregateOperatorBase; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.base.Preconditions; + +/** + * This class is the Accumulo implementation of {@link PeriodicQueryResultStorage} for + * creating, deleting, and interacting with tables where PeriodicQuery results are 
stored. + */ +public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultStorage { + +private String ryaInstance; +private Connector accumuloConn; +private Authorizations auths; +private final PCJIdFactory pcjIdFactory = new PCJIdFactory(); +private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); +private static final PcjTables pcjTables = new PcjTables(); +private static final PeriodicQueryTableNameFactory tableNameFactory = new PeriodicQueryTableNameFactory(); + +/** + * Creates a AccumuloPeriodicQueryResultStorage Object. + * @param accumuloConn - Accumulo Connector for connecting to an Accumulo instance + * @param ryaInstance - Rya Instance name for connecting to Rya + */ +public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, String ryaInstance) { +this.accumuloConn = accumuloConn; +this.ryaInstance = ryaInstance; +String user = accumuloConn.whoami(); +try { +this.auths = accumuloConn.securityOperations().getUserAuthorizations(user); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new RuntimeException("Unable access user: " + user + "authorizations."); +} +} + +@Override +public String createPeriodicQuery(String sparql) throws PeriodicQueryStorageException { +Preconditions.checkNotNull(sparql); +String queryId = pcjIdFactory.nextId(); +ret
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/177#discussion_r127734223
--- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.AggregateOperatorBase;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is the Accumulo implementation of {@link PeriodicQueryResultStorage} for
+ * creating, deleting, and interacting with tables where PeriodicQuery results are stored.
+ */
+public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultStorage {
+
+    private String ryaInstance;
+    private Connector accumuloConn;
+    private Authorizations auths;
+    private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
+    private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
+    private static final PcjTables pcjTables = new PcjTables();
+    private static final PeriodicQueryTableNameFactory tableNameFactory = new PeriodicQueryTableNameFactory();
+
+    /**
+     * Creates a AccumuloPeriodicQueryResultStorage Object.
+     * @param accumuloConn - Accumulo Connector for connecting to an Accumulo instance
+     * @param ryaInstance - Rya Instance name for connecting to Rya
+     */
+    public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, String ryaInstance) {
+        this.accumuloConn = accumuloConn;
+        this.ryaInstance = ryaInstance;
+        String user = accumuloConn.whoami();
+        try {
+            this.auths = accumuloConn.securityOperations().getUserAuthorizations(user);
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new RuntimeException("Unable access user: " + user + "authorizations.");
+        }
+    }
+
+    @Override
+    public String createPeriodicQuery(String sparql) throws PeriodicQueryStorageException {
+        Preconditions.checkNotNull(sparql);
+        String queryId = pcjIdFactory.nextId();
+        ret
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/177#discussion_r127732860
--- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java ---
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.storage.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchDeleter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.algebra.AggregateOperatorBase;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is the Accumulo implementation of {@link PeriodicQueryResultStorage} for
+ * creating, deleting, and interacting with tables where PeriodicQuery results are stored.
+ */
+public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultStorage {
+
+    private String ryaInstance;
+    private Connector accumuloConn;
+    private Authorizations auths;
+    private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
+    private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();
+    private static final PcjTables pcjTables = new PcjTables();
+    private static final PeriodicQueryTableNameFactory tableNameFactory = new PeriodicQueryTableNameFactory();
+
+    /**
+     * Creates a AccumuloPeriodicQueryResultStorage Object.
+     * @param accumuloConn - Accumulo Connector for connecting to an Accumulo instance
+     * @param ryaInstance - Rya Instance name for connecting to Rya
+     */
+    public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, String ryaInstance) {
+        this.accumuloConn = accumuloConn;
--- End diff --
precondition checks
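The "precondition checks" remark can be read as a request to validate the constructor arguments before storing them. A minimal sketch of that pattern follows, under the assumption that plain strings stand in for the Accumulo `Connector`; the `StorageConfig` class and its field names are hypothetical and not part of Rya. It uses `java.util.Objects.requireNonNull`, which the quoted file already imports statically.

```java
import static java.util.Objects.requireNonNull;

/** Hypothetical stand-in for a storage class with two required constructor arguments. */
final class StorageConfig {
    private final String connectionName; // stands in for the Accumulo Connector
    private final String ryaInstance;

    StorageConfig(String connectionName, String ryaInstance) {
        // requireNonNull returns its argument, so each check and assignment share one line.
        // The second parameter becomes the NullPointerException message.
        this.connectionName = requireNonNull(connectionName, "connectionName");
        this.ryaInstance = requireNonNull(ryaInstance, "ryaInstance");
    }

    String getConnectionName() {
        return connectionName;
    }

    String getRyaInstance() {
        return ryaInstance;
    }
}
```

With checks like these, a null argument fails at construction time with a named parameter in the message, rather than later at the first dereference.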
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/177#discussion_r127799075
--- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.processor;
+
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.api.NotificationProcessor;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+import org.apache.rya.periodic.notification.exporter.KafkaPeriodicBindingSetExporter;
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation of {@link NotificationProcessor} that uses the id indicated by
+ * the {@link TimestampedNotification} to obtain results from the
+ * {@link PeriodicQueryResultStorage} layer containing the results of the
+ * Periodic Query. The TimestampedNotificationProcessor then parses the results
+ * and adds them to work queues to be processed by the {@link BinPruner} and the
+ * {@link KafkaPeriodicBindingSetExporter}.
+ *
+ */
+public class TimestampedNotificationProcessor implements NotificationProcessor, Runnable {
+
+    private static final Logger log = Logger.getLogger(TimestampedNotificationProcessor.class);
+    private PeriodicQueryResultStorage periodicStorage;
+    private BlockingQueue notifications; // notifications
+                                         // to process
+    private BlockingQueue bins; // entries to delete from Fluo
+    private BlockingQueue bindingSets; // query results to export
+    private AtomicBoolean closed = new AtomicBoolean(false);
+    private int threadNumber;
+
+
+    public TimestampedNotificationProcessor(PeriodicQueryResultStorage periodicStorage,
+            BlockingQueue notifications, BlockingQueue bins, BlockingQueue bindingSets,
+            int threadNumber) {
+        Preconditions.checkNotNull(notifications);
+        Preconditions.checkNotNull(bins);
+        Preconditions.checkNotNull(bindingSets);
--- End diff --
here
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/177#discussion_r127754845
--- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+
+import jline.internal.Preconditions;
+
+/**
+ * Abstract class for generating span based notifications. A spanned notification
+ * uses a {@link Span} to begin processing a Fluo Column at the position designated by the Span.
+ *
+ */
+public abstract class AbstractSpanBatchInformation extends BasicBatchInformation {
+
+    private Span span;
+
+    /**
+     * Create AbstractBatchInformation
+     * @param batchSize - size of batch to be processed
+     * @param task - type of task processed (Add, Delete, Udpate)
+     * @param column - Cpolumn that Span notification is applied
+     * @param span - span used to indicate where processing should begin
+     */
+    public AbstractSpanBatchInformation(int batchSize, Task task, Column column, Span span) {
+        super(batchSize, task, column);
+        Preconditions.checkNotNull(span);
+        this.span = span;
+    }
+
+    public AbstractSpanBatchInformation(Task task, Column column, Span span) {
+        this(DEFAULT_BATCH_SIZE, task, column, span);
+    }
+
+    /**
+     * @return Span that batch Task will be applied to
+     */
+    public Span getSpan() {
+        return span;
+    }
+
+    /**
+     * Sets span to which batch Task will be applied
+     * @param span
+     */
+    public void setSpan(Span span) {
+        this.span = span;
+    }
+
+    @Override
+    public String toString() {
+        return new StringBuilder()
+                .append("Span Batch Information {\n")
+                .append("Span: " + span + "\n")
+                .append("Batch Size: " + super.getBatchSize() + "\n")
+                .append("Task: " + super.getTask() + "\n")
+                .append("Column: " + super.getColumn() + "\n")
+                .append("}")
+                .toString();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (!(other instanceof AbstractSpanBatchInformation)) {
+            return false;
+        }
+
+        AbstractSpanBatchInformation batch = (AbstractSpanBatchInformation) other;
+        return (super.getBatchSize() == batch.getBatchSize()) && Objects.equals(super.getColumn(), batch.getColumn()) && Objects.equals(this.span, batch.span)
+                && Objects.equals(super.getTask(), batch.getTask());
+    }
+
+    @Override
+    public int hashCode() {
+        int result = 17;
--- End diff --
use Objects.hash()
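The suggestion to use `Objects.hash()` replaces the hand-rolled `int result = 17; ...` accumulation with a single call over the same fields that `equals` compares. The sketch below illustrates the idea on a hypothetical `BatchKey` class whose `String` fields stand in for the Fluo `Task`, `Column`, and `Span` types; it is not the code that was eventually committed.

```java
import java.util.Objects;

/** Hypothetical value class mirroring the four fields compared in the quoted equals(). */
final class BatchKey {
    private final int batchSize;
    private final String task;   // stands in for Task
    private final String column; // stands in for Column
    private final String span;   // stands in for Span

    BatchKey(int batchSize, String task, String column, String span) {
        this.batchSize = batchSize;
        this.task = task;
        this.column = column;
        this.span = span;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof BatchKey)) {
            return false;
        }
        BatchKey that = (BatchKey) other;
        return batchSize == that.batchSize
                && Objects.equals(task, that.task)
                && Objects.equals(column, that.column)
                && Objects.equals(span, that.span);
    }

    @Override
    public int hashCode() {
        // Hash exactly the fields used in equals(); null fields are handled by Objects.hash.
        return Objects.hash(batchSize, task, column, span);
    }
}
```

`Objects.hash` allocates a varargs array and boxes primitives on each call, which is rarely a concern for value objects like these but may matter in very hot paths.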
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/177#discussion_r127798539
--- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java ---
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Notification Object used by the Periodic Query Service to inform workers to
+ * process results for a given Periodic Query with the indicated id.
+ * Additionally, this Object contains a period that indicates a frequency at
+ * which regular updates are generated.
+ *
+ */
+public class PeriodicNotification implements Notification {
+
+    private String id;
+    private long period;
+    private TimeUnit periodTimeUnit;
+    private long initialDelay;
+
+    /**
+     * Creates a PeriodicNotification.
+     * @param id - Fluo Query Id that this notification is associated with
+     * @param period - period at which notifications are generated
+     * @param periodTimeUnit - time unit associated with the period and delay
+     * @param initialDelay - amount of time to wait before generating the first notification
+     */
+    public PeriodicNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) {
+        Preconditions.checkNotNull(id);
+        Preconditions.checkNotNull(periodTimeUnit);
+        Preconditions.checkArgument(period > 0 && initialDelay >= 0);
--- End diff --
here
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/177#discussion_r127754539
--- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+
+import jline.internal.Preconditions;
+
+/**
+ * Abstract class for generating span based notifications. A spanned notification
+ * uses a {@link Span} to begin processing a Fluo Column at the position designated by the Span.
+ *
+ */
+public abstract class AbstractSpanBatchInformation extends BasicBatchInformation {
+
+    private Span span;
+
+    /**
+     * Create AbstractBatchInformation
+     * @param batchSize - size of batch to be processed
+     * @param task - type of task processed (Add, Delete, Udpate)
+     * @param column - Cpolumn that Span notification is applied
+     * @param span - span used to indicate where processing should begin
+     */
+    public AbstractSpanBatchInformation(int batchSize, Task task, Column column, Span span) {
+        super(batchSize, task, column);
+        Preconditions.checkNotNull(span);
--- End diff --
this.span = checkNotNull(span)
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/177#discussion_r127769861
--- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java ---
@@ -0,0 +1,262 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.Binding;
+
+import jline.internal.Preconditions;
+
+/**
+ * This class updates join results based on parameters specified for the join's
+ * children. The join has two children, and for one child a VisibilityBindingSet
+ * is specified along with the Side of that child. This BindingSet represents an
+ * update to that join child. For the other child, a Span, Column and
+ * VariableOrder are specified. This is so that the sibling node (the node that
+ * wasn't updated) can be scanned to obtain results that can be joined with the
+ * VisibilityBindingSet. The assumption here is that the Span is derived from
+ * the {@link Binding}s of common variables between the join children, with
+ * Values ordered according to the indicated {@link VariableOrder}. This class
+ * represents a batch order to perform a given task on join BindingSet results.
+ * The {@link Task} is to Add, Delete, or Update. This batch order is processed
+ * by the {@link BatchObserver} and used with the nodeId provided to the
+ * Observer to process the Task specified by the batch order. If the Task is to
+ * add, the BatchBindingSetUpdater returned by
+ * {@link JoinBatchInformation#getBatchUpdater()} will scan the join's child for
+ * results using the indicated Span and Column. These results are joined with
+ * the indicated VisibilityBindingSet, and the results are added to the parent
+ * join. The other Tasks are performed analogously.
+ *
+ */
+public class JoinBatchInformation extends AbstractSpanBatchInformation {
+
+    private static final BatchBindingSetUpdater updater = new JoinBatchBindingSetUpdater();
+    private VisibilityBindingSet bs; //update for join child indicated by side
+    private VariableOrder varOrder; //variable order for child indicated by Span
+    private Side side; //join child that was updated by bs
+    private JoinType join;
+    /**
+     * @param batchSize - batch size that Tasks are performed in
+     * @param task - Add, Delete, or Update
+     * @param column - Column of join child to be scanned
+     * @param span - span of join child to be scanned (derived from common variables of left and right join children)
+     * @param bs - BindingSet to be joined with results of child scan
+     * @param varOrder - VariableOrder used to form join (order for join child corresponding to Span)
+     * @param side - The side of the child that the VisibilityBindingSet update occurred at
+     * @param join - JoinType (left, right, natural inner)
+     */
+    public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) {
+        super(batchSize, task, column, span);
+        Preconditions.checkNotNull(bs);
+        Preconditions.checkNotNull(varOrder);
+        Preconditions.checkNotNull(side);
+        Preconditions.checkNotNull(join);
+        this.bs = bs;
+        this.varOrder = varOrder;
+        this.join = join;
+        this.side = side;
+    }
+
+    public JoinBatchInformation(Task task, Column
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/177#discussion_r127798013
--- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java ---
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.notification;
+
+import org.apache.rya.periodic.notification.api.Notification;
+
+import com.google.common.base.Objects;
+
+/**
+ * Notification Object used by the Periodic Query Service
+ * to inform workers to process results for a given Periodic
+ * Query with the indicated id.
+ *
+ */
+public class BasicNotification implements Notification {
+
+    private String id;
+
+    /**
+     * Creates a BasicNotification
+     * @param id - Fluo query id associated with this Notification
+     */
+    public BasicNotification(String id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the Fluo Query Id that this notification will generate results for
+     */
+    @Override
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (other instanceof BasicNotification) {
+            BasicNotification not = (BasicNotification) other;
+            return Objects.equal(this.id, not.id);
+        }
+
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = 17;
--- End diff --
Objects.hash()
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/177#discussion_r127769759
--- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java ---
@@ -0,0 +1,262 @@
+package org.apache.rya.indexing.pcj.fluo.app.batch;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import java.util.Objects;
+
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
+import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.openrdf.query.Binding;
+
+import jline.internal.Preconditions;
+
+/**
+ * This class updates join results based on parameters specified for the join's
+ * children. The join has two children, and for one child a VisibilityBindingSet
+ * is specified along with the Side of that child. This BindingSet represents an
+ * update to that join child. For the other child, a Span, Column and
+ * VariableOrder are specified. This is so that the sibling node (the node that
+ * wasn't updated) can be scanned to obtain results that can be joined with the
+ * VisibilityBindingSet. The assumption here is that the Span is derived from
+ * the {@link Binding}s of common variables between the join children, with
+ * Values ordered according to the indicated {@link VariableOrder}. This class
+ * represents a batch order to perform a given task on join BindingSet results.
+ * The {@link Task} is to Add, Delete, or Update. This batch order is processed
+ * by the {@link BatchObserver} and used with the nodeId provided to the
+ * Observer to process the Task specified by the batch order. If the Task is to
+ * add, the BatchBindingSetUpdater returned by
+ * {@link JoinBatchInformation#getBatchUpdater()} will scan the join's child for
+ * results using the indicated Span and Column. These results are joined with
+ * the indicated VisibilityBindingSet, and the results are added to the parent
+ * join. The other Tasks are performed analogously.
+ *
+ */
+public class JoinBatchInformation extends AbstractSpanBatchInformation {
+
+    private static final BatchBindingSetUpdater updater = new JoinBatchBindingSetUpdater();
+    private VisibilityBindingSet bs; //update for join child indicated by side
+    private VariableOrder varOrder; //variable order for child indicated by Span
+    private Side side; //join child that was updated by bs
+    private JoinType join;
+    /**
+     * @param batchSize - batch size that Tasks are performed in
+     * @param task - Add, Delete, or Update
+     * @param column - Column of join child to be scanned
+     * @param span - span of join child to be scanned (derived from common variables of left and right join children)
+     * @param bs - BindingSet to be joined with results of child scan
+     * @param varOrder - VariableOrder used to form join (order for join child corresponding to Span)
+     * @param side - The side of the child that the VisibilityBindingSet update occurred at
+     * @param join - JoinType (left, right, natural inner)
+     */
+    public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) {
+        super(batchSize, task, column, span);
+        Preconditions.checkNotNull(bs);
+        Preconditions.checkNotNull(varOrder);
+        Preconditions.checkNotNull(side);
+        Preconditions.checkNotNull(join);
+        this.bs = bs;
--- End diff --
same stuff here
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127792656 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryNode.java --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; +import org.openrdf.query.algebra.QueryModelVisitor; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.UnaryTupleOperator; +import org.openrdf.query.algebra.evaluation.function.Function; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkArgument; + +/** + * This is a {@link UnaryTupleOperator} that gets placed in the parsed query + * {@link TupleExpr} when a {@link Filter} is encountered in the SPARQL String that + * contains the Periodic {@link Function} {@link PeriodicQueryUtil#PeriodicQueryURI}. 
+ * The PeiodicQueryNode is created from the arguments passed to the Periodic Function, + * which consist of a time unit, a temporal period, a temporal window of time, and the + * temporal variable in the query, which assumes a value indicated by the + * Time ontology: http://www.w3.org/2006/time. The purpose of the PeriodicQueryNode + * is to filter out all events that did not occur within the specified window of time + * of this instant and to generate notifications at a regular interval indicated by the period. + * + */ +public class PeriodicQueryNode extends UnaryTupleOperator { + +private TimeUnit unit; +private long windowDuration; +private long periodDuration; +private String temporalVar; + +/** + * Creates a PeriodicQueryNode from the specified values. + * @param window - specifies the window of time that event must occur within from this instant + * @param period - regular interval at which notifications are generated (must be leq window). + * @param unit - time unit of the period and window + * @param temporalVar - temporal variable in query used for filtering + * @param arg - child of PeriodicQueryNode in parsed query + */ +public PeriodicQueryNode(long window, long period, TimeUnit unit, String temporalVar, TupleExpr arg) { +super(arg); +checkArgument(period <= window); +checkNotNull(temporalVar); +checkNotNull(arg); +checkNotNull(unit); +this.windowDuration = window; --- End diff -- = checkNotNull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
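The "= checkNotNull" comment above appears to ask that each null check be folded into the field assignment. Guava's checkNotNull returns the reference it checks, so this is possible in the quoted constructor. The sketch below illustrates the pattern with java.util.Objects.requireNonNull, which also returns its argument, so that it compiles without Guava. The class WindowSpec and its getters are hypothetical stand-ins, not part of the pull request.

```java
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the constructor under review; not the actual Rya class. */
final class WindowSpec {
    private final long window;
    private final long period;
    private final TimeUnit unit;
    private final String temporalVar;

    WindowSpec(long window, long period, TimeUnit unit, String temporalVar) {
        // Same constraint as the quoted constructor: the period may not exceed the window.
        if (period > window) {
            throw new IllegalArgumentException("period must be <= window");
        }
        // requireNonNull returns its argument, so the check and the assignment share one line.
        this.unit = Objects.requireNonNull(unit, "unit");
        this.temporalVar = Objects.requireNonNull(temporalVar, "temporalVar");
        this.window = window;
        this.period = period;
    }

    TimeUnit getUnit() { return unit; }
    String getTemporalVar() { return temporalVar; }
    long getWindow() { return window; }
    long getPeriod() { return period; }
}
```

Folding the check into the assignment makes it harder to add a field later and forget its null check, and it lets the fields be declared final.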
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127794413 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.batch.serializer; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Optional; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task; +import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation; +import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.junit.Test; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +public class BatchInformationSerializerTest { + +@Test +public void testSpanBatchInformationSerialization() { + +SpanBatchDeleteInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(1000) + .setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix"))).build(); +System.out.println(batch); +byte[] batchBytes = BatchInformationSerializer.toBytes(batch); +Optional decodedBatch = BatchInformationSerializer.fromBytes(batchBytes); +System.out.println(decodedBatch); +assertEquals(batch, decodedBatch.get()); +} + +@Test +public void testJoinBatchInformationSerialization() { + +QueryBindingSet bs = new QueryBindingSet(); +bs.addBinding("a", new URIImpl("urn:123")); +bs.addBinding("b", new URIImpl("urn:456")); +VisibilityBindingSet vBis = new VisibilityBindingSet(bs, "FOUO"); + +JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1000).setTask(Task.Update) + 
.setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix346"))) + .setJoinType(JoinType.LEFT_OUTER_JOIN).setSide(Side.RIGHT).setVarOrder(new VariableOrder(Arrays.asList("a", "b"))) +.setBs(vBis).build(); + +System.out.println(batch); --- End diff -- System.println()? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127798259 --- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.notification; + +import org.apache.rya.periodic.notification.api.Notification; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +/** + * This Object contains a Notification Object used by the Periodic Query Service + * to inform workers to process results for a given Periodic Query with the + * indicated id. Additionally, the CommandNotification contains a + * {@link Command} about which action the + * {@link NotificationCoordinatorExecutor} should take (adding or deleting). + * CommandNotifications are meant to be added to an external work queue (such as + * Kafka) to be processed by the NotificationCoordinatorExecutor. 
+ * + */ +public class CommandNotification implements Notification { + +private Notification notification; +private Command command; + +public enum Command { +ADD, DELETE +}; + +/** + * Creates a new CommandNotification + * @param command - the command associated with this notification (either add, update, or delete) + * @param notification - the underlying notification associated with this command + */ +public CommandNotification(Command command, Notification notification) { +Preconditions.checkNotNull(notification); +Preconditions.checkNotNull(command); --- End diff -- here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127769914 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java --- @@ -0,0 +1,262 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.Objects; + +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.query.Binding; + +import jline.internal.Preconditions; + +/** + * This class updates join results based on parameters specified for the join's + * children. The join has two children, and for one child a VisibilityBindingSet + * is specified along with the Side of that child. This BindingSet represents an + * update to that join child. For the other child, a Span, Column and + * VariableOrder are specified. 
This is so that the sibling node (the node that + * wasn't updated) can be scanned to obtain results that can be joined with the + * VisibilityBindingSet. The assumption here is that the Span is derived from + * the {@link Binding}s of common variables between the join children, with + * Values ordered according to the indicated {@link VariableOrder}. This class + * represents a batch order to perform a given task on join BindingSet results. + * The {@link Task} is to Add, Delete, or Update. This batch order is processed + * by the {@link BatchObserver} and used with the nodeId provided to the + * Observer to process the Task specified by the batch order. If the Task is to + * add, the BatchBindingSetUpdater returned by + * {@link JoinBatchInformation#getBatchUpdater()} will scan the join's child for + * results using the indicated Span and Column. These results are joined with + * the indicated VisibilityBindingSet, and the results are added to the parent + * join. The other Tasks are performed analogously. 
+ * + */ +public class JoinBatchInformation extends AbstractSpanBatchInformation { + +private static final BatchBindingSetUpdater updater = new JoinBatchBindingSetUpdater(); +private VisibilityBindingSet bs; //update for join child indicated by side +private VariableOrder varOrder; //variable order for child indicated by Span +private Side side; //join child that was updated by bs +private JoinType join; +/** + * @param batchSize - batch size that Tasks are performed in + * @param task - Add, Delete, or Update + * @param column - Column of join child to be scanned + * @param span - span of join child to be scanned (derived from common variables of left and right join children) + * @param bs - BindingSet to be joined with results of child scan + * @param varOrder - VariableOrder used to form join (order for join child corresponding to Span) + * @param side - The side of the child that the VisibilityBindingSet update occurred at + * @param join - JoinType (left, right, natural inner) + */ +public JoinBatchInformation(int batchSize, Task task, Column column, Span span, VisibilityBindingSet bs, VariableOrder varOrder, Side side, JoinType join) { +super(batchSize, task, column, span); +Preconditions.checkNotNull(bs); +Preconditions.checkNotNull(varOrder); +Preconditions.checkNotNull(side); +Preconditions.checkNotNull(join); +this.bs = bs; +this.varOrder = varOrder; +this.join = join; +this.side = side; +} + +public JoinBatchInformation(Task task, Column
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127798313 --- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.notification; + +import org.apache.rya.periodic.notification.api.Notification; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +/** + * This Object contains a Notification Object used by the Periodic Query Service + * to inform workers to process results for a given Periodic Query with the + * indicated id. Additionally, the CommandNotification contains a + * {@link Command} about which action the + * {@link NotificationCoordinatorExecutor} should take (adding or deleting). + * CommandNotifications are meant to be added to an external work queue (such as + * Kafka) to be processed by the NotificationCoordinatorExecutor. 
+ * + */ +public class CommandNotification implements Notification { + +private Notification notification; +private Command command; + +public enum Command { +ADD, DELETE +}; + +/** + * Creates a new CommandNotification + * @param command - the command associated with this notification (either add, update, or delete) + * @param notification - the underlying notification associated with this command + */ +public CommandNotification(Command command, Notification notification) { +Preconditions.checkNotNull(notification); +Preconditions.checkNotNull(command); +this.command = command; +this.notification = notification; +} + +@Override +public String getId() { +return notification.getId(); +} + +/** + * Returns {@link Notification} contained by this CommmandNotification. + * @return - Notification contained by this Object + */ +public Notification getNotification() { +return this.notification; +} + +/** + * @return Command contained by this Object (either add or delete) + */ +public Command getCommand() { +return this.command; +} + +@Override +public boolean equals(Object other) { +if (this == other) { +return true; +} +if (other instanceof CommandNotification) { +CommandNotification cn = (CommandNotification) other; +return Objects.equal(this.command, cn.command) && Objects.equal(this.notification, cn.notification); +} else { +return false; +} +} + +@Override +public int hashCode() { +int result = 17; +result = 31 * result + Objects.hashCode(command); --- End diff -- objects.hash --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
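The "objects.hash" comment above presumably refers to java.util.Objects.hash, which can replace the hand-rolled 17/31 accumulation in the quoted hashCode. A minimal sketch of that suggestion follows. CommandKey is a hypothetical class with a String id standing in for the wrapped Notification; it is not the class from the pull request.

```java
import java.util.Objects;

/** Hypothetical value class illustrating equals/hashCode built on java.util.Objects. */
final class CommandKey {
    enum Command { ADD, DELETE }

    private final Command command;
    private final String notificationId;

    CommandKey(Command command, String notificationId) {
        this.command = Objects.requireNonNull(command);
        this.notificationId = Objects.requireNonNull(notificationId);
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof CommandKey)) {
            return false;
        }
        CommandKey that = (CommandKey) other;
        return command == that.command && notificationId.equals(that.notificationId);
    }

    @Override
    public int hashCode() {
        // One call covers every field used in equals, in a fixed order.
        return Objects.hash(command, notificationId);
    }
}
```

The numeric hash values differ from those of the hand-rolled version, which is harmless as long as nothing persists them. The contract that matters, equal objects producing equal hashes, still holds.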
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127798607 --- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java --- @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.notification; + +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import org.apache.rya.periodic.notification.api.Notification; + +import com.google.common.base.Preconditions; + +/** + * Notification Object used by the Periodic Query Service to inform workers to + * process results for a given Periodic Query with the indicated id. + * Additionally, this Object contains a period that indicates a frequency at + * which regular updates are generated. + * + */ +public class PeriodicNotification implements Notification { + +private String id; +private long period; +private TimeUnit periodTimeUnit; +private long initialDelay; + +/** + * Creates a PeriodicNotification. 
+ * @param id - Fluo Query Id that this notification is associated with + * @param period - period at which notifications are generated + * @param periodTimeUnit - time unit associated with the period and delay + * @param initialDelay - amount of time to wait before generating the first notification + */ +public PeriodicNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) { +Preconditions.checkNotNull(id); +Preconditions.checkNotNull(periodTimeUnit); +Preconditions.checkArgument(period > 0 && initialDelay >= 0); +this.id = id; +this.period = period; +this.periodTimeUnit = periodTimeUnit; +this.initialDelay = initialDelay; +} + + +/** + * Create a PeriodicNotification + * @param other - other PeriodicNotification used in copy constructor + */ +public PeriodicNotification(PeriodicNotification other) { +this(other.id, other.period, other.periodTimeUnit, other.initialDelay); +} + +public String getId() { +return id; +} + +/** + * @return - period at which regular notifications are generated + */ +public long getPeriod() { +return period; +} + +/** + * @return time unit of period and initial delay + */ +public TimeUnit getTimeUnit() { +return periodTimeUnit; +} + +/** + * @return amount of time to delay before beginning to generate notifications + */ +public long getInitialDelay() { +return initialDelay; +} + +@Override +public String toString() { +StringBuilder builder = new StringBuilder(); +String delim = "="; +String delim2 = ";"; +return builder.append("id").append(delim).append(id).append(delim2).append("period").append(delim).append(period).append(delim2) + .append("periodTimeUnit").append(delim).append(periodTimeUnit).append(delim2).append("initialDelay").append(delim) +.append(initialDelay).toString(); +} + +@Override +public boolean equals(Object other) { +if (this == other) { +return true; +} + +if (!(other instanceof PeriodicNotification)) { +return false; +} + +PeriodicNotification notification = (PeriodicNotification) other; 
+return Objects.equals(this.id, notification.id) && (this.period == notification.period) +&& Objects.equals(this.periodTimeUnit, notification.periodTimeUnit) && (this.initialDelay == notification.initialDelay); +}
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127791538 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/PeriodicQueryObserver.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.observers; + +import static java.util.Objects.requireNonNull; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; + --- End diff -- docs! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127734010 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java --- @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; +import org.apache.rya.indexing.pcj.storage.PCJIdFactory; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.AggregateOperatorBase; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.base.Preconditions; + +/** + * This class is the Accumulo implementation of {@link PeriodicQueryResultStorage} for + * creating, deleting, and interacting with tables where PeriodicQuery results are 
stored. + */ +public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultStorage { + +private String ryaInstance; +private Connector accumuloConn; +private Authorizations auths; +private final PCJIdFactory pcjIdFactory = new PCJIdFactory(); +private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); +private static final PcjTables pcjTables = new PcjTables(); +private static final PeriodicQueryTableNameFactory tableNameFactory = new PeriodicQueryTableNameFactory(); + +/** + * Creates a AccumuloPeriodicQueryResultStorage Object. + * @param accumuloConn - Accumulo Connector for connecting to an Accumulo instance + * @param ryaInstance - Rya Instance name for connecting to Rya + */ +public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, String ryaInstance) { +this.accumuloConn = accumuloConn; +this.ryaInstance = ryaInstance; +String user = accumuloConn.whoami(); +try { +this.auths = accumuloConn.securityOperations().getUserAuthorizations(user); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new RuntimeException("Unable access user: " + user + "authorizations."); +} +} + +@Override +public String createPeriodicQuery(String sparql) throws PeriodicQueryStorageException { +Preconditions.checkNotNull(sparql); +String queryId = pcjIdFactory.nextId(); +ret
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/177#discussion_r127740934

--- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java ---
@@ -412,9 +413,9 @@ private void writeResults(
 // Row ID = binding set values, Column Family = variable order of the binding set.
 final Mutation addResult = new Mutation(rowKey);
 final String visibility = result.getVisibility();
-addResult.put(varOrder.toString(), "", new ColumnVisibility(visibility), "");
+addResult.put(varOrder.toString(), "", new ColumnVisibility(visibility), new Value(bsSerDe.serialize(result).toArray()));
 mutations.add(addResult);
-} catch(final BindingSetConversionException e) {
+} catch(Exception e) {
--- End diff --

List out the exceptions being caught here
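One way to act on the comment above is a multi-catch clause that names each checked exception instead of catching Exception. The sketch below shows the shape under stated assumptions: ConversionException and SerializationException are hypothetical placeholders, since the quoted diff does not show which exceptions the new serialize call can throw.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of listing caught exceptions explicitly; not the PcjTables code. */
final class ExplicitCatchSketch {
    /** Placeholder for a conversion failure. */
    static class ConversionException extends Exception {
        ConversionException(String message) { super(message); }
    }

    /** Placeholder for a serialization failure. */
    static class SerializationException extends Exception {
        SerializationException(String message) { super(message); }
    }

    /** Stand-in for the per-row work; throws one of two checked exceptions on bad input. */
    static String encode(String row) throws ConversionException, SerializationException {
        if (row.isEmpty()) {
            throw new ConversionException("empty row");
        }
        if (row.startsWith("!")) {
            throw new SerializationException("unserializable row: " + row);
        }
        return row.toUpperCase();
    }

    static List<String> encodeAll(List<String> rows) {
        List<String> encoded = new ArrayList<>();
        for (String row : rows) {
            try {
                encoded.add(encode(row));
            } catch (ConversionException | SerializationException e) {
                // Only the named checked exceptions are wrapped; unrelated runtime
                // exceptions (for example a NullPointerException) propagate unchanged.
                throw new IllegalStateException("Could not encode row", e);
            }
        }
        return encoded;
    }
}
```

Naming the exceptions keeps programming errors from being silently folded into the same handling path as expected I/O or conversion failures.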
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127792369 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryMetadata.java --- @@ -0,0 +1,291 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; + +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +/** + * Metadata that is required for periodic queries in the Rya Fluo Application. + * If a periodic query is registered with the Rya Fluo application, the BindingSets + * are placed into temporal bins according to whether they occur within the window of + * a period's ending time. This Metadata is used to create a Bin Id, which is equivalent + * to the period's ending time, to be inserted into each BindingSet that occurs within that + * bin. This is to allow the AggregationUpdater to aggregate the bins by grouping on the + * Bin Id. 
+ * + */ +public class PeriodicQueryMetadata extends CommonNodeMetadata { + +private String parentNodeId; +private String childNodeId; +private long windowSize; +private long period; +private TimeUnit unit; +private String temporalVariable; + +/** + * Constructs an instance of PeriodicQueryMetadata + * @param nodeId - id of periodic query node + * @param varOrder - variable order indicating the order the BindingSet results are written in + * @param parentNodeId - id of parent node + * @param childNodeId - id of child node + * @param windowSize - size of window used for filtering + * @param period - period size that indicates frequency of notifications + * @param unit - TimeUnit corresponding to window and period + * @param temporalVariable - temporal variable that periodic conditions are applied to + */ +public PeriodicQueryMetadata(String nodeId, VariableOrder varOrder, String parentNodeId, String childNodeId, long windowSize, long period, +TimeUnit unit, String temporalVariable) { +super(nodeId, varOrder); +Preconditions.checkNotNull(parentNodeId); +Preconditions.checkNotNull(childNodeId); +Preconditions.checkNotNull(temporalVariable); +Preconditions.checkNotNull(unit); +Preconditions.checkNotNull(period > 0); --- End diff -- = checkNotNull --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
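Apart from the assignment style the comment above asks for, the last quoted line passes a boolean expression, period > 0, to checkNotNull. The expression is autoboxed to a non-null Boolean, so that check can never fail. Guava's Preconditions.checkArgument, which other classes in this pull request already use, is the usual way to validate a condition. The sketch below demonstrates the difference with java.util.Objects.requireNonNull and a plain if statement so that it runs without Guava. PeriodCheckSketch and its methods are hypothetical names.

```java
import java.util.Objects;

/** Hypothetical demonstration of a null check applied to a boolean expression. */
final class PeriodCheckSketch {
    /** Mirrors the quoted line: the boxed Boolean is never null, so this never throws. */
    static boolean nullCheckPasses(long period) {
        Objects.requireNonNull(period > 0);
        return true;
    }

    /** Validates the condition itself, as a checkArgument-style call would. */
    static long requirePositive(long period) {
        if (period <= 0) {
            throw new IllegalArgumentException("period must be positive: " + period);
        }
        return period;
    }
}
```

With a negative period, nullCheckPasses still returns normally, while requirePositive throws IllegalArgumentException.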
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127798892 --- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java --- @@ -0,0 +1,117 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */package org.apache.rya.periodic.notification.processor; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.periodic.notification.api.LifeCycle; +import org.apache.rya.periodic.notification.api.NodeBin; +import org.apache.rya.periodic.notification.exporter.BindingSetRecord; +import org.apache.rya.periodic.notification.notification.TimestampedNotification; + +import com.google.common.base.Preconditions; + +/** + * Executor service that runs {@link TimestampedNotificationProcessor}s with basic + * functionality for starting, stopping, and determining whether notification processors are + * being executed. + * + */ +public class NotificationProcessorExecutor implements LifeCycle { + +private static final Logger log = Logger.getLogger(TimestampedNotificationProcessor.class); +private BlockingQueue notifications; // notifications +private BlockingQueue bins; // entries to delete from Fluo +private BlockingQueue bindingSets; // query results to + // export +private PeriodicQueryResultStorage periodicStorage; +private List processors; +private int numberThreads; +private ExecutorService executor; +private boolean running = false; + +/** + * Creates NotificationProcessorExecutor. 
+ * @param periodicStorage - storage layer that periodic results are read from + * @param notifications - notifications are pulled from this queue, and the timestamp indicates which bin of results to query for + * @param bins - after notifications are processed, they are added to the bin to be deleted + * @param bindingSets - results read from the storage layer to be exported + * @param numberThreads - number of threads used for processing + */ +public NotificationProcessorExecutor(PeriodicQueryResultStorage periodicStorage, BlockingQueue notifications, +BlockingQueue bins, BlockingQueue bindingSets, int numberThreads) { +Preconditions.checkNotNull(notifications); +Preconditions.checkNotNull(bins); +Preconditions.checkNotNull(bindingSets); +this.notifications = notifications; --- End diff -- here
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127771229 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java --- @@ -0,0 +1,92 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; + +/** + * This class represents a batch order to delete all entries in the Fluo table indicated + * by the given Span and Column. These batch orders are processed by the {@link BatchObserver}, + * which uses this batch information along with the nodeId passed into the Observer to perform + * batch deletes. 
+ * + */ +public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation { + +private static final BatchBindingSetUpdater updater = new SpanBatchBindingSetUpdater(); + +public SpanBatchDeleteInformation(int batchSize, Column column, Span span) { +super(batchSize, Task.Delete, column, span); +} + +/** + * @return Updater that applies the {@link Task} to the given {@link Span} and {@link Column} + */ +@Override +public BatchBindingSetUpdater getBatchUpdater() { +return updater; +} + + +public static Builder builder() { +return new Builder(); +} + +public static class Builder { + +private int batchSize = DEFAULT_BATCH_SIZE; +private Column column; +private Span span; + +/** + * @param batchSize - {@link Task}s are applied in batches of this size + */ +public Builder setBatchSize(int batchSize) { +this.batchSize = batchSize; +return this; +} + +/** + * Sets column to apply batch {@link Task} to + * @param column - column batch Task will be applied to + * @return + */ +public Builder setColumn(Column column) { +this.column = column; +return this; +} + +/** + * @param span - span that batch {@link Task} will be applied to + * + */ +public Builder setSpan(Span span) { +this.span = span; +return this; +} + + +public SpanBatchDeleteInformation build() { --- End diff -- docs
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127742123 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VisibilityBindingSetSerDe.java --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import org.apache.fluo.api.data.Bytes; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Serializes and deserializes a {@link VisibilityBindingSet} to and from {@link Bytes} objects. + */ +@DefaultAnnotation(NonNull.class) +public class VisibilityBindingSetSerDe { --- End diff -- Call this an adapter or something like that. also: implements Serializable?
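For context on the `Serializable` question: the imports in the quoted diff suggest the class round-trips objects through `ObjectOutputStream` and `ObjectInputStream`. A minimal sketch of that round trip follows, under stated assumptions: `SerDeSketch` is a hypothetical name, a plain `byte[]` stands in for Fluo's `Bytes`, and an `ArrayList<String>` stands in for a `VisibilityBindingSet`. It is not the code from the PR.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;

// Illustrative only: SerDeSketch is a hypothetical name; byte[] stands in for Fluo's Bytes.
class SerDeSketch {

    // Writes the object with Java serialization and returns the raw bytes.
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads back whatever object was written by serialize().
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for a binding set: a serializable list of name=value strings.
        ArrayList<String> bindings = new ArrayList<>(Arrays.asList("obs=urn:obs1", "total=3"));
        Object copy = deserialize(serialize(bindings));
        System.out.println(bindings.equals(copy));
    }
}
```

In this arrangement only the object passed to `writeObject` has to implement `Serializable`. The helper class itself would need it only if an instance were held as a non-transient field of something that gets serialized.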
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127771163 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchDeleteInformation.java --- @@ -0,0 +1,92 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; + +/** + * This class represents a batch order to delete all entries in the Fluo table indicated + * by the given Span and Column. These batch orders are processed by the {@link BatchObserver}, + * which uses this batch information along with the nodeId passed into the Observer to perform + * batch deletes. 
+ * + */ +public class SpanBatchDeleteInformation extends AbstractSpanBatchInformation { + +private static final BatchBindingSetUpdater updater = new SpanBatchBindingSetUpdater(); + +public SpanBatchDeleteInformation(int batchSize, Column column, Span span) { +super(batchSize, Task.Delete, column, span); +} + +/** + * @return Updater that applies the {@link Task} to the given {@link Span} and {@link Column} + */ +@Override +public BatchBindingSetUpdater getBatchUpdater() { +return updater; +} + + +public static Builder builder() { --- End diff -- docs
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127732487 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageMetadata.java --- @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage; + +import java.util.Objects; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; + +import com.google.common.base.Preconditions; + +/** + * Metadata for a given PeriodicQueryStorage table. 
+ */ +public class PeriodicQueryStorageMetadata { + +private String sparql; +private VariableOrder varOrder; + +/** + * Create a PeriodicQueryStorageMetadat object + * @param sparql - SPARQL query whose results are stored in table + * @param varOrder - order that BindingSet values are written in in table + */ +public PeriodicQueryStorageMetadata(String sparql, VariableOrder varOrder) { +Preconditions.checkNotNull(sparql); --- End diff -- this can be simplified to this.sparql = checkNotNull(sparql) and the others here too
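The simplification suggested in this comment relies on the null check returning its argument, which Guava's `Preconditions.checkNotNull` does. A minimal sketch of the idiom follows. To stay runnable without Guava it uses `java.util.Objects.requireNonNull`, which behaves the same way for this purpose. The class is a trimmed-down, hypothetical stand-in for `PeriodicQueryStorageMetadata`, with a `String` in place of `VariableOrder`.

```java
import java.util.Objects;

// Illustrative only: a hypothetical, trimmed-down version of the metadata class.
class StorageMetadataSketch {

    private final String sparql;
    private final String varOrder;

    StorageMetadataSketch(String sparql, String varOrder) {
        // requireNonNull returns its argument, so validation and assignment fit on one line.
        this.sparql = Objects.requireNonNull(sparql);
        this.varOrder = Objects.requireNonNull(varOrder);
    }

    String getSparql() {
        return sparql;
    }

    String getVarOrder() {
        return varOrder;
    }

    public static void main(String[] args) {
        StorageMetadataSketch metadata =
                new StorageMetadataSketch("select ?obs where { ?obs ?p ?o }", "obs");
        System.out.println(metadata.getSparql());
    }
}
```

A side benefit of assigning from the check is that the fields can be declared `final`, since each one is set exactly once in the constructor.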
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127796542 --- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java --- @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.application; + +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; +import org.apache.rya.periodic.notification.api.BinPruner; +import org.apache.rya.periodic.notification.api.LifeCycle; +import org.apache.rya.periodic.notification.api.NodeBin; +import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.exporter.BindingSetRecord; +import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor; +import org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor; +import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor; +import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider; +import org.openrdf.query.algebra.evaluation.function.Function; + +import com.google.common.base.Preconditions; + +/** + * The PeriodicNotificationApplication runs the key components of the Periodic + * Query Service. It consists of a {@link KafkaNotificationProvider}, a + * {@link NotificationCoordinatorExecutor}, a + * {@link NotificationProcessorExecutor}, a {@link KafkaExporterExecutor}, and a + * {@link PeriodicQueryPrunerExecutor}. These services run in coordination with + * one another to perform the following tasks in the indicated order: + * Retrieve new requests to generate periodic notifications from Kafka + * Register them with the {@link NotificationCoordinatorExecutor} to + * generate the periodic notifications + * As notifications are generated, they are added to a work queue that is + * monitored by the {@link NotificationProcessorExecutor}. + * The processor processes the notifications by reading all of the query + * results corresponding to the bin and query id indicated by the notification. 
+ * After reading the results, the processor adds a {@link BindingSetRecord} + * to a work queue monitored by the {@link KafkaExporterExecutor}. + * The processor then adds a {@link NodeBin} to a workqueue monitored by the + * {@link BinPruner} + * The exporter processes the BindingSetRecord by exporing the result to + * Kafka + * The BinPruner processes the NodeBin by cleaning up the results for the + * indicated bin and query in Accumulo and Fluo. + * + * The purpose of this Periodic Query Service is to facilitate the ability to + * answer Periodic Queries using the Rya Fluo application, where a Periodic + * Query is any query requesting periodic updates about events that occurred + * within a given window of time of this instant. This is also known as a + * rolling window query. Period Queries can be expressed using SPARQL by + * including the {@link Function} indicated by the URI + * {@link PeriodicQueryUtil#PeriodicQueryURI}. The user must provide this + * Function with the following arguments: the temporal variable in the query + * that will be filtered on, the window of time that events must occur within, + * the period at which the user wants to receive updates, and the time unit. The + * following query requests all observations that occurred within the last + * minute and requests updates every 15 seconds. It also performs a count on + * those observations. + * + * prefix function: http://org.apache.rya/function# + * "prefix time: http://www.w3.org/2006/time# + * "select (count(?obs) as ?total) where { + * "Filter(function:periodic(?time, 1, .25, time:minutes)) + * "?obs uri:hasTime ?time. + * "?obs uri
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127754934 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/BasicBatchInformation.java --- @@ -0,0 +1,82 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import org.apache.fluo.api.data.Column; + +import com.google.common.base.Preconditions; + +/** + * This class contains all of the common info contained in other implementations + * of BatchInformation. + * + */ +public abstract class BasicBatchInformation implements BatchInformation { + +private int batchSize; +private Task task; +private Column column; + +/** + * Create BasicBatchInformation object + * @param batchSize - size of batch to be processed + * @param task - task to be processed + * @param column - Column in which data is proessed + */ +public BasicBatchInformation(int batchSize, Task task, Column column ) { +Preconditions.checkNotNull(task); --- End diff -- this, = not null()
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127770676 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/SpanBatchBindingSetUpdater.java --- @@ -0,0 +1,128 @@ +package org.apache.rya.indexing.pcj.fluo.app.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.util.Iterator; +import java.util.Optional; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.RowColumn; +import org.apache.fluo.api.data.Span; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task; + +import com.google.common.base.Preconditions; + +/** + * This class processes {@link SpanBatchDeleteInformation} objects by + * deleting the entries in the Fluo Column corresponding to the {@link Span} + * of the BatchInformation object. 
This class will delete entries until the + * batch size is met, and then create a new SpanBatchDeleteInformation object + * with an updated Span whose starting point is the stopping point of this + * batch. If the batch limit is not met, then a new batch is not created and + * the task is complete. + * + */ +public class SpanBatchBindingSetUpdater extends AbstractBatchBindingSetUpdater { + +private static final Logger log = Logger.getLogger(SpanBatchBindingSetUpdater.class); + +/** + * Process SpanBatchDeleteInformation objects by deleting all entries indicated + * by Span until batch limit is met. + * @param tx - Fluo Transaction + * @param row - Byte row identifying BatchInformation + * @param batch - SpanBatchDeleteInformation object to be processed + */ +@Override +public void processBatch(TransactionBase tx, Bytes row, BatchInformation batch) throws Exception { +super.processBatch(tx, row, batch); +Preconditions.checkArgument(batch instanceof SpanBatchDeleteInformation); +SpanBatchDeleteInformation spanBatch = (SpanBatchDeleteInformation) batch; +Task task = spanBatch.getTask(); +int batchSize = spanBatch.getBatchSize(); +Span span = spanBatch.getSpan(); +Column column = batch.getColumn(); +Optional rowCol = Optional.empty(); + +switch (task) { +case Add: +log.trace("The Task Add is not supported for SpanBatchBindingSetUpdater. Batch " + batch + " will not be processed."); --- End diff -- throw UnsupportedOperationException?
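To make the batching behavior described in the quoted Javadoc concrete, and to show what throwing on an unsupported task could look like (the JDK class is `java.lang.UnsupportedOperationException`), here is a rough sketch under stated assumptions. A `TreeMap` stands in for the Fluo table, row keys are plain strings, and `SpanBatchDeleteSketch`, its `Task` enum and `processBatch` signature are hypothetical. Unlike the description above, this version only reports a follow-up batch when rows actually remain in the span.

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.TreeMap;

// Illustrative only: a TreeMap stands in for the Fluo table; all names are hypothetical.
class SpanBatchDeleteSketch {

    enum Task { Add, Delete }

    // Returns the start row of the next batch, or empty when the span is fully deleted.
    static Optional<String> processBatch(TreeMap<String, String> table, Task task,
            String startRow, String endRow, int batchSize) {
        switch (task) {
        case Add:
            // Fail loudly instead of logging at trace level and silently skipping the batch.
            throw new UnsupportedOperationException("The Task Add is not supported for span batch deletes.");
        case Delete:
            return deleteBatch(table, startRow, endRow, batchSize);
        default:
            throw new IllegalArgumentException("Unknown task: " + task);
        }
    }

    private static Optional<String> deleteBatch(TreeMap<String, String> table,
            String startRow, String endRow, int batchSize) {
        // View of the rows in [startRow, endRow); removals write through to the table.
        Iterator<String> rows = table.subMap(startRow, true, endRow, false).keySet().iterator();
        int deleted = 0;
        while (rows.hasNext()) {
            String row = rows.next();
            if (deleted == batchSize) {
                // Batch limit met with rows remaining: the next batch starts at this row.
                return Optional.of(row);
            }
            rows.remove();
            deleted++;
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        TreeMap<String, String> table = new TreeMap<>();
        for (int i = 0; i < 5; i++) {
            table.put("row" + i, "value");
        }
        Optional<String> next = Optional.of("row0");
        int batches = 0;
        while (next.isPresent()) {
            next = processBatch(table, Task.Delete, next.get(), "row9", 2);
            batches++;
        }
        System.out.println(batches + " batches, " + table.size() + " rows left");
    }
}
```

Whether an exception or a logged no-op is the right behavior for `Add` depends on how the caller treats failed batches, which this thread does not settle.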
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127791396 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java --- @@ -36,10 +36,11 @@ * Incrementally exports SPARQL query results to Kafka topics. */ public class KafkaBindingSetExporter implements IncrementalBindingSetExporter { + --- End diff -- no significant changes here...
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127741029 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PeriodicQueryTableNameFactory.java --- @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static java.util.Objects.requireNonNull; + +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; + +/** + * Class for creating the names of {@link PeriodicQueryResultStorage} tables. + * + */ +public class PeriodicQueryTableNameFactory { + +public static final String PeriodicTableSuffix = "PERIODIC_QUERY_"; --- End diff -- if this is a suffix, why end with an underscore?
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127799662 --- Diff: extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.pruner; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; +import org.apache.rya.periodic.notification.api.BinPruner; +import org.apache.rya.periodic.notification.api.NodeBin; + +import jline.internal.Preconditions; + +/** + * Implementation of {@link BinPruner} that deletes old, already processed + * Periodic Query results from Fluo and the PCJ table to which the Fluo results + * are exported. + * + */ +public class PeriodicQueryPruner implements BinPruner, Runnable { + +private static final Logger log = Logger.getLogger(PeriodicQueryPruner.class); +private FluoClient client; +private AccumuloBinPruner accPruner; +private FluoBinPruner fluoPruner; +private BlockingQueue bins; +private AtomicBoolean closed = new AtomicBoolean(false); +private int threadNumber; + +public PeriodicQueryPruner(FluoBinPruner fluoPruner, AccumuloBinPruner accPruner, FluoClient client, BlockingQueue bins, int threadNumber) { +Preconditions.checkNotNull(fluoPruner); +Preconditions.checkNotNull(accPruner); +Preconditions.checkNotNull(client); +this.client = client; --- End diff -- here
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127750682 --- Diff: extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java --- @@ -67,7 +68,8 @@ /** * Constructs an instance of {@link DeletePcj}. * - * @param batchSize - The number of entries that will be deleted at a time. (> 0) + * @param batchSize + *- The number of entries that will be deleted at a time. (> 0) --- End diff -- I don't like the reformatting of these docs.
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127753530 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractBatchBindingSetUpdater.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.batch; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.RowColumn; +import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; + +public abstract class AbstractBatchBindingSetUpdater implements BatchBindingSetUpdater { --- End diff -- docs
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127731724 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PeriodicQueryStorageException.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage; + +public class PeriodicQueryStorageException extends Exception { --- End diff -- doc when/where/why this exception gets used
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127751572 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/JoinResultUpdater.java --- @@ -43,6 +43,7 @@ import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; --- End diff -- are you actually using this?
[GitHub] incubator-rya pull request #177: RYA-280-Periodic Query Service
GitHub user meiercaleb opened a pull request: https://github.com/apache/incubator-rya/pull/177 RYA-280-Periodic Query Service This pull request contains four main components. The first component is the PeriodicQueryMetadata and observer framework for Fluo, which adds a periodic bin id to BindingSets as they percolate through Fluo. The second component is a PeriodicQueryResultStorage API, which wraps the existing PCJTables class and allows results to be queried by their periodic bin id. The third component is the external PeriodicQueryService, which generates periodic notifications. When a notification is generated, the service uses the notification's time stamp to poll the PeriodicQueryResultStorage table for results whose periodic bin matches that time stamp, and then exports the matching results to Kafka. The fourth and final component is a Batch Processing framework that allows Fluo to batch any process or task that could lead to an out-of-memory error, which can occur when Fluo issues a scan to its underlying table in the middle of a transaction. Using the Batch framework, a BatchInformation object is written to the BatchObserver column. A BatchInformation object consists of a task (add, delete, update), a batch size, the column that the task will be applied to, and the query node id that it will be applied to, along with additional information such as the Span (the range of data that the task is applied to). When the BatchObserver processes a BatchInformation object, it performs the indicated task until it reaches the batch size, at which point it creates a new BatchInformation object recording where the last processing stage stopped. That BatchInformation object is then fed back to the BatchObserver, and this process continues until the task completes before reaching the batch size.
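The batch loop described above can be illustrated with a minimal sketch. It is not the code from this pull request: `BatchInfo`, `BatchSketch`, `processBatch`, and `runToCompletion` are hypothetical names, a `TreeMap` stands in for the Fluo table, and only a delete task is modeled. Each round deletes at most `batchSize` rows and, if rows remain, returns a follow-up `BatchInfo` whose start row records where the round stopped.

```java
import java.util.Iterator;
import java.util.TreeMap;

/** Hypothetical stand-in for a BatchInformation object (delete task only). */
final class BatchInfo {
    final int batchSize;   // maximum number of rows handled per round (> 0)
    final String startRow; // inclusive start of the span still to be processed

    BatchInfo(int batchSize, String startRow) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be > 0");
        }
        this.batchSize = batchSize;
        this.startRow = startRow;
    }
}

final class BatchSketch {
    /**
     * Deletes up to batchSize rows at or after startRow.
     * Returns a follow-up BatchInfo when rows remain, or null when the task is done.
     */
    static BatchInfo processBatch(TreeMap<String, String> table, BatchInfo info) {
        Iterator<String> rows = table.tailMap(info.startRow, true).keySet().iterator();
        int processed = 0;
        while (rows.hasNext()) {
            String row = rows.next();
            if (processed == info.batchSize) {
                // Batch size reached: record the stopping point for the next round.
                return new BatchInfo(info.batchSize, row);
            }
            rows.remove(); // removes the row from the backing table
            processed++;
        }
        return null; // finished before exceeding the batch size
    }

    /** Feeds each follow-up BatchInfo back in until none is produced; returns the round count. */
    static int runToCompletion(TreeMap<String, String> table, BatchInfo first) {
        int rounds = 0;
        BatchInfo next = first;
        while (next != null) {
            next = processBatch(table, next);
            rounds++;
        }
        return rounds;
    }

    public static void main(String[] args) {
        TreeMap<String, String> table = new TreeMap<>();
        for (int i = 0; i < 5; i++) {
            table.put("row" + i, "value" + i);
        }
        int rounds = runToCompletion(table, new BatchInfo(2, "row0"));
        System.out.println("rounds=" + rounds + ", remaining=" + table.size());
    }
}
```

In the sketch a plain loop plays the role of the observer being re-triggered. In Fluo, each round would presumably run in its own transaction, which is what keeps the work done by any single transaction bounded.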
### Tests
Integration tests were written to verify that the PeriodicQueryMetadata is inserted correctly into Fluo, that BindingSets are binned correctly, and that all components of the external PeriodicQueryService operate as expected. The integration tests can be found in rya.periodic.service.integration.tests and rya.pcj.fluo.integration.

### Links
[Jira](https://issues.apache.org/jira/browse/RYA-280)

### Checklist
- [ ] Code Review
- [X] Squash Commits

People To Review: Andrew Smith, Aaron Mihalik

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/meiercaleb/incubator-rya RYA-280

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-rya/pull/177.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #177

commit 4b21c1aa3541172905a2d0415d07cae1ae3a
Author: Caleb Meier
Date: 2017-04-15T02:20:25Z

    RYA-280-Periodic Query Service
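For readers unfamiliar with the binning that these tests exercise, here is a rough sketch of storing results under a periodic bin id and looking them up by a notification's time stamp. It rests on assumptions that the pull request text does not confirm: `PeriodicBinSketch` and its methods are hypothetical, the bin id is assumed to be the end of the period that contains the event time, and a `HashMap` stands in for the PeriodicQueryResultStorage table. The binning logic in the pull request itself may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for bin-keyed periodic result storage. */
final class PeriodicBinSketch {
    private final long periodMillis;
    private final Map<Long, List<String>> resultsByBin = new HashMap<>();

    PeriodicBinSketch(long periodMillis) {
        if (periodMillis <= 0) {
            throw new IllegalArgumentException("periodMillis must be > 0");
        }
        this.periodMillis = periodMillis;
    }

    /** Assumption: the bin id is the end time of the period that contains the event. */
    static long binIdFor(long eventMillis, long periodMillis) {
        return Math.floorDiv(eventMillis, periodMillis) * periodMillis + periodMillis;
    }

    /** Stores a result under the bin that its event time falls into. */
    void store(long eventMillis, String result) {
        long binId = binIdFor(eventMillis, periodMillis);
        resultsByBin.computeIfAbsent(binId, k -> new ArrayList<>()).add(result);
    }

    /** Returns the results whose bin id matches the notification's time stamp. */
    List<String> resultsFor(long notificationMillis) {
        return resultsByBin.getOrDefault(notificationMillis, Collections.emptyList());
    }

    public static void main(String[] args) {
        PeriodicBinSketch storage = new PeriodicBinSketch(1000L);
        storage.store(1500L, "a");
        storage.store(1999L, "b");
        storage.store(2000L, "c");
        System.out.println(storage.resultsFor(2000L));
        System.out.println(storage.resultsFor(3000L));
    }
}
```

Under these assumptions, a notification stamped at the end of a period picks up exactly the results binned into that period, and the service would then export them.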