[ https://issues.apache.org/jira/browse/RYA-280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091583#comment-16091583 ]
ASF GitHub Bot commented on RYA-280: ------------------------------------ Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/177#discussion_r127984352 --- Diff: extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java --- @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.hadoop.io.Text; +import org.apache.rya.indexing.pcj.storage.PCJIdFactory; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.AggregateOperatorBase; +import org.openrdf.query.algebra.ExtensionElem; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.sparql.SPARQLParser; + +import com.google.common.base.Preconditions; + +/** + * This class is the Accumulo implementation of {@link PeriodicQueryResultStorage} for + * creating, deleting, and interacting with tables where PeriodicQuery results are stored. + */ +public class AccumuloPeriodicQueryResultStorage implements PeriodicQueryResultStorage { + + private String ryaInstance; + private Connector accumuloConn; + private Authorizations auths; + private final PCJIdFactory pcjIdFactory = new PCJIdFactory(); + private final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); + private static final PcjTables pcjTables = new PcjTables(); + private static final PeriodicQueryTableNameFactory tableNameFactory = new PeriodicQueryTableNameFactory(); + + /** + * Creates a AccumuloPeriodicQueryResultStorage Object. + * @param accumuloConn - Accumulo Connector for connecting to an Accumulo instance + * @param ryaInstance - Rya Instance name for connecting to Rya + */ + public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, String ryaInstance) { + this.accumuloConn = accumuloConn; + this.ryaInstance = ryaInstance; + String user = accumuloConn.whoami(); + try { + this.auths = accumuloConn.securityOperations().getUserAuthorizations(user); + } catch (AccumuloException | AccumuloSecurityException e) { + throw new RuntimeException("Unable access user: " + user + "authorizations."); + } + } + + @Override + public String createPeriodicQuery(String sparql) throws PeriodicQueryStorageException { + Preconditions.checkNotNull(sparql); + String queryId = pcjIdFactory.nextId(); + return createPeriodicQuery(queryId, sparql); + } + + @Override + public String createPeriodicQuery(String queryId, String sparql) throws PeriodicQueryStorageException { + Set<String> bindingNames; + try { + bindingNames = new AggregateVariableRemover().getNonAggregationVariables(sparql); + } catch (MalformedQueryException e) { + throw new PeriodicQueryStorageException(e.getMessage()); + } + List<String> varOrderList = new ArrayList<>(); + varOrderList.add(PeriodicQueryResultStorage.PeriodicBinId); + varOrderList.addAll(bindingNames); + createPeriodicQuery(queryId, sparql, new VariableOrder(varOrderList)); + return queryId; + } + + @Override + public void createPeriodicQuery(String queryId, String sparql, VariableOrder order) throws PeriodicQueryStorageException { + Preconditions.checkNotNull(sparql); + Preconditions.checkNotNull(queryId); + Preconditions.checkNotNull(order); + Preconditions.checkArgument(PeriodicQueryResultStorage.PeriodicBinId.equals(order.getVariableOrders().get(0)), + "periodicBinId binding name must occur first in VariableOrder."); + String tableName = tableNameFactory.makeTableName(ryaInstance, queryId); + Set<VariableOrder> varOrders = new HashSet<>(); + varOrders.add(order); + try { + pcjTables.createPcjTable(accumuloConn, tableName, varOrders, sparql); + } catch (Exception e) { + throw new PeriodicQueryStorageException(e.getMessage()); + } + } + + @Override + public PeriodicQueryStorageMetadata getPeriodicQueryMetadata(String queryId) throws PeriodicQueryStorageException { + try { + return new PeriodicQueryStorageMetadata( + pcjTables.getPcjMetadata(accumuloConn, tableNameFactory.makeTableName(ryaInstance, queryId))); + } catch (Exception e) { + throw new PeriodicQueryStorageException(e.getMessage()); + } + } + + @Override + public void addPeriodicQueryResults(String queryId, Collection<VisibilityBindingSet> results) throws PeriodicQueryStorageException { + results.forEach(x -> Preconditions.checkArgument(x.hasBinding(PeriodicQueryResultStorage.PeriodicBinId), --- End diff -- No. The constructor takes in an Accumulo Connector and uses this each time it needs to retrieve results. > PeriodicQuery Support for Fluo > ------------------------------ > > Key: RYA-280 > URL: https://issues.apache.org/jira/browse/RYA-280 > Project: Rya > Issue Type: New Feature > Components: clients > Affects Versions: 3.2.10 > Reporter: Caleb Meier > Assignee: Caleb Meier > > Add the capability to Rya-Fluo App to provide periodic updates for queries > registered with Fluo. That is, provide the application with the ability to > satisfy the standing query "tell me every 12 hours about all of the events of > a particular type that occurred within the last 24 hours". Given that Fluo > operates using a push based notification system, some external service needs > to be implemented to periodically notify Fluo to generate a result > notification (this helps handle the non-event event that occurs when nothing > happens in a given period of time). -- This message was sent by Atlassian JIRA (v6.4.14#64029)