RYA-377 Repackaged the common Aggregation code into the rya.api.function project.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/8363724b Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/8363724b Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/8363724b Branch: refs/heads/master Commit: 8363724b4d684c47fe806a4f364e40134964e6cb Parents: 1535b46 Author: kchilton2 <kevin.e.chil...@gmail.com> Authored: Tue Nov 21 16:29:30 2017 -0500 Committer: caleb <caleb.me...@parsons.com> Committed: Tue Jan 9 15:13:01 2018 -0500 ---------------------------------------------------------------------- .../aggregation/AggregationElement.java | 105 +++++ .../aggregation/AggregationFunction.java | 41 ++ .../function/aggregation/AggregationState.java | 120 ++++++ .../function/aggregation/AggregationType.java | 66 +++ .../function/aggregation/AverageFunction.java | 96 +++++ .../api/function/aggregation/AverageState.java | 93 +++++ .../api/function/aggregation/CountFunction.java | 61 +++ .../api/function/aggregation/MaxFunction.java | 63 +++ .../api/function/aggregation/MinFunction.java | 63 +++ .../api/function/aggregation/SumFunction.java | 85 ++++ .../pcj/fluo/app/AggregationResultUpdater.java | 410 +------------------ .../fluo/app/observers/AggregationObserver.java | 4 +- .../pcj/fluo/app/query/AggregationMetadata.java | 130 +----- .../pcj/fluo/app/query/FluoQueryColumns.java | 2 +- .../fluo/app/query/FluoQueryMetadataDAO.java | 11 +- .../fluo/app/query/SparqlFluoQueryBuilder.java | 255 ++++++------ .../fluo/app/query/FluoQueryMetadataDAOIT.java | 66 +-- 17 files changed, 994 insertions(+), 677 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationElement.java ---------------------------------------------------------------------- diff --git 
a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationElement.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationElement.java new file mode 100644 index 0000000..3112059 --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationElement.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.aggregation; + +import static java.util.Objects.requireNonNull; + +import java.io.Serializable; +import java.util.Objects; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Represents all of the metadata require to perform an Aggregation that is part of a SPARQL query. + * </p> + * For example, if you have the following in SPARQL: + * <pre> + * SELECT (avg(?price) as ?avgPrice) { + * ... 
+ * } + * </pre> + * You would construct an instance of this object like so: + * <pre> + * new AggregationElement(AggregationType.AVERAGE, "price", "avgPrice"); + * </pre> + */ +@DefaultAnnotation(NonNull.class) +public final class AggregationElement implements Serializable { + private static final long serialVersionUID = 1L; + + private final AggregationType aggregationType; + private final String aggregatedBindingName; + private final String resultBindingName; + + /** + * Constructs an instance of {@link AggregationElement}. + * + * @param aggregationType - Defines how the binding values will be aggregated. (not null) + * @param aggregatedBindingName - The name of the binding whose values is aggregated. This binding must + * appear within the child node's emitted binding sets. (not null) + * @param resultBindingName - The name of the binding this aggregation's results are written to. This binding + * must appeared within the AggregationMetadata's variable order. (not null) + */ + public AggregationElement( + final AggregationType aggregationType, + final String aggregatedBindingName, + final String resultBindingName) { + this.aggregationType = requireNonNull(aggregationType); + this.aggregatedBindingName = requireNonNull(aggregatedBindingName); + this.resultBindingName = requireNonNull(resultBindingName); + } + + /** + * @return Defines how the binding values will be aggregated. + */ + public AggregationType getAggregationType() { + return aggregationType; + } + + /** + * @return The name of the binding whose values is aggregated. This binding must appear within the child node's emitted binding sets. + */ + public String getAggregatedBindingName() { + return aggregatedBindingName; + } + + /** + * @return The name of the binding this aggregation's results are written to. This binding must appeared within the AggregationMetadata's variable order. 
+ */ + public String getResultBindingName() { + return resultBindingName; + } + + @Override + public int hashCode() { + return Objects.hash(aggregationType, aggregatedBindingName, resultBindingName); + } + + @Override + public boolean equals(final Object o ) { + if(o instanceof AggregationElement) { + final AggregationElement agg = (AggregationElement) o; + return Objects.equals(aggregationType, agg.aggregationType) && + Objects.equals(aggregatedBindingName, agg.aggregatedBindingName) && + Objects.equals(resultBindingName, agg.resultBindingName); + } + return false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationFunction.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationFunction.java new file mode 100644 index 0000000..e8c49e7 --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationFunction.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.aggregation; + +import org.apache.rya.api.model.VisibilityBindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A function that updates an {@link AggregationState}. + */ +@DefaultAnnotation(NonNull.class) +public interface AggregationFunction { + + /** + * Updates an {@link AggregationState} based on the values of a child Binding Set. + * + * @param aggregation - Defines which function needs to be performed as well as any details required + * to do the aggregation work. (not null) + * @param state - The state that will be updated. (not null) + * @param childBindingSet - The Binding Set whose values will be used to update the state. + */ + public void update(AggregationElement aggregation, AggregationState state, VisibilityBindingSet childBindingSet); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationState.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationState.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationState.java new file mode 100644 index 0000000..2551696 --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationState.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.aggregation; + +import static java.util.Objects.requireNonNull; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import org.openrdf.query.impl.MapBindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Keeps track information required to update and build the resulting Binding Set for a set of Group By values. + */ +@DefaultAnnotation(NonNull.class) +public final class AggregationState implements Serializable { + private static final long serialVersionUID = 1L; + + // The visibility equation that encompasses all data the aggregation state is derived from. + private String visibility; + + // A binding set that holds the current state of the aggregations. + private final MapBindingSet bindingSet; + + // A map from result binding name to the state that derived that binding's value. + private final Map<String, AverageState> avgStates; + + /** + * Constructs an instance of {@link AggregationState}. + */ + public AggregationState() { + this.visibility = ""; + this.bindingSet = new MapBindingSet(); + this.avgStates = new HashMap<>(); + } + + /** + * Constructs an instance of {@link AggregationState}. + * + * @param visibility - The visibility equation associated with the resulting binding set. 
(not null) + * @param bindingSet - The Binding Set whose values are being updated. It holds the result for a set of + * Group By values. (not null) + * @param avgStates - If the aggregation is doing an Average, this is a map from result binding name to + * average state for that binding. + */ + public AggregationState( + final String visibility, + final MapBindingSet bindingSet, + final Map<String, AverageState> avgStates) { + this.visibility = requireNonNull(visibility); + this.bindingSet = requireNonNull(bindingSet); + this.avgStates = requireNonNull(avgStates); + } + + /** + * @return The visibility equation associated with the resulting binding set. + */ + public String getVisibility() { + return visibility; + } + + /** + * @param visibility - The visibility equation associated with the resulting binding set. + */ + public void setVisibility(final String visibility) { + this.visibility = requireNonNull(visibility); + } + + /** + * @return The Binding Set whose values are being updated. It holds the result for a set of Group By values. + */ + public MapBindingSet getBindingSet() { + return bindingSet; + } + + /** + * @return If the aggregation is doing an Average, this is a map from result binding name to + * average state for that binding. 
+ */ + public Map<String, AverageState> getAverageStates() { + return avgStates; + } + + @Override + public int hashCode() { + return Objects.hash(visibility, bindingSet, avgStates); + } + + @Override + public boolean equals(final Object o) { + if(o instanceof AggregationState) { + final AggregationState state = (AggregationState) o; + return Objects.equals(visibility, state.visibility) && + Objects.equals(bindingSet, state.bindingSet) && + Objects.equals(avgStates, state.avgStates); + } + return false; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationType.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationType.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationType.java new file mode 100644 index 0000000..5383da1 --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AggregationType.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.function.aggregation; + +import static java.util.Objects.requireNonNull; + +import java.util.Optional; + +import org.openrdf.query.algebra.AggregateOperator; +import org.openrdf.query.algebra.Avg; +import org.openrdf.query.algebra.Count; +import org.openrdf.query.algebra.Max; +import org.openrdf.query.algebra.Min; +import org.openrdf.query.algebra.Sum; + +import com.google.common.collect.ImmutableMap; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * The different types of Aggregation functions that an aggregate node may perform. + */ +@DefaultAnnotation(NonNull.class) +public enum AggregationType { + MIN(Min.class), + MAX(Max.class), + COUNT(Count.class), + SUM(Sum.class), + AVERAGE(Avg.class); + + private final Class<? extends AggregateOperator> operatorClass; + + private AggregationType(final Class<? extends AggregateOperator> operatorClass) { + this.operatorClass = requireNonNull(operatorClass); + } + + private static final ImmutableMap<Class<? extends AggregateOperator>, AggregationType> byOperatorClass; + static { + final ImmutableMap.Builder<Class<? extends AggregateOperator>, AggregationType> builder = ImmutableMap.builder(); + for(final AggregationType type : AggregationType.values()) { + builder.put(type.operatorClass, type); + } + byOperatorClass = builder.build(); + } + + public static Optional<AggregationType> byOperatorClass(final Class<? 
extends AggregateOperator> operatorClass) { + return Optional.ofNullable( byOperatorClass.get(operatorClass) ); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java new file mode 100644 index 0000000..a73d5ac --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageFunction.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.function.aggregation; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Map; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.openrdf.model.Literal; +import org.openrdf.model.Value; +import org.openrdf.model.datatypes.XMLDatatypeUtil; +import org.openrdf.model.impl.DecimalLiteralImpl; +import org.openrdf.model.impl.IntegerLiteralImpl; +import org.openrdf.query.algebra.MathExpr.MathOp; +import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException; +import org.openrdf.query.algebra.evaluation.util.MathUtil; +import org.openrdf.query.impl.MapBindingSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Update the {@link AggregationState}'s average if the child Binding Set contains the binding name + * that is being averaged by the {@link AggregationElement}. + */ +@DefaultAnnotation(NonNull.class) +public final class AverageFunction implements AggregationFunction { + private static final Logger log = LoggerFactory.getLogger(AverageFunction.class); + + @Override + public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { + checkArgument(aggregation.getAggregationType() == AggregationType.AVERAGE, "The AverageFunction only accepts AVERAGE AggregationElements."); + + // Only update the average if the child contains the binding that we are averaging. + final String aggregatedName = aggregation.getAggregatedBindingName(); + if(childBindingSet.hasBinding(aggregatedName)) { + final MapBindingSet result = state.getBindingSet(); + final String resultName = aggregation.getResultBindingName(); + final boolean newBinding = !result.hasBinding(resultName); + + // Get the state of the average. 
+ final Map<String, AverageState> averageStates = state.getAverageStates(); + AverageState averageState = newBinding ? new AverageState() : averageStates.get(resultName); + + // Update the state of the average. + final Value childValue = childBindingSet.getValue(aggregatedName); + if(childValue instanceof Literal) { + final Literal childLiteral = (Literal) childValue; + if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) { + try { + // Update the sum. + final Literal oldSum = new DecimalLiteralImpl(averageState.getSum()); + final BigDecimal sum = MathUtil.compute(oldSum, childLiteral, MathOp.PLUS).decimalValue(); + + // Update the count. + final BigInteger count = averageState.getCount().add( BigInteger.ONE ); + + // Update the BindingSet to include the new average. + final Literal sumLiteral = new DecimalLiteralImpl(sum); + final Literal countLiteral = new IntegerLiteralImpl(count); + final Literal average = MathUtil.compute(sumLiteral, countLiteral, MathOp.DIVIDE); + result.addBinding(resultName, average); + + // Update the average state that is stored. + averageState = new AverageState(sum, count); + averageStates.put(resultName, averageState); + } catch (final ValueExprEvaluationException e) { + log.error("A problem was encountered while updating an Average Aggregation. 
This binding set will be ignored: " + childBindingSet); + return; + } + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageState.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageState.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageState.java new file mode 100644 index 0000000..8917751 --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/AverageState.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.aggregation; + +import static java.util.Objects.requireNonNull; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Objects; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * The Sum and Count of the values that are being averaged. 
The average itself is derived from these values. + */ +@DefaultAnnotation(NonNull.class) +public class AverageState implements Serializable { + private static final long serialVersionUID = 1L; + + private final BigDecimal sum; + private final BigInteger count; + + /** + * Constructs an instance of {@link AverageState} where the count and sum start at 0. + */ + public AverageState() { + sum = BigDecimal.ZERO; + count = BigInteger.ZERO; + } + + /** + * Constructs an instance of {@link AverageState}. + * + * @param sum - The sum of the values that are averaged. (not null) + * @param count - The number of values that are averaged. (not null) + */ + public AverageState(final BigDecimal sum, final BigInteger count) { + this.sum = requireNonNull(sum); + this.count = requireNonNull(count); + } + + /** + * @return The sum of the values that are averaged. + */ + public BigDecimal getSum() { + return sum; + } + + /** + * @return The number of values that are averaged. + */ + public BigInteger getCount() { + return count; + } + + @Override + public int hashCode() { + return Objects.hash(sum, count); + } + + @Override + public boolean equals(final Object o) { + if(o instanceof AverageState) { + final AverageState state = (AverageState) o; + return Objects.equals(sum, state.sum) && + Objects.equals(count, state.count); + } + return false; + } + + @Override + public String toString() { + return "Sum: " + sum + " Count: " + count; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java new file mode 100644 index 0000000..7dd5b21 --- /dev/null +++ 
b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/CountFunction.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.aggregation; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.math.BigInteger; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.openrdf.model.Literal; +import org.openrdf.model.impl.IntegerLiteralImpl; +import org.openrdf.query.impl.MapBindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Increments the {@link AggregationState}'s count if the child Binding Set contains the binding name + * that is being counted by the {@link AggregationElement}. 
+ */ +@DefaultAnnotation(NonNull.class) +public final class CountFunction implements AggregationFunction { + @Override + public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { + checkArgument(aggregation.getAggregationType() == AggregationType.COUNT, "The CountFunction only accepts COUNT AggregationElements."); + + // Only add one to the count if the child contains the binding that we are counting. + final String aggregatedName = aggregation.getAggregatedBindingName(); + if(childBindingSet.hasBinding(aggregatedName)) { + final MapBindingSet result = state.getBindingSet(); + final String resultName = aggregation.getResultBindingName(); + final boolean newBinding = !result.hasBinding(resultName); + + if(newBinding) { + // Initialize the binding. + result.addBinding(resultName, new IntegerLiteralImpl(BigInteger.ONE)); + } else { + // Update the existing binding. + final Literal count = (Literal) result.getValue(resultName); + final BigInteger updatedCount = count.integerValue().add( BigInteger.ONE ); + result.addBinding(resultName, new IntegerLiteralImpl(updatedCount)); + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java new file mode 100644 index 0000000..3295fbb --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MaxFunction.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.aggregation; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.openrdf.model.Value; +import org.openrdf.query.algebra.evaluation.util.ValueComparator; +import org.openrdf.query.impl.MapBindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Update the {@link AggregationState}'s max if the child binding Set contains the binding name that is being + * maxed by the {@link AggregationElement}. + */ +@DefaultAnnotation(NonNull.class) +public final class MaxFunction implements AggregationFunction { + + private final ValueComparator compare = new ValueComparator(); + + @Override + public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { + checkArgument(aggregation.getAggregationType() == AggregationType.MAX, "The MaxFunction only accepts MAX AggregationElements."); + + // Only update the max if the child contains the binding that we are finding the max value for. 
+ final String aggregatedName = aggregation.getAggregatedBindingName(); + if(childBindingSet.hasBinding(aggregatedName)) { + final MapBindingSet result = state.getBindingSet(); + final String resultName = aggregation.getResultBindingName(); + final boolean newBinding = !result.hasBinding(resultName); + + Value max; + if(newBinding) { + max = childBindingSet.getValue(aggregatedName); + } else { + final Value oldMax = result.getValue(resultName); + final Value childMax = childBindingSet.getValue(aggregatedName); + max = compare.compare(childMax, oldMax) > 0 ? childMax : oldMax; + } + + result.addBinding(resultName, max); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java new file mode 100644 index 0000000..d6bf751 --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/MinFunction.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.function.aggregation; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.openrdf.model.Value; +import org.openrdf.query.algebra.evaluation.util.ValueComparator; +import org.openrdf.query.impl.MapBindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Update the {@link AggregationState}'s min if the child Binding Set contains the binding name whose minimum + * is computed by the {@link AggregationElement}. + */ +@DefaultAnnotation(NonNull.class) +public final class MinFunction implements AggregationFunction { + + private final ValueComparator compare = new ValueComparator(); + + @Override + public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { + checkArgument(aggregation.getAggregationType() == AggregationType.MIN, "The MinFunction only accepts MIN AggregationElements."); + + // Only update the min if the child contains the binding that we are finding the min value for. + final String aggregatedName = aggregation.getAggregatedBindingName(); + if(childBindingSet.hasBinding(aggregatedName)) { + final MapBindingSet result = state.getBindingSet(); + final String resultName = aggregation.getResultBindingName(); + final boolean newBinding = !result.hasBinding(resultName); + + Value min; + if(newBinding) { + min = childBindingSet.getValue(aggregatedName); + } else { + final Value oldMin = result.getValue(resultName); + final Value childMin = childBindingSet.getValue(aggregatedName); + min = compare.compare(childMin, oldMin) < 0 ?
childMin : oldMin; + } + + result.addBinding(resultName, min); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java ---------------------------------------------------------------------- diff --git a/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java new file mode 100644 index 0000000..97735f2 --- /dev/null +++ b/common/rya.api.function/src/main/java/org/apache/rya/api/function/aggregation/SumFunction.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.rya.api.function.aggregation; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.math.BigInteger; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.openrdf.model.Literal; +import org.openrdf.model.Value; +import org.openrdf.model.datatypes.XMLDatatypeUtil; +import org.openrdf.model.impl.IntegerLiteralImpl; +import org.openrdf.query.algebra.MathExpr.MathOp; +import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException; +import org.openrdf.query.algebra.evaluation.util.MathUtil; +import org.openrdf.query.impl.MapBindingSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Add to the {@link AggregationState}'s sum if the child Binding Set contains the binding name + * that is being summed by the {@link AggregationElement}. + */ +@DefaultAnnotation(NonNull.class) +public final class SumFunction implements AggregationFunction { + private static final Logger log = LoggerFactory.getLogger(SumFunction.class); + + @Override + public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { + checkArgument(aggregation.getAggregationType() == AggregationType.SUM, "The SumFunction only accepts SUM AggregationElements."); + + // Only add values to the sum if the child contains the binding that we are summing. + final String aggregatedName = aggregation.getAggregatedBindingName(); + if(childBindingSet.hasBinding(aggregatedName)) { + final MapBindingSet result = state.getBindingSet(); + final String resultName = aggregation.getResultBindingName(); + final boolean newBinding = !result.hasBinding(resultName); + + // Get the starting number for the sum.
+ Literal sum; + if(newBinding) { + sum = new IntegerLiteralImpl(BigInteger.ZERO); + } else { + sum = (Literal) state.getBindingSet().getValue(resultName); + } + + // Add the child binding set's value if it is a numeric literal. + final Value childValue = childBindingSet.getValue(aggregatedName); + if(childValue instanceof Literal) { + final Literal childLiteral = (Literal) childValue; + if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) { + try { + sum = MathUtil.compute(sum, childLiteral, MathOp.PLUS); + } catch (final ValueExprEvaluationException e) { + log.error("A problem was encountered while updating a Sum Aggregation. This binding set will be ignored: " + childBindingSet, e); + return; + } + } + } + + // Update the state to include the new sum. + result.addBinding(resultName, sum); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java index bb96a6a..4fbaad9 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/AggregationResultUpdater.java @@ -18,19 +18,12 @@ */ package org.apache.rya.indexing.pcj.fluo.app; -import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.ObjectOutputStream; -import
java.io.Serializable; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; import java.util.Optional; import org.apache.commons.io.serialization.ValidatingObjectInputStream; @@ -38,22 +31,21 @@ import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.log4j.Logger; import org.apache.rya.accumulo.utils.VisibilitySimplifier; +import org.apache.rya.api.function.aggregation.AggregationElement; +import org.apache.rya.api.function.aggregation.AggregationFunction; +import org.apache.rya.api.function.aggregation.AggregationState; +import org.apache.rya.api.function.aggregation.AggregationType; +import org.apache.rya.api.function.aggregation.AverageFunction; +import org.apache.rya.api.function.aggregation.AverageState; +import org.apache.rya.api.function.aggregation.CountFunction; +import org.apache.rya.api.function.aggregation.MaxFunction; +import org.apache.rya.api.function.aggregation.MinFunction; +import org.apache.rya.api.function.aggregation.SumFunction; import org.apache.rya.api.log.LogUtils; import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; -import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; -import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.openrdf.model.Literal; -import org.openrdf.model.Value; -import org.openrdf.model.datatypes.XMLDatatypeUtil; -import org.openrdf.model.impl.DecimalLiteralImpl; -import org.openrdf.model.impl.IntegerLiteralImpl; -import org.openrdf.query.algebra.MathExpr.MathOp; -import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException; -import org.openrdf.query.algebra.evaluation.util.MathUtil; -import 
org.openrdf.query.algebra.evaluation.util.ValueComparator; import org.openrdf.query.impl.MapBindingSet; import com.google.common.collect.ImmutableMap; @@ -154,213 +146,6 @@ public class AggregationResultUpdater extends AbstractNodeUpdater { } /** - * A function that updates an {@link AggregationState}. - */ - public static interface AggregationFunction { - - /** - * Updates an {@link AggregationState} based on the values of a child Binding Set. - * - * @param aggregation - Defines which function needs to be performed as well as any details required - * to do the aggregation work. (not null) - * @param state - The state that will be updated. (not null) - * @param childBindingSet - The Binding Set whose values will be used to update the state. - */ - public void update(AggregationElement aggregation, AggregationState state, VisibilityBindingSet childBindingSet); - } - - /** - * Increments the {@link AggregationState}'s count if the child Binding Set contains the binding name - * that is being counted by the {@link AggregationElement}. - */ - public static final class CountFunction implements AggregationFunction { - @Override - public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { - checkArgument(aggregation.getAggregationType() == AggregationType.COUNT, "The CountFunction only accepts COUNT AggregationElements."); - - // Only add one to the count if the child contains the binding that we are counting. - final String aggregatedName = aggregation.getAggregatedBindingName(); - if(childBindingSet.hasBinding(aggregatedName)) { - final MapBindingSet result = state.getBindingSet(); - final String resultName = aggregation.getResultBindingName(); - final boolean newBinding = !result.hasBinding(resultName); - - if(newBinding) { - // Initialize the binding. - result.addBinding(resultName, new IntegerLiteralImpl(BigInteger.ONE)); - } else { - // Update the existing binding. 
- final Literal count = (Literal) result.getValue(resultName); - final BigInteger updatedCount = count.integerValue().add( BigInteger.ONE ); - result.addBinding(resultName, new IntegerLiteralImpl(updatedCount)); - } - } - } - } - - /** - * Add to the {@link AggregationState}'s sum if the child Binding Set contains the binding name - * that is being summed by the {@link AggregationElement}. - */ - public static final class SumFunction implements AggregationFunction { - @Override - public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { - checkArgument(aggregation.getAggregationType() == AggregationType.SUM, "The SumFunction only accepts SUM AggregationElements."); - - // Only add values to the sum if the child contains the binding that we are summing. - final String aggregatedName = aggregation.getAggregatedBindingName(); - if(childBindingSet.hasBinding(aggregatedName)) { - final MapBindingSet result = state.getBindingSet(); - final String resultName = aggregation.getResultBindingName(); - final boolean newBinding = !result.hasBinding(resultName); - - // Get the starting number for the sum. - Literal sum; - if(newBinding) { - sum = new IntegerLiteralImpl(BigInteger.ZERO); - } else { - sum = (Literal) state.getBindingSet().getValue(resultName); - } - - // Add the child binding set's value if it is a numeric literal. - final Value childValue = childBindingSet.getValue(aggregatedName); - if(childValue instanceof Literal) { - final Literal childLiteral = (Literal) childValue; - if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) { - try { - sum = MathUtil.compute(sum, childLiteral, MathOp.PLUS); - } catch (final ValueExprEvaluationException e) { - log.error("A problem was encountered while updating a Sum Aggregation. This binding set will be ignored: " + childBindingSet); - return; - } - } - } - - // Update the state to include the new sum. 
- result.addBinding(resultName, sum); - } - } - } - - /** - * Update the {@link AggregationState}'s average if the child Binding Set contains the binding name - * that is being averaged by the {@link AggregationElement}. - */ - public static final class AverageFunction implements AggregationFunction { - @Override - public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { - checkArgument(aggregation.getAggregationType() == AggregationType.AVERAGE, "The AverageFunction only accepts AVERAGE AggregationElements."); - - // Only update the average if the child contains the binding that we are averaging. - final String aggregatedName = aggregation.getAggregatedBindingName(); - if(childBindingSet.hasBinding(aggregatedName)) { - final MapBindingSet result = state.getBindingSet(); - final String resultName = aggregation.getResultBindingName(); - final boolean newBinding = !result.hasBinding(resultName); - - // Get the state of the average. - final Map<String, AverageState> averageStates = state.getAverageStates(); - AverageState averageState = newBinding ? new AverageState() : averageStates.get(resultName); - - // Update the state of the average. - final Value childValue = childBindingSet.getValue(aggregatedName); - if(childValue instanceof Literal) { - final Literal childLiteral = (Literal) childValue; - if (childLiteral.getDatatype() != null && XMLDatatypeUtil.isNumericDatatype(childLiteral.getDatatype())) { - try { - // Update the sum. - final Literal oldSum = new DecimalLiteralImpl(averageState.getSum()); - final BigDecimal sum = MathUtil.compute(oldSum, childLiteral, MathOp.PLUS).decimalValue(); - - // Update the count. - final BigInteger count = averageState.getCount().add( BigInteger.ONE ); - - // Update the BindingSet to include the new average. 
- final Literal sumLiteral = new DecimalLiteralImpl(sum); - final Literal countLiteral = new IntegerLiteralImpl(count); - final Literal average = MathUtil.compute(sumLiteral, countLiteral, MathOp.DIVIDE); - result.addBinding(resultName, average); - - // Update the average state that is stored. - averageState = new AverageState(sum, count); - averageStates.put(resultName, averageState); - } catch (final ValueExprEvaluationException e) { - log.error("A problem was encountered while updating an Average Aggregation. This binding set will be ignored: " + childBindingSet); - return; - } - } - } - } - } - } - - /** - * Update the {@link AggregationState}'s max if the child binding Set contains the binding name that is being - * maxed by the {@link AggregationElement}. - */ - public static final class MaxFunction implements AggregationFunction { - - private final ValueComparator compare = new ValueComparator(); - - @Override - public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { - checkArgument(aggregation.getAggregationType() == AggregationType.MAX, "The MaxFunction only accepts MAX AggregationElements."); - - // Only update the max if the child contains the binding that we are finding the max value for. - final String aggregatedName = aggregation.getAggregatedBindingName(); - if(childBindingSet.hasBinding(aggregatedName)) { - final MapBindingSet result = state.getBindingSet(); - final String resultName = aggregation.getResultBindingName(); - final boolean newBinding = !result.hasBinding(resultName); - - Value max; - if(newBinding) { - max = childBindingSet.getValue(aggregatedName); - } else { - final Value oldMax = result.getValue(resultName); - final Value childMax = childBindingSet.getValue(aggregatedName); - max = compare.compare(childMax, oldMax) > 0 ? 
childMax : oldMax; - } - - result.addBinding(resultName, max); - } - } - } - - /** - * Update the {@link AggregationState}'s min if the child binding Set contains the binding name that is being - * mined by the {@link AggregationElement}. - */ - public static final class MinFunction implements AggregationFunction { - - private final ValueComparator compare = new ValueComparator(); - - @Override - public void update(final AggregationElement aggregation, final AggregationState state, final VisibilityBindingSet childBindingSet) { - checkArgument(aggregation.getAggregationType() == AggregationType.MIN, "The MinFunction only accepts MIN AggregationElements."); - - // Only update the min if the child contains the binding that we are finding the min value for. - final String aggregatedName = aggregation.getAggregatedBindingName(); - if(childBindingSet.hasBinding(aggregatedName)) { - final MapBindingSet result = state.getBindingSet(); - final String resultName = aggregation.getResultBindingName(); - final boolean newBinding = !result.hasBinding(resultName); - - Value min; - if(newBinding) { - min = childBindingSet.getValue(aggregatedName); - } else { - final Value oldMin = result.getValue(resultName); - final Value chidlMin = childBindingSet.getValue(aggregatedName); - min = compare.compare(chidlMin, oldMin) < 0 ? chidlMin : oldMin; - } - - result.addBinding(resultName, min); - } - } - } - - /** * Reads/Writes instances of {@link AggregationState} to/from bytes. */ public static interface AggregationStateSerDe { @@ -410,18 +195,18 @@ public class AggregationResultUpdater extends AbstractNodeUpdater { // System.out.println("vois.accept(" + className + ".class, ");};}; ) { // These classes are allowed to be deserialized. Others throw InvalidClassException. 
- vois.accept(org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState.class, // - org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AverageState.class, // - java.util.HashMap.class, // - java.math.BigInteger.class, // - java.lang.Number.class, // - java.math.BigDecimal.class, // - org.openrdf.query.impl.MapBindingSet.class, // - java.util.LinkedHashMap.class, // - org.openrdf.query.impl.BindingImpl.class, // - org.openrdf.model.impl.URIImpl.class, // - org.openrdf.model.impl.LiteralImpl.class, // - org.openrdf.model.impl.DecimalLiteralImpl.class, // + vois.accept(AggregationState.class, + AverageState.class, + java.util.HashMap.class, + java.math.BigInteger.class, + java.lang.Number.class, + java.math.BigDecimal.class, + org.openrdf.query.impl.MapBindingSet.class, + java.util.LinkedHashMap.class, + org.openrdf.query.impl.BindingImpl.class, + org.openrdf.model.impl.URIImpl.class, + org.openrdf.model.impl.LiteralImpl.class, + org.openrdf.model.impl.DecimalLiteralImpl.class, org.openrdf.model.impl.IntegerLiteralImpl.class); vois.accept("[B"); // Array of Bytes final Object o = vois.readObject(); @@ -437,155 +222,4 @@ public class AggregationResultUpdater extends AbstractNodeUpdater { return state; } } - - /** - * Keeps track information required to update and build the resulting Binding Set for a set of Group By values. - */ - public static final class AggregationState implements Serializable { - private static final long serialVersionUID = 1L; - - // The visibility equation that encompasses all data the aggregation state is derived from. - private String visibility; - - // A binding set that holds the current state of the aggregations. - private final MapBindingSet bindingSet; - - // A map from result binding name to the state that derived that binding's value. - private final Map<String, AverageState> avgStates; - - /** - * Constructs an instance of {@link AggregationState}. 
- */ - public AggregationState() { - this.visibility = ""; - this.bindingSet = new MapBindingSet(); - this.avgStates = new HashMap<>(); - } - - /** - * Constructs an instance of {@link AggregationState}. - * - * @param visibility - The visibility equation associated with the resulting binding set. (not null) - * @param bindingSet - The Binding Set whose values are being updated. It holds the result for a set of - * Group By values. (not null) - * @param avgStates - If the aggregation is doing an Average, this is a map from result binding name to - * average state for that binding. - */ - public AggregationState( - final String visibility, - final MapBindingSet bindingSet, - final Map<String, AverageState> avgStates) { - this.visibility = requireNonNull(visibility); - this.bindingSet = requireNonNull(bindingSet); - this.avgStates = requireNonNull(avgStates); - } - - /** - * @return The visibility equation associated with the resulting binding set. - */ - public String getVisibility() { - return visibility; - } - - /** - * @param visibility - The visibility equation associated with the resulting binding set. - */ - public void setVisibility(final String visibility) { - this.visibility = requireNonNull(visibility); - } - - /** - * @return The Binding Set whose values are being updated. It holds the result for a set of Group By values. - */ - public MapBindingSet getBindingSet() { - return bindingSet; - } - - /** - * @return If the aggregation is doing an Average, this is a map from result binding name to - * average state for that binding. 
- */ - public Map<String, AverageState> getAverageStates() { - return avgStates; - } - - @Override - public int hashCode() { - return Objects.hash(visibility, bindingSet, avgStates); - } - - @Override - public boolean equals(final Object o) { - if(o instanceof AggregationState) { - final AggregationState state = (AggregationState) o; - return Objects.equals(visibility, state.visibility) && - Objects.equals(bindingSet, state.bindingSet) && - Objects.equals(avgStates, state.avgStates); - } - return false; - } - } - - /** - * The Sum and Count of the values that are being averaged. The average itself is derived from these values. - */ - public static class AverageState implements Serializable { - private static final long serialVersionUID = 1L; - - private final BigDecimal sum; - private final BigInteger count; - - /** - * Constructs an instance of {@link AverageState} where the count and sum start at 0. - */ - public AverageState() { - sum = BigDecimal.ZERO; - count = BigInteger.ZERO; - } - - /** - * Constructs an instance of {@link AverageState}. - * - * @param sum - The sum of the values that are averaged. (not null) - * @param count - The number of values that are averaged. (not null) - */ - public AverageState(final BigDecimal sum, final BigInteger count) { - this.sum = requireNonNull(sum); - this.count = requireNonNull(count); - } - - /** - * @return The sum of the values that are averaged. - */ - public BigDecimal getSum() { - return sum; - } - - /** - * @return The number of values that are averaged. 
- */ - public BigInteger getCount() { - return count; - } - - @Override - public int hashCode() { - return Objects.hash(sum, count); - } - - @Override - public boolean equals(final Object o) { - if(o instanceof AverageState) { - final AverageState state = (AverageState) o; - return Objects.equals(sum, state.sum) && - Objects.equals(count, state.count); - } - return false; - } - - @Override - public String toString() { - return "Sum: " + sum + " Count: " + count; - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java index 0271519..e806b15 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/AggregationObserver.java @@ -23,10 +23,10 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.AG import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; -import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState; +import org.apache.rya.api.function.aggregation.AggregationState; +import org.apache.rya.api.model.VisibilityBindingSet; import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationStateSerDe; import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.ObjectSerializationAggregationStateSerDe; -import org.apache.rya.api.model.VisibilityBindingSet; import 
org.apache.rya.indexing.pcj.fluo.app.BindingSetRow; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java index eaa072f..a839645 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/AggregationMetadata.java @@ -20,23 +20,14 @@ package org.apache.rya.indexing.pcj.fluo.app.query; import static java.util.Objects.requireNonNull; -import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Objects; -import java.util.Optional; +import org.apache.rya.api.function.aggregation.AggregationElement; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.openrdf.query.algebra.AggregateOperator; -import org.openrdf.query.algebra.Avg; -import org.openrdf.query.algebra.Count; -import org.openrdf.query.algebra.Max; -import org.openrdf.query.algebra.Min; -import org.openrdf.query.algebra.Sum; - -import com.google.common.collect.ImmutableMap; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -50,115 +41,6 @@ import net.jcip.annotations.Immutable; @DefaultAnnotation(NonNull.class) public class AggregationMetadata extends CommonNodeMetadata { - /** - * The different types 
of Aggregation functions that an aggregate node may perform. - */ - public static enum AggregationType { - MIN(Min.class), - MAX(Max.class), - COUNT(Count.class), - SUM(Sum.class), - AVERAGE(Avg.class); - - private final Class<? extends AggregateOperator> operatorClass; - - private AggregationType(final Class<? extends AggregateOperator> operatorClass) { - this.operatorClass = requireNonNull(operatorClass); - } - - private static final ImmutableMap<Class<? extends AggregateOperator>, AggregationType> byOperatorClass; - static { - final ImmutableMap.Builder<Class<? extends AggregateOperator>, AggregationType> builder = ImmutableMap.builder(); - for(final AggregationType type : AggregationType.values()) { - builder.put(type.operatorClass, type); - } - byOperatorClass = builder.build(); - } - - public static Optional<AggregationType> byOperatorClass(final Class<? extends AggregateOperator> operatorClass) { - return Optional.ofNullable( byOperatorClass.get(operatorClass) ); - } - } - - /** - * Represents all of the metadata require to perform an Aggregation that is part of a SPARQL query. - * </p> - * For example, if you have the following in SPARQL: - * <pre> - * SELECT (avg(?price) as ?avgPrice) { - * ... - * } - * </pre> - * You would construct an instance of this object like so: - * <pre> - * new AggregationElement(AggregationType.AVERAGE, "price", "avgPrice"); - * </pre> - */ - @Immutable - @DefaultAnnotation(NonNull.class) - public static final class AggregationElement implements Serializable { - private static final long serialVersionUID = 1L; - - private final AggregationType aggregationType; - private final String aggregatedBindingName; - private final String resultBindingName; - - /** - * Constructs an instance of {@link AggregationElement}. - * - * @param aggregationType - Defines how the binding values will be aggregated. (not null) - * @param aggregatedBindingName - The name of the binding whose values is aggregated. 
This binding must - * appear within the child node's emitted binding sets. (not null) - * @param resultBindingName - The name of the binding this aggregation's results are written to. This binding - * must appeared within the AggregationMetadata's variable order. (not null) - */ - public AggregationElement( - final AggregationType aggregationType, - final String aggregatedBindingName, - final String resultBindingName) { - this.aggregationType = requireNonNull(aggregationType); - this.aggregatedBindingName = requireNonNull(aggregatedBindingName); - this.resultBindingName = requireNonNull(resultBindingName); - } - - /** - * @return Defines how the binding values will be aggregated. - */ - public AggregationType getAggregationType() { - return aggregationType; - } - - /** - * @return The name of the binding whose values is aggregated. This binding must appear within the child node's emitted binding sets. - */ - public String getAggregatedBindingName() { - return aggregatedBindingName; - } - - /** - * @return The name of the binding this aggregation's results are written to. This binding must appeared within the AggregationMetadata's variable order. - */ - public String getResultBindingName() { - return resultBindingName; - } - - @Override - public int hashCode() { - return Objects.hash(aggregationType, aggregatedBindingName, resultBindingName); - } - - @Override - public boolean equals(final Object o ) { - if(o instanceof AggregationElement) { - final AggregationElement agg = (AggregationElement) o; - return Objects.equals(aggregationType, agg.aggregationType) && - Objects.equals(aggregatedBindingName, agg.aggregatedBindingName) && - Objects.equals(resultBindingName, agg.resultBindingName); - } - return false; - } - } - private final String parentNodeId; private final String childNodeId; private final Collection<AggregationElement> aggregations; @@ -308,6 +190,7 @@ public class AggregationMetadata extends CommonNodeMetadata { /** * @return This node's Node ID. 
*/ + @Override public String getNodeId() { return nodeId; } @@ -321,10 +204,11 @@ public class AggregationMetadata extends CommonNodeMetadata { this.varOrder = varOrder; return this; } - + /** * @return the variable order of binding sets that are emitted by this node. */ + @Override public VariableOrder getVariableOrder() { return varOrder; } @@ -337,7 +221,7 @@ public class AggregationMetadata extends CommonNodeMetadata { this.parentNodeId = parentNodeId; return this; } - + public String getParentNodeId() { return parentNodeId; } @@ -350,7 +234,7 @@ public class AggregationMetadata extends CommonNodeMetadata { this.childNodeId = childNodeId; return this; } - + public String getChildNodeId() { return childNodeId; } @@ -375,7 +259,7 @@ public class AggregationMetadata extends CommonNodeMetadata { this.groupByVariables = groupByVariables; return this; } - + /** * @return variable order that defines how data is grouped for the aggregation function */ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java index 9d1c4fc..e043671 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java @@ -25,8 +25,8 @@ import java.util.List; import org.apache.fluo.api.data.Column; import org.apache.rya.api.client.CreatePCJ.QueryType; +import org.apache.rya.api.function.aggregation.AggregationState; import org.apache.rya.api.model.VisibilityBindingSet; -import 
org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8363724b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java index ba75a56..c193ef7 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java @@ -37,11 +37,12 @@ import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.rya.api.client.CreatePCJ.ExportStrategy; import org.apache.rya.api.client.CreatePCJ.QueryType; +import org.apache.rya.api.function.aggregation.AggregationElement; +import org.apache.rya.api.function.aggregation.AggregationType; import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph; import org.apache.rya.indexing.pcj.fluo.app.ConstructGraphSerializer; import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; import org.apache.rya.indexing.pcj.fluo.app.NodeType; -import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; @@ -567,10 +568,10 @@ public class FluoQueryMetadataDAO { // System.out.println("vois.accept(" + className + ".class, ");};}; ) { // These classes are allowed to be deserialized. 
Others throw InvalidClassException. - vois.accept(java.util.ArrayList.class, // - java.lang.Enum.class, // - org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement.class, // - org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType.class); + vois.accept(java.util.ArrayList.class, + java.lang.Enum.class, + AggregationElement.class, + AggregationType.class); final Object object = vois.readObject(); if (!(object instanceof Collection<?>)) { throw new InvalidClassException("Object read was not of type Collection. It was: " + object.getClass());