[ https://issues.apache.org/jira/browse/RYA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15979251#comment-15979251 ]
ASF GitHub Bot commented on RYA-260: ------------------------------------ Github user meiercaleb commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/156#discussion_r112760188 --- Diff: extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java --- @@ -0,0 +1,583 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.utils.VisibilitySimplifier; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.openrdf.model.Literal; +import org.openrdf.model.Value; +import org.openrdf.model.datatypes.XMLDatatypeUtil; +import org.openrdf.model.impl.DecimalLiteralImpl; +import org.openrdf.model.impl.IntegerLiteralImpl; +import org.openrdf.query.algebra.MathExpr.MathOp; +import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException; +import org.openrdf.query.algebra.evaluation.util.MathUtil; +import org.openrdf.query.algebra.evaluation.util.ValueComparator; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.ImmutableMap; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Updates 
the results of an Aggregate node when its child has added a new Binding Set to its results. + */ +@DefaultAnnotation(NonNull.class) +public class AggregationResultUpdater { + private static final Logger log = Logger.getLogger(AggregationResultUpdater.class); + + private static final AggregationStateSerDe AGG_STATE_SERDE = new ObjectSerializationAggregationStateSerDe(); + private static final VisibilityBindingSetSerde BS_SERDE = new VisibilityBindingSetSerde(); + + private static final ImmutableMap<AggregationType, AggregationFunction> FUNCTIONS; + static { + final ImmutableMap.Builder<AggregationType, AggregationFunction> builder = ImmutableMap.builder(); + builder.put(AggregationType.COUNT, new CountFunction()); + builder.put(AggregationType.SUM, new SumFunction()); + builder.put(AggregationType.AVERAGE, new AverageFunction()); + builder.put(AggregationType.MIN, new MinFunction()); + builder.put(AggregationType.MAX, new MaxFunction()); + FUNCTIONS = builder.build(); + } + + /** + * Updates the results of an Aggregation node where its child has emitted a new Binding Set. + * + * @param tx - The transaction all Fluo queries will use. (not null) + * @oaram childRow - The Row Key of the child Binding Set that changed. (not null) + * @param childCol - The Column Key of the child Binding Set that changed. (not null) + * @param aggregationMetadata - The metadata of the Aggregation node whose results will be updated. (not null) + * @throws Exception The update could not be successfulyl performed. + */ + public void updateAggregateResults( + final TransactionBase tx, + final Bytes childRow, + final Column childCol, + final AggregationMetadata aggregationMetadata) throws Exception { + requireNonNull(tx); + requireNonNull(childRow); + requireNonNull(childCol); + requireNonNull(aggregationMetadata); + + // Make sure the child binding set has not already been handled. 
+ final Bytes childValue = tx.get(childRow, childCol); + final VisibilityBindingSet childBindingSet = BS_SERDE.deserialize(childValue); + + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "Row Key: " + childRow + "\n" + + "Column Key: " + childCol + "\n" + + "Binding Set:\n" + childBindingSet + "\n"); --- End diff -- Given that the VisibilityBindingSet is available when you call this method, it seems like you are just passing in the Column for logging purposes. > Add Aggregation support for Fluo/PCJ app > ---------------------------------------- > > Key: RYA-260 > URL: https://issues.apache.org/jira/browse/RYA-260 > Project: Rya > Issue Type: New Feature > Reporter: Andrew Smith > Assignee: Kevin Chilton > > A user must be able to submit a PCJ query that contains the following > aggregation functions from SPARQL: > * Sum > * Count > * Average > * Min > * Max > This task does not include any aggregations that appear within a GROUP BY > clause. We only need to support queries that have the aggregation within the > SELECT section. > For example, the following query should be processed: > {code} > SELECT (avg(?price) as ?averagePrice) > { > <urn:BookA> <urn:price> ?price. > } > {code} > And the following query should not be processed because it requires a GROUP > BY: > {code} > SELECT ?title (avg(?price) as ?averagePrice) > { > ?title <urn:price> ?price. > } > GROUP BY ?title > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)