This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ea176d  [BEAM-10551] Implement Navigation Functions FIRST_VALUE and 
LAST_VALUE (#12313)
8ea176d is described below

commit 8ea176d23d8d13d2f9becb107178c36ef5269062
Author: jhnmora000 <48849676+jhnmora...@users.noreply.github.com>
AuthorDate: Wed Jul 22 00:48:02 2020 -0500

    [BEAM-10551] Implement Navigation Functions FIRST_VALUE and LAST_VALUE 
(#12313)
    
    * Implement navigation functions
    
    - FIRST_VALUE
    - LAST_VALUE
    
    Since navigation functions are executed sequentially,
    the mergeAccumulators implementation was replaced with
    an UnsupportedOperationException.
---
 .../sdk/extensions/sql/impl/rel/BeamWindowRel.java |   3 +-
 .../transform/BeamBuiltinAnalyticFunctions.java    |  65 +++++++++++-
 .../extensions/sql/BeamAnalyticFunctionsTest.java  | 113 +++++++++++++++++++++
 3 files changed, 179 insertions(+), 2 deletions(-)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
index 9d29dd4..09ca5b1 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamWindowRel.java
@@ -293,7 +293,8 @@ public class BeamWindowRel extends Window implements 
BeamRelNode {
           Object accumulator = fieldAgg.combineFn.createAccumulator();
           final int aggFieldIndex = fieldAgg.inputFields.get(0);
           for (Row aggRow : aggRange) {
-            fieldAgg.combineFn.addInput(accumulator, 
aggRow.getBaseValue(aggFieldIndex));
+            accumulator =
+                fieldAgg.combineFn.addInput(accumulator, 
aggRow.getBaseValue(aggFieldIndex));
           }
           Object result = fieldAgg.combineFn.extractOutput(accumulator);
           Row processingRow = sortedRowsAsList.get(idx);
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAnalyticFunctions.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAnalyticFunctions.java
index 086b0f2..14fe20d 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAnalyticFunctions.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAnalyticFunctions.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.extensions.sql.impl.transform;
 
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.transforms.Combine;
@@ -29,7 +30,8 @@ public class BeamBuiltinAnalyticFunctions {
       BUILTIN_ANALYTIC_FACTORIES =
           ImmutableMap.<String, Function<Schema.FieldType, 
Combine.CombineFn<?, ?, ?>>>builder()
               .putAll(BeamBuiltinAggregations.BUILTIN_AGGREGATOR_FACTORIES)
-              // Pending Navigation functions
+              .put("FIRST_VALUE", typeName -> navigationFirstValue())
+              .put("LAST_VALUE", typeName -> navigationLastValue())
               // Pending Numbering functions
               .build();
 
@@ -42,4 +44,65 @@ public class BeamBuiltinAnalyticFunctions {
     throw new UnsupportedOperationException(
         String.format("Analytics Function [%s] is not supported", 
functionName));
   }
+
+  public static <T> Combine.CombineFn<T, ?, T> navigationFirstValue() {
+    return new FirstValueCombineFn();
+  }
+
+  public static <T> Combine.CombineFn<T, ?, T> navigationLastValue() {
+    return new LastValueCombineFn();
+  }
+
+  private static class FirstValueCombineFn<T> extends Combine.CombineFn<T, 
Optional<T>, T> {
+    private FirstValueCombineFn() {}
+
+    @Override
+    public Optional<T> createAccumulator() {
+      return Optional.empty();
+    }
+
+    @Override
+    public Optional<T> addInput(Optional<T> accumulator, T input) {
+      Optional<T> r = accumulator;
+      if (!accumulator.isPresent()) {
+        r = Optional.of(input);
+      }
+      return r;
+    }
+
+    @Override
+    public Optional<T> mergeAccumulators(Iterable<Optional<T>> accumulators) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public T extractOutput(Optional<T> accumulator) {
+      return accumulator.isPresent() ? accumulator.get() : null;
+    }
+  }
+
+  private static class LastValueCombineFn<T> extends Combine.CombineFn<T, 
Optional<T>, T> {
+    private LastValueCombineFn() {}
+
+    @Override
+    public Optional<T> createAccumulator() {
+      return Optional.empty();
+    }
+
+    @Override
+    public Optional<T> addInput(Optional<T> accumulator, T input) {
+      Optional<T> r = Optional.of(input);
+      return r;
+    }
+
+    @Override
+    public Optional<T> mergeAccumulators(Iterable<Optional<T>> accumulators) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public T extractOutput(Optional<T> accumulator) {
+      return accumulator.isPresent() ? accumulator.get() : null;
+    }
+  }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsTest.java
index bfc53f3..2e06a0b 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamAnalyticFunctionsTest.java
@@ -360,4 +360,117 @@ public class BeamAnalyticFunctionsTest extends 
BeamSqlDslBase {
 
     pipeline.run();
   }
+
+  // Tests for new functions
+  @Test
+  public void testFirstValueFunction() throws Exception {
+    pipeline.enableAbandonedNodeEnforcement(false);
+    PCollection<Row> inputRows = inputData();
+    String sql =
+        "SELECT item, purchases, category, FIRST_VALUE(purchases) over "
+            + "("
+            + "PARTITION BY category "
+            + "ORDER BY purchases "
+            + "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"
+            + ")"
+            + " as total_purchases  FROM PCOLLECTION";
+    PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));
+
+    Schema overResultSchema =
+        Schema.builder()
+            .addStringField("item")
+            .addInt32Field("purchases")
+            .addStringField("category")
+            .addInt32Field("total_purchases")
+            .build();
+
+    List<Row> overResult =
+        TestUtils.RowsBuilder.of(overResultSchema)
+            .addRows(
+                "orange",
+                2,
+                "fruit",
+                2,
+                "apple",
+                8,
+                "fruit",
+                2,
+                "leek",
+                2,
+                "vegetable",
+                2,
+                "cabbage",
+                9,
+                "vegetable",
+                2,
+                "lettuce",
+                10,
+                "vegetable",
+                2,
+                "kale",
+                23,
+                "vegetable",
+                2)
+            .getRows();
+
+    PAssert.that(result).containsInAnyOrder(overResult);
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testLastValueFunction() throws Exception {
+    pipeline.enableAbandonedNodeEnforcement(false);
+    PCollection<Row> inputRows = inputData();
+    String sql =
+        "SELECT item, purchases, category, LAST_VALUE(purchases) over "
+            + "("
+            + "PARTITION BY category "
+            + "ORDER BY purchases "
+            + "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"
+            + ")"
+            + " as total_purchases  FROM PCOLLECTION";
+    PCollection<Row> result = inputRows.apply("sql", SqlTransform.query(sql));
+
+    Schema overResultSchema =
+        Schema.builder()
+            .addStringField("item")
+            .addInt32Field("purchases")
+            .addStringField("category")
+            .addInt32Field("total_purchases")
+            .build();
+
+    List<Row> overResult =
+        TestUtils.RowsBuilder.of(overResultSchema)
+            .addRows(
+                "orange",
+                2,
+                "fruit",
+                2,
+                "apple",
+                8,
+                "fruit",
+                8,
+                "leek",
+                2,
+                "vegetable",
+                2,
+                "cabbage",
+                9,
+                "vegetable",
+                9,
+                "lettuce",
+                10,
+                "vegetable",
+                10,
+                "kale",
+                23,
+                "vegetable",
+                23)
+            .getRows();
+
+    PAssert.that(result).containsInAnyOrder(overResult);
+
+    pipeline.run();
+  }
 }

Reply via email to