[ 
https://issues.apache.org/jira/browse/BEAM-4449?focusedWorklogId=145119&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145119
 ]

ASF GitHub Bot logged work on BEAM-4449:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Sep/18 23:17
            Start Date: 17/Sep/18 23:17
    Worklog Time Spent: 10m 
      Work Description: apilloud closed pull request #6382: [BEAM-4449] Finish 
splitting Aggregate and Project
URL: https://github.com/apache/beam/pull/6382
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index b65d85fb4fd..dec5217fa37 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -33,9 +33,6 @@
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlOperatorExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlPrimitive;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlUdfExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowEndExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowExpression;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.BeamSqlWindowStartExpression;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.DateOperators;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.StringOperators;
 import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.operator.arithmetic.BeamSqlDivideExpression;
@@ -504,21 +501,6 @@ private static BeamSqlExpression 
getBeamSqlExpression(RexNode rexNode) {
           ret = new BeamSqlIsNotNullExpression(subExps.get(0));
           break;
 
-        case "HOP":
-        case "TUMBLE":
-        case "SESSION":
-          ret = new BeamSqlWindowExpression(subExps, 
node.type.getSqlTypeName());
-          break;
-        case "HOP_START":
-        case "TUMBLE_START":
-        case "SESSION_START":
-          ret = new BeamSqlWindowStartExpression();
-          break;
-        case "HOP_END":
-        case "TUMBLE_END":
-        case "SESSION_END":
-          ret = new BeamSqlWindowEndExpression();
-          break;
         default:
           // handle UDF
           if (((RexCall) rexNode).getOperator() instanceof 
SqlUserDefinedFunction) {
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
deleted file mode 100644
index 9caa4158cf5..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowEndExpression.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
-
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.joda.time.DateTime;
-
-/**
- * {@code BeamSqlExpression} for {@code HOP_END}, {@code TUMBLE_END}, {@code 
SESSION_END} operation.
- *
- * <p>These operators returns the <em>end</em> timestamp of window.
- */
-public class BeamSqlWindowEndExpression extends BeamSqlExpression {
-
-  @Override
-  public boolean accept() {
-    return true;
-  }
-
-  @Override
-  public BeamSqlPrimitive<DateTime> evaluate(
-      Row inputRow, BoundedWindow window, BeamSqlExpressionEnvironment env) {
-    if (window instanceof IntervalWindow) {
-      return BeamSqlPrimitive.of(
-          SqlTypeName.TIMESTAMP, new DateTime(((IntervalWindow) 
window).end()));
-    } else {
-      throw new UnsupportedOperationException(
-          "Cannot run HOP_END|TUMBLE_END|SESSION_END on GlobalWindow.");
-    }
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
deleted file mode 100644
index 81061ae0216..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowExpression.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
-
-import java.util.List;
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.joda.time.ReadableInstant;
-
-/**
- * {@code BeamSqlExpression} for {@code HOP}, {@code TUMBLE}, {@code SESSION} 
operation.
- *
- * <p>These functions don't change the timestamp field, instead it's used to 
indicate the
- * event_timestamp field, and how the window is defined.
- */
-public class BeamSqlWindowExpression extends BeamSqlExpression {
-
-  public BeamSqlWindowExpression(List<BeamSqlExpression> operands, SqlTypeName 
outputType) {
-    super(operands, outputType);
-  }
-
-  @Override
-  public boolean accept() {
-    return operands.get(0).getOutputType().equals(SqlTypeName.DATE)
-        || operands.get(0).getOutputType().equals(SqlTypeName.TIME)
-        || operands.get(0).getOutputType().equals(SqlTypeName.TIMESTAMP);
-  }
-
-  @Override
-  public BeamSqlPrimitive<ReadableInstant> evaluate(
-      Row inputRow, BoundedWindow window, BeamSqlExpressionEnvironment env) {
-    return BeamSqlPrimitive.of(
-        SqlTypeName.TIMESTAMP,
-        (ReadableInstant) operands.get(0).evaluate(inputRow, window, 
env).getValue());
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
deleted file mode 100644
index 84566522aeb..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlWindowStartExpression.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.impl.interpreter.operator;
-
-import 
org.apache.beam.sdk.extensions.sql.impl.interpreter.BeamSqlExpressionEnvironment;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
-import org.apache.beam.sdk.values.Row;
-import org.apache.calcite.sql.type.SqlTypeName;
-import org.joda.time.DateTime;
-
-/**
- * {@code BeamSqlExpression} for {@code HOP_START}, {@code TUMBLE_START}, 
{@code SESSION_START}
- * operation.
- *
- * <p>These operators returns the <em>start</em> timestamp of window.
- */
-public class BeamSqlWindowStartExpression extends BeamSqlExpression {
-
-  @Override
-  public boolean accept() {
-    return true;
-  }
-
-  @Override
-  public BeamSqlPrimitive<DateTime> evaluate(
-      Row inputRow, BoundedWindow window, BeamSqlExpressionEnvironment env) {
-    if (window instanceof IntervalWindow) {
-      return BeamSqlPrimitive.of(
-          SqlTypeName.TIMESTAMP, new DateTime(((IntervalWindow) 
window).start()));
-    } else {
-      throw new UnsupportedOperationException(
-          "Cannot run HOP_START|TUMBLE_START|SESSION_START on GlobalWindow.");
-    }
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index 236e5fa5639..36b7225532e 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -22,9 +22,7 @@
 import static org.apache.beam.sdk.values.PCollection.IsBounded.BOUNDED;
 
 import java.util.List;
-import java.util.Optional;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.extensions.sql.impl.rule.AggregateWindowField;
 import 
org.apache.beam.sdk.extensions.sql.impl.transform.BeamAggregationTransforms;
 import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
 import org.apache.beam.sdk.schemas.Schema;
@@ -36,8 +34,13 @@
 import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.WithTimestamps;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -46,6 +49,7 @@
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.rel.type.RelDataType;
@@ -55,8 +59,8 @@
 
 /** {@link BeamRelNode} to replace a {@link Aggregate} node. */
 public class BeamAggregationRel extends Aggregate implements BeamRelNode {
+  private WindowFn<Row, IntervalWindow> windowFn;
   private final int windowFieldIndex;
-  private Optional<AggregateWindowField> windowField;
 
   public BeamAggregationRel(
       RelOptCluster cluster,
@@ -66,11 +70,45 @@ public BeamAggregationRel(
       ImmutableBitSet groupSet,
       List<ImmutableBitSet> groupSets,
       List<AggregateCall> aggCalls,
-      Optional<AggregateWindowField> windowField) {
+      WindowFn<Row, IntervalWindow> windowFn,
+      int windowFieldIndex) {
 
     super(cluster, traits, child, indicator, groupSet, groupSets, aggCalls);
-    this.windowField = windowField;
-    this.windowFieldIndex = 
windowField.map(AggregateWindowField::fieldIndex).orElse(-1);
+
+    this.windowFn = windowFn;
+    this.windowFieldIndex = windowFieldIndex;
+  }
+
+  @Override
+  public RelWriter explainTerms(RelWriter pw) {
+    super.explainTerms(pw);
+    if (this.windowFn != null) {
+      WindowFn windowFn = this.windowFn;
+      String window = windowFn.getClass().getSimpleName() + "($" + 
String.valueOf(windowFieldIndex);
+      if (windowFn instanceof FixedWindows) {
+        FixedWindows fn = (FixedWindows) windowFn;
+        window = window + ", " + fn.getSize().toString() + ", " + 
fn.getOffset().toString();
+      } else if (windowFn instanceof SlidingWindows) {
+        SlidingWindows fn = (SlidingWindows) windowFn;
+        window =
+            window
+                + ", "
+                + fn.getPeriod().toString()
+                + ", "
+                + fn.getSize().toString()
+                + ", "
+                + fn.getOffset().toString();
+      } else if (windowFn instanceof Sessions) {
+        Sessions fn = (Sessions) windowFn;
+        window = window + ", " + fn.getGapDuration().toString();
+      } else {
+        throw new RuntimeException(
+            "Unknown window function " + windowFn.getClass().getSimpleName());
+      }
+      window = window + ")";
+      pw.item("window", window);
+    }
+    return pw;
   }
 
   @Override
@@ -88,7 +126,8 @@ public BeamAggregationRel(
           BeamAggregationRel.class.getSimpleName(),
           pinput);
       PCollection<Row> upstream = pinput.get(0);
-      if (windowField.isPresent()) {
+      PCollection<Row> windowedStream = upstream;
+      if (windowFn != null) {
         upstream =
             upstream
                 .apply(
@@ -97,13 +136,9 @@ public BeamAggregationRel(
                             new 
BeamAggregationTransforms.WindowTimestampFn(windowFieldIndex))
                         .withAllowedTimestampSkew(new 
Duration(Long.MAX_VALUE)))
                 .setCoder(upstream.getCoder());
+        windowedStream = upstream.apply(Window.into(windowFn));
       }
 
-      PCollection<Row> windowedStream =
-          windowField.isPresent()
-              ? upstream.apply(Window.into(windowField.get().windowFn()))
-              : upstream;
-
       validateWindowIsSupported(windowedStream);
 
       Schema keySchema = exKeyFieldsSchema(input.getRowType());
@@ -196,6 +231,14 @@ public Aggregate copy(
       List<ImmutableBitSet> groupSets,
       List<AggregateCall> aggCalls) {
     return new BeamAggregationRel(
-        getCluster(), traitSet, input, indicator, groupSet, groupSets, 
aggCalls, windowField);
+        getCluster(),
+        traitSet,
+        input,
+        indicator,
+        groupSet,
+        groupSets,
+        aggCalls,
+        windowFn,
+        windowFieldIndex);
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
index 2d050acdb1f..bbd9194accb 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamCalcRel.java
@@ -32,7 +32,6 @@
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.Row;
@@ -112,12 +111,11 @@ public void setup() {
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c, BoundedWindow window) {
+    public void processElement(ProcessContext c) {
       Row inputRow = c.element();
       @Nullable
       List<Object> rawResultValues =
-          executor.execute(
-              inputRow, window, BeamSqlExpressionEnvironments.forRow(inputRow, 
window));
+          executor.execute(inputRow, null, 
BeamSqlExpressionEnvironments.forRow(inputRow, null));
 
       if (rawResultValues != null) {
         List<Object> castResultValues =
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/AggregateWindowFactory.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/AggregateWindowFactory.java
deleted file mode 100644
index 6629e1f46ef..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/AggregateWindowFactory.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.rule;
-
-import java.util.List;
-import java.util.Optional;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexLiteral;
-import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.sql.SqlKind;
-import org.joda.time.Duration;
-
-/** Creates {@link WindowFn} wrapper based on HOP/TUMBLE/SESSION call in a 
query. */
-class AggregateWindowFactory {
-
-  /**
-   * Returns optional of {@link AggregateWindowField} which represents a 
windowing function
-   * specified by HOP/TUMBLE/SESSION in the SQL query.
-   *
-   * <p>If no known windowing function is specified in the query, then {@link 
Optional#empty()} is
-   * returned.
-   *
-   * <p>Throws {@link UnsupportedOperationException} if it cannot convert SQL 
windowing function
-   * call to Beam model, see {@link #getWindowFieldAt(RexCall, int)} for 
details.
-   */
-  static Optional<AggregateWindowField> getWindowFieldAt(RexCall call, int 
groupField) {
-
-    Optional<WindowFn> windowFnOptional = createWindowFn(call.operands, 
call.op.kind);
-
-    return windowFnOptional.map(
-        windowFn ->
-            
AggregateWindowField.builder().setFieldIndex(groupField).setWindowFn(windowFn).build());
-  }
-
-  /**
-   * Returns a {@link WindowFn} based on the SQL windowing function defined by 
{#code operatorKind}.
-   * Supported {@link SqlKind}s:
-   *
-   * <ul>
-   *   <li>{@link SqlKind#TUMBLE}, mapped to {@link FixedWindows};
-   *   <li>{@link SqlKind#HOP}, mapped to {@link SlidingWindows};
-   *   <li>{@link SqlKind#SESSION}, mapped to {@link Sessions};
-   * </ul>
-   *
-   * <p>For example:
-   *
-   * <pre>{@code
-   * SELECT event_timestamp, COUNT(*)
-   * FROM PCOLLECTION
-   * GROUP BY TUMBLE(event_timestamp, INTERVAL '1' HOUR)
-   * }</pre>
-   *
-   * <p>SQL window functions support optional window_offset parameter which 
indicates a how window
-   * definition is offset from the event time. Offset is zero if not specified.
-   *
-   * <p>Beam model does not support offset for session windows, so this method 
will throw {@link
-   * UnsupportedOperationException} if offset is specified in SQL query for 
{@link SqlKind#SESSION}.
-   */
-  private static Optional<WindowFn> createWindowFn(List<RexNode> parameters, 
SqlKind operatorKind) {
-    switch (operatorKind) {
-      case TUMBLE:
-
-        // Fixed-size, non-intersecting time-based windows, for example:
-        //   every hour aggregate elements from the previous hour;
-        //
-        // SQL Syntax:
-        //   TUMBLE(monotonic_field, window_size [, window_offset])
-        //
-        // Example:
-        //   TUMBLE(event_timestamp_field, INTERVAL '1' HOUR)
-
-        FixedWindows fixedWindows = 
FixedWindows.of(durationParameter(parameters, 1));
-        if (parameters.size() == 3) {
-          fixedWindows = fixedWindows.withOffset(durationParameter(parameters, 
2));
-        }
-
-        return Optional.of(fixedWindows);
-      case HOP:
-
-        // Sliding, fixed-size, intersecting time-based windows, for example:
-        //   every minute aggregate elements from the previous hour;
-        //
-        // SQL Syntax:
-        //   HOP(monotonic_field, emit_frequency, window_size [, 
window_offset])
-        //
-        // Example:
-        //   HOP(event_timestamp_field, INTERVAL '1' MINUTE, INTERVAL '1' HOUR)
-
-        SlidingWindows slidingWindows =
-            SlidingWindows.of(durationParameter(parameters, 2))
-                .every(durationParameter(parameters, 1));
-
-        if (parameters.size() == 4) {
-          slidingWindows = 
slidingWindows.withOffset(durationParameter(parameters, 3));
-        }
-
-        return Optional.of(slidingWindows);
-      case SESSION:
-
-        // Session windows, for example:
-        //   aggregate events after a gap of 1 minute of no events;
-        //
-        // SQL Syntax:
-        //   SESSION(monotonic_field, session_gap)
-        //
-        // Example:
-        //   SESSION(event_timestamp_field, INTERVAL '1' MINUTE)
-
-        Sessions sessions = 
Sessions.withGapDuration(durationParameter(parameters, 1));
-        if (parameters.size() == 3) {
-          throw new UnsupportedOperationException(
-              "Specifying alignment (offset) is not supported for session 
windows");
-        }
-
-        return Optional.of(sessions);
-      default:
-        return Optional.empty();
-    }
-  }
-
-  private static Duration durationParameter(List<RexNode> parameters, int 
parameterIndex) {
-    return Duration.millis(intValue(parameters.get(parameterIndex)));
-  }
-
-  private static long intValue(RexNode operand) {
-    if (operand instanceof RexLiteral) {
-      return RexLiteral.intValue(operand);
-    } else {
-      throw new IllegalArgumentException(String.format("[%s] is not valid.", 
operand));
-    }
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/AggregateWindowField.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/AggregateWindowField.java
deleted file mode 100644
index ff8b9d71931..00000000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/AggregateWindowField.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.extensions.sql.impl.rule;
-
-import com.google.auto.value.AutoValue;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.Row;
-
-/**
- * <b>For internal use only; no backwards compatibility guarantees.</b>
- *
- * <p>Represents a field with a window function call in a SQL expression.
- */
-@Internal
-@AutoValue
-public abstract class AggregateWindowField {
-  public abstract int fieldIndex();
-
-  public abstract WindowFn<Row, ? extends BoundedWindow> windowFn();
-
-  static Builder builder() {
-    return new AutoValue_AggregateWindowField.Builder();
-  }
-
-  @AutoValue.Builder
-  abstract static class Builder {
-    abstract Builder setFieldIndex(int fieldIndex);
-
-    abstract Builder setWindowFn(WindowFn<Row, ? extends BoundedWindow> 
window);
-
-    abstract AggregateWindowField build();
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregationRule.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregationRule.java
index 429949fe335..70ece2fc63a 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregationRule.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregationRule.java
@@ -17,10 +17,15 @@
  */
 package org.apache.beam.sdk.extensions.sql.impl.rule;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamAggregationRel;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamLogicalConvention;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.calcite.plan.RelOptRule;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelNode;
@@ -28,9 +33,12 @@
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.RelFactories;
 import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexLiteral;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.tools.RelBuilderFactory;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.joda.time.Duration;
 
 /** Rule to detect the window/trigger settings. */
 public class BeamAggregationRule extends RelOptRule {
@@ -54,29 +62,133 @@ public void onMatch(RelOptRuleCall call) {
 
   private static RelNode updateWindow(RelOptRuleCall call, Aggregate 
aggregate, Project project) {
     ImmutableBitSet groupByFields = aggregate.getGroupSet();
-    List<RexNode> projectMapping = project.getProjects();
+    ArrayList<RexNode> projects = new ArrayList(project.getProjects());
 
-    Optional<AggregateWindowField> windowField = Optional.empty();
+    WindowFn windowFn = null;
+    int windowFieldIndex = -1;
 
     for (int groupFieldIndex : groupByFields.asList()) {
-      RexNode projNode = projectMapping.get(groupFieldIndex);
+      RexNode projNode = projects.get(groupFieldIndex);
       if (!(projNode instanceof RexCall)) {
         continue;
       }
 
-      windowField = AggregateWindowFactory.getWindowFieldAt((RexCall) 
projNode, groupFieldIndex);
+      RexCall rexCall = (RexCall) projNode;
+      WindowFn fn = createWindowFn(rexCall.getOperands(), rexCall.op.kind);
+      if (fn != null) {
+        windowFn = fn;
+        windowFieldIndex = groupFieldIndex;
+        projects.set(groupFieldIndex, rexCall.getOperands().get(0));
+      }
     }
 
+    final Project newProject =
+        project.copy(project.getTraitSet(), project.getInput(), projects, 
project.getRowType());
+
     return new BeamAggregationRel(
         aggregate.getCluster(),
         aggregate.getTraitSet().replace(BeamLogicalConvention.INSTANCE),
-        convert(
-            aggregate.getInput(),
-            
aggregate.getInput().getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
+        convert(newProject, 
newProject.getTraitSet().replace(BeamLogicalConvention.INSTANCE)),
         aggregate.indicator,
         aggregate.getGroupSet(),
         aggregate.getGroupSets(),
         aggregate.getAggCallList(),
-        windowField);
+        windowFn,
+        windowFieldIndex);
+  }
+
+  /**
+   * Returns a {@link WindowFn} based on the SQL windowing function defined by 
{#code operatorKind}.
+   * Supported {@link SqlKind}s:
+   *
+   * <ul>
+   *   <li>{@link SqlKind#TUMBLE}, mapped to {@link FixedWindows};
+   *   <li>{@link SqlKind#HOP}, mapped to {@link SlidingWindows};
+   *   <li>{@link SqlKind#SESSION}, mapped to {@link Sessions};
+   * </ul>
+   *
+   * <p>For example:
+   *
+   * <pre>{@code
+   * SELECT event_timestamp, COUNT(*)
+   * FROM PCOLLECTION
+   * GROUP BY TUMBLE(event_timestamp, INTERVAL '1' HOUR)
+   * }</pre>
+   *
+   * <p>SQL window functions support optional window_offset parameter which 
indicates a how window
+   * definition is offset from the event time. Offset is zero if not specified.
+   *
+   * <p>Beam model does not support offset for session windows, so this method 
will throw {@link
+   * UnsupportedOperationException} if offset is specified in SQL query for 
{@link SqlKind#SESSION}.
+   */
+  private static @Nullable WindowFn createWindowFn(List<RexNode> parameters, 
SqlKind operatorKind) {
+    switch (operatorKind) {
+      case TUMBLE:
+
+        // Fixed-size, non-intersecting time-based windows, for example:
+        //   every hour aggregate elements from the previous hour;
+        //
+        // SQL Syntax:
+        //   TUMBLE(monotonic_field, window_size [, window_offset])
+        //
+        // Example:
+        //   TUMBLE(event_timestamp_field, INTERVAL '1' HOUR)
+
+        FixedWindows fixedWindows = 
FixedWindows.of(durationParameter(parameters, 1));
+        if (parameters.size() == 3) {
+          fixedWindows = fixedWindows.withOffset(durationParameter(parameters, 
2));
+        }
+
+        return fixedWindows;
+      case HOP:
+
+        // Sliding, fixed-size, intersecting time-based windows, for example:
+        //   every minute aggregate elements from the previous hour;
+        //
+        // SQL Syntax:
+        //   HOP(monotonic_field, emit_frequency, window_size [, 
window_offset])
+
+        SlidingWindows slidingWindows =
+            SlidingWindows.of(durationParameter(parameters, 2))
+                .every(durationParameter(parameters, 1));
+
+        if (parameters.size() == 4) {
+          slidingWindows = 
slidingWindows.withOffset(durationParameter(parameters, 3));
+        }
+
+        return slidingWindows;
+      case SESSION:
+
+        // Session windows, for example:
+        //   aggregate events after a gap of 1 minute of no events;
+        //
+        // SQL Syntax:
+        //   SESSION(monotonic_field, session_gap)
+        //
+        // Example:
+        //   SESSION(event_timestamp_field, INTERVAL '1' MINUTE)
+
+        Sessions sessions = 
Sessions.withGapDuration(durationParameter(parameters, 1));
+        if (parameters.size() == 3) {
+          throw new UnsupportedOperationException(
+              "Specifying alignment (offset) is not supported for session 
windows");
+        }
+
+        return sessions;
+      default:
+        return null;
+    }
+  }
+
+  private static Duration durationParameter(List<RexNode> parameters, int 
parameterIndex) {
+    return Duration.millis(intValue(parameters.get(parameterIndex)));
+  }
+
+  private static long intValue(RexNode operand) {
+    if (operand instanceof RexLiteral) {
+      return RexLiteral.intValue(operand);
+    } else {
+      throw new IllegalArgumentException(String.format("[%s] is not valid.", 
operand));
+    }
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 145119)
    Time Spent: 8h 10m  (was: 8h)

> Use Calc instead of Project and Filter separately
> -------------------------------------------------
>
>                 Key: BEAM-4449
>                 URL: https://issues.apache.org/jira/browse/BEAM-4449
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-sql
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: Major
>             Fix For: 2.5.0
>
>          Time Spent: 8h 10m
>  Remaining Estimate: 0h
>
> Calcite has a combined Calc operator that is amenable to more optimization, 
> and also means less code to manage as we adjust how the operators/expressions 
> are implemented.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to