[ 
https://issues.apache.org/jira/browse/RYA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15979251#comment-15979251
 ] 

ASF GitHub Bot commented on RYA-260:
------------------------------------

Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/156#discussion_r112760188
  
    --- Diff: 
extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java
 ---
    @@ -0,0 +1,583 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.indexing.pcj.fluo.app;
    +
    +import static com.google.common.base.Preconditions.checkArgument;
    +import static java.util.Objects.requireNonNull;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.math.BigDecimal;
    +import java.math.BigInteger;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Optional;
    +
    +import org.apache.fluo.api.client.TransactionBase;
    +import org.apache.fluo.api.data.Bytes;
    +import org.apache.fluo.api.data.Column;
    +import org.apache.log4j.Logger;
    +import org.apache.rya.accumulo.utils.VisibilitySimplifier;
    +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
    +import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement;
    +import 
org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType;
    +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
    +import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
    +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
    +import org.openrdf.model.Literal;
    +import org.openrdf.model.Value;
    +import org.openrdf.model.datatypes.XMLDatatypeUtil;
    +import org.openrdf.model.impl.DecimalLiteralImpl;
    +import org.openrdf.model.impl.IntegerLiteralImpl;
    +import org.openrdf.query.algebra.MathExpr.MathOp;
    +import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
    +import org.openrdf.query.algebra.evaluation.util.MathUtil;
    +import org.openrdf.query.algebra.evaluation.util.ValueComparator;
    +import org.openrdf.query.impl.MapBindingSet;
    +
    +import com.google.common.collect.ImmutableMap;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * Updates the results of an Aggregate node when its child has added a new 
Binding Set to its results.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class AggregationResultUpdater {
    +    private static final Logger log = 
Logger.getLogger(AggregationResultUpdater.class);
    +
    +    private static final AggregationStateSerDe AGG_STATE_SERDE = new 
ObjectSerializationAggregationStateSerDe();
    +    private static final VisibilityBindingSetSerde BS_SERDE = new 
VisibilityBindingSetSerde();
    +
    +    private static final ImmutableMap<AggregationType, 
AggregationFunction> FUNCTIONS;
    +    static {
    +        final ImmutableMap.Builder<AggregationType, AggregationFunction> 
builder = ImmutableMap.builder();
    +        builder.put(AggregationType.COUNT, new CountFunction());
    +        builder.put(AggregationType.SUM, new SumFunction());
    +        builder.put(AggregationType.AVERAGE, new AverageFunction());
    +        builder.put(AggregationType.MIN, new MinFunction());
    +        builder.put(AggregationType.MAX, new MaxFunction());
    +        FUNCTIONS = builder.build();
    +    }
    +
    +    /**
    +     * Updates the results of an Aggregation node where its child has 
emitted a new Binding Set.
    +     *
    +     * @param tx - The transaction all Fluo queries will use. (not null)
    +     * @oaram childRow - The Row Key of the child Binding Set that 
changed. (not null)
    +     * @param childCol - The Column Key of the child Binding Set that 
changed. (not null)
    +     * @param aggregationMetadata - The metadata of the Aggregation node 
whose results will be updated. (not null)
    +     * @throws Exception The update could not be successfulyl performed.
    +     */
    +    public void updateAggregateResults(
    +            final TransactionBase tx,
    +            final Bytes childRow,
    +            final Column childCol,
    +            final AggregationMetadata aggregationMetadata) throws 
Exception {
    +        requireNonNull(tx);
    +        requireNonNull(childRow);
    +        requireNonNull(childCol);
    +        requireNonNull(aggregationMetadata);
    +
    +        // Make sure the child binding set has not already been handled.
    +        final Bytes childValue = tx.get(childRow, childCol);
    +        final VisibilityBindingSet childBindingSet = 
BS_SERDE.deserialize(childValue);
    +
    +        log.trace(
    +                "Transaction ID: " + tx.getStartTimestamp() + "\n" +
    +                "Row Key: " + childRow + "\n" +
    +                "Column Key: " + childCol + "\n" +
    +                "Binding Set:\n" + childBindingSet + "\n");
    --- End diff --
    
    Given that the VisibilityBindingSet is available when you call this method, 
it seems like you are just passing in the Column for logging purposes.


> Add Aggregation support for Fluo/PCJ app
> ----------------------------------------
>
>                 Key: RYA-260
>                 URL: https://issues.apache.org/jira/browse/RYA-260
>             Project: Rya
>          Issue Type: New Feature
>            Reporter: Andrew Smith
>            Assignee: Kevin Chilton
>
> A user must be able to submit a PCJ query that contains the following 
> aggregation functions from SPARQL:
> * Sum
> * Count
> * Average
> * Min
> * Max
> This task does not include any aggregations that appear within a GroupBy 
> clause. We only need to support queries that have the aggregation within the 
> SELECT section.
> For example, the following query should be processed:
> {code}
> SELECT (avg(?price) as ?averagePrice)
> {
>     urn:BookA urn:price ?price.
> }
> {code}
> And the following query should not be processed because it requires a group 
> by:
> {code}
> SELECT ?title (avg(?price) as ?averagePrice)
> {
>     ?title urn:price ?price.
> }
> GROUP BY ?title
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to