This is an automated email from the ASF dual-hosted git repository. amaliujia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 8ea176d [BEAM-10551] Implement Navigation Functions FIRST_VALUE and LAST_VALUE (#12313) 8ea176d is described below commit 8ea176d23d8d13d2f9becb107178c36ef5269062 Author: jhnmora000 <48849676+jhnmora...@users.noreply.github.com> AuthorDate: Wed Jul 22 00:48:02 2020 -0500 [BEAM-10551] Implement Navigation Functions FIRST_VALUE and LAST_VALUE (#12313) * Implement navigation functions - FIRST_VALUE - LAST_VALUE Since navigation functions are executed sequentially, the mergeAccumulators implementation was replaced with an UnsupportedOperationException --- .../sdk/extensions/sql/impl/rel/BeamWindowRel.java | 3 +- .../transform/BeamBuiltinAnalyticFunctions.java | 65 +++++++++++- .../extensions/sql/BeamAnalyticFunctionsTest.java | 113 +++++++++++++++++++++ 3 files changed, 179 insertions(+), 2 deletions(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java index 9d29dd4..09ca5b1 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java @@ -293,7 +293,8 @@ public class BeamWindowRel extends Window implements BeamRelNode { Object accumulator = fieldAgg.combineFn.createAccumulator(); final int aggFieldIndex = fieldAgg.inputFields.get(0); for (Row aggRow : aggRange) { - fieldAgg.combineFn.addInput(accumulator, aggRow.getBaseValue(aggFieldIndex)); + accumulator = + fieldAgg.combineFn.addInput(accumulator, aggRow.getBaseValue(aggFieldIndex)); } Object result = fieldAgg.combineFn.extractOutput(accumulator); Row processingRow = sortedRowsAsList.get(idx); diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAnalyticFunctions.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAnalyticFunctions.java index 086b0f2..14fe20d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAnalyticFunctions.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAnalyticFunctions.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.extensions.sql.impl.transform; import java.util.Map; +import java.util.Optional; import java.util.function.Function; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.Combine; @@ -29,7 +30,8 @@ public class BeamBuiltinAnalyticFunctions { BUILTIN_ANALYTIC_FACTORIES = ImmutableMap.<String, Function<Schema.FieldType, Combine.CombineFn<?, ?, ?>>>builder() .putAll(BeamBuiltinAggregations.BUILTIN_AGGREGATOR_FACTORIES) - // Pending Navigation functions + .put("FIRST_VALUE", typeName -> navigationFirstValue()) + .put("LAST_VALUE", typeName -> navigationLastValue()) // Pending Numbering functions .build(); @@ -42,4 +44,65 @@ public class BeamBuiltinAnalyticFunctions { throw new UnsupportedOperationException( String.format("Analytics Function [%s] is not supported", functionName)); } + + public static <T> Combine.CombineFn<T, ?, T> navigationFirstValue() { + return new FirstValueCombineFn(); + } + + public static <T> Combine.CombineFn<T, ?, T> navigationLastValue() { + return new LastValueCombineFn(); + } + + private static class FirstValueCombineFn<T> extends Combine.CombineFn<T, Optional<T>, T> { + private FirstValueCombineFn() {} + + @Override + public Optional<T> createAccumulator() { + return Optional.empty(); + } + + @Override + public Optional<T> addInput(Optional<T> accumulator, T input) { + Optional<T> r = accumulator; + if (!accumulator.isPresent()) { + r = 
Optional.of(input); + } + return r; + } + + @Override + public Optional<T> mergeAccumulators(Iterable<Optional<T>> accumulators) { + throw new UnsupportedOperationException(); + } + + @Override + public T extractOutput(Optional<T> accumulator) { + return accumulator.isPresent() ? accumulator.get() : null; + } + } + + private static class LastValueCombineFn<T> extends Combine.CombineFn<T, Optional<T>, T> { + private LastValueCombineFn() {} + + @Override + public Optional<T> createAccumulator() { + return Optional.empty(); + } + + @Override + public Optional<T> addInput(Optional<T> accumulator, T input) { + Optional<T> r = Optional.of(input); + return r; + } + + @Override + public Optional<T> mergeAccumulators(Iterable<Optional<T>> accumulators) { + throw new UnsupportedOperationException(); + } + + @Override + public T extractOutput(Optional<T> accumulator) { + return accumulator.isPresent() ? accumulator.get() : null; + } + } } diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsTest.java index bfc53f3..2e06a0b 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsTest.java @@ -360,4 +360,117 @@ public class BeamAnalyticFunctionsTest extends BeamSqlDslBase { pipeline.run(); } + + // Tests for new functions + @Test + public void testFirstValueFunction() throws Exception { + pipeline.enableAbandonedNodeEnforcement(false); + PCollection<Row> inputRows = inputData(); + String sql = + "SELECT item, purchases, category, FIRST_VALUE(purchases) over " + + "(" + + "PARTITION BY category " + + "ORDER BY purchases " + + "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" + + ")" + + " as total_purchases FROM PCOLLECTION"; + PCollection<Row> result = 
inputRows.apply("sql", SqlTransform.query(sql)); + + Schema overResultSchema = + Schema.builder() + .addStringField("item") + .addInt32Field("purchases") + .addStringField("category") + .addInt32Field("total_purchases") + .build(); + + List<Row> overResult = + TestUtils.RowsBuilder.of(overResultSchema) + .addRows( + "orange", + 2, + "fruit", + 2, + "apple", + 8, + "fruit", + 2, + "leek", + 2, + "vegetable", + 2, + "cabbage", + 9, + "vegetable", + 2, + "lettuce", + 10, + "vegetable", + 2, + "kale", + 23, + "vegetable", + 2) + .getRows(); + + PAssert.that(result).containsInAnyOrder(overResult); + + pipeline.run(); + } + + @Test + public void testLastValueFunction() throws Exception { + pipeline.enableAbandonedNodeEnforcement(false); + PCollection<Row> inputRows = inputData(); + String sql = + "SELECT item, purchases, category, LAST_VALUE(purchases) over " + + "(" + + "PARTITION BY category " + + "ORDER BY purchases " + + "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" + + ")" + + " as total_purchases FROM PCOLLECTION"; + PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql)); + + Schema overResultSchema = + Schema.builder() + .addStringField("item") + .addInt32Field("purchases") + .addStringField("category") + .addInt32Field("total_purchases") + .build(); + + List<Row> overResult = + TestUtils.RowsBuilder.of(overResultSchema) + .addRows( + "orange", + 2, + "fruit", + 2, + "apple", + 8, + "fruit", + 8, + "leek", + 2, + "vegetable", + 2, + "cabbage", + 9, + "vegetable", + 9, + "lettuce", + 10, + "vegetable", + 10, + "kale", + 23, + "vegetable", + 23) + .getRows(); + + PAssert.that(result).containsInAnyOrder(overResult); + + pipeline.run(); + } }