[
https://issues.apache.org/jira/browse/BEAM-4702?focusedWorklogId=118218&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-118218
]
ASF GitHub Bot logged work on BEAM-4702:
----------------------------------------
Author: ASF GitHub Bot
Created on: 02/Jul/18 16:04
Start Date: 02/Jul/18 16:04
Worklog Time Spent: 10m
Work Description: kennknowles closed pull request #5850: [BEAM-4702]
Re-window into global window after GROUP BY <window> plus a couple other little
fixes
URL: https://github.com/apache/beam/pull/5850
This PR was merged from a forked repository.
Because GitHub does not show the original diff of a pull request from a fork
once it has been merged, the diff is reproduced below for the sake of provenance:
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
index 4a9e96a89b8..de667f0a1e7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/BeamSqlFnExecutor.java
@@ -270,7 +270,11 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {
// arithmetic operators
case "+":
- ret = new BeamSqlPlusExpression(subExps);
+ if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
+ ret = new BeamSqlPlusExpression(subExps);
+ } else {
+ ret = new BeamSqlDatetimePlusExpression(subExps);
+ }
break;
case "-":
if (SqlTypeName.NUMERIC_TYPES.contains(node.type.getSqlTypeName())) {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
index 6d6e7395a14..93efd52445a 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
@@ -101,7 +101,7 @@ public BeamAggregationRel(
PCollection<Row> windowedStream =
windowField.isPresent()
- ? upstream.apply(Window.into(windowField.get().windowFn()))
+ ? upstream.apply("Grouping Window",
Window.into(windowField.get().windowFn()))
: upstream;
validateWindowIsSupported(windowedStream);
@@ -136,6 +136,14 @@ public BeamAggregationRel(
CalciteUtils.toBeamSchema(getRowType()),
windowFieldIndex)));
mergedStream.setCoder(CalciteUtils.toBeamSchema(getRowType()).getRowCoder());
+ // If this aggration includes a window field, like GROUP BY TUMBLE(<ts>,
<dur>), then
+ // the resulting rows are globally windowed.
+ // However, if it is not present, then this may be a per-window SQL
operation on a
+ // PCollection in which case we carry along the upstream
windowingStrategy
+ if (windowField.isPresent()) {
+ mergedStream = mergedStream.apply("Global Window", Window.into(new
GlobalWindows()));
+ }
+
return mergedStream;
}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
index eda0b8a018b..f81824b1dcc 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamSortRel.java
@@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import java.io.Serializable;
-import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Comparator;
@@ -42,11 +41,12 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
@@ -154,11 +154,12 @@ public int getCount() {
BeamIOSinkRel.class.getSimpleName(),
pinput);
PCollection<Row> upstream = pinput.get(0);
- Type windowType =
-
upstream.getWindowingStrategy().getWindowFn().getWindowTypeDescriptor().getType();
- if (!windowType.equals(GlobalWindow.class)) {
+ WindowingStrategy<?, ?> windowingStrategy =
upstream.getWindowingStrategy();
+ if (!(windowingStrategy.getWindowFn() instanceof GlobalWindows)) {
throw new UnsupportedOperationException(
- "`ORDER BY` is only supported for GlobalWindow, actual window: " +
windowType);
+ String.format(
+ "`ORDER BY` is only supported for %s, actual windowing
strategy: %s",
+ GlobalWindows.class.getSimpleName(), windowingStrategy));
}
BeamSqlRowComparator comparator =
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
index 532da615449..2d257ecb03b 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamAggregationTransforms.java
@@ -185,6 +185,7 @@ public AggregationAdaptor(
aggregators.add(BeamBuiltinAggregations.createMin(call.type.getSqlTypeName()));
break;
case "SUM":
+ case "$SUM0":
aggregators.add(BeamBuiltinAggregations.createSum(call.type.getSqlTypeName()));
break;
case "AVG":
diff --git a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5.java b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5.java
index 317bcf84820..2fb2d1a60e2 100644
--- a/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5.java
+++ b/sdks/java/nexmark/src/main/java/org/apache/beam/sdk/nexmark/queries/sql/SqlQuery5.java
@@ -57,24 +57,33 @@
private static final String QUERY_TEMPLATE =
""
- + " SELECT auction, num "
- + " FROM (SELECT B1.auction, count(*) AS num, "
- + " HOP_START(B1.dateTime, INTERVAL '%1$d' SECOND, "
- + " INTERVAL '%2$d' SECOND) AS starttime "
- + " FROM Bid B1 "
- + " GROUP BY B1.auction, "
- + " HOP(B1.dateTime, INTERVAL '%1$d' SECOND, "
- + " INTERVAL '%2$d' SECOND)) B1 "
- + " JOIN (SELECT max(B2.num) AS maxnum, B2.starttime "
- + " FROM (SELECT count(*) AS num, "
- + " HOP_START(B2.dateTime, INTERVAL '%1$d' SECOND, "
- + " INTERVAL '%2$d' SECOND) AS starttime "
- + " FROM Bid B2 "
- + " GROUP BY B2.auction, "
- + " HOP(B2.dateTime, INTERVAL '%1$d' SECOND, "
- + " INTERVAL '%2$d' SECOND)) B2 "
- + " GROUP BY B2.starttime) B2 "
- + " ON B1.starttime = B2.starttime AND B1.num >= B2.maxnum ";
+ + " SELECT AuctionBids.auction, AuctionBids.num "
+ + " FROM ("
+ + " SELECT"
+ + " B1.auction,"
+ + " count(*) AS num,"
+ + " HOP_START(B1.dateTime, INTERVAL '%1$d' SECOND, INTERVAL
'%2$d' SECOND) AS starttime"
+ + " FROM Bid B1 "
+ + " GROUP BY "
+ + " B1.auction,"
+ + " HOP(B1.dateTime, INTERVAL '%1$d' SECOND, INTERVAL '%2$d'
SECOND)"
+ + " ) AS AuctionBids"
+ + " JOIN ("
+ + " SELECT "
+ + " max(CountBids.num) AS maxnum, "
+ + " HOP_START(CountBids.starttime, INTERVAL '%1$d' SECOND,
INTERVAL '%2$d' SECOND) AS starttime"
+ + " FROM ("
+ + " SELECT"
+ + " count(*) AS num,"
+ + " HOP_START(B2.dateTime, INTERVAL '%1$d' SECOND, INTERVAL
'%2$d' SECOND) AS starttime"
+ + " FROM Bid B2 "
+ + " GROUP BY "
+ + " B2.auction, "
+ + " HOP(B2.dateTime, INTERVAL '%1$d' SECOND, INTERVAL '%2$d'
SECOND)"
+ + " ) AS CountBids"
+ + " GROUP BY HOP(CountBids.starttime, INTERVAL '%1$d' SECOND,
INTERVAL '%2$d' SECOND)"
+ + " ) AS MaxBids "
+ + " ON AuctionBids.starttime = MaxBids.starttime AND AuctionBids.num
>= MaxBids.maxnum ";
private final PTransform<PInput, PCollection<Row>> query;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 118218)
Time Spent: 1h 10m (was: 1h)
> After SQL GROUP BY <windowing> the result should be globally windowed
> ---------------------------------------------------------------------
>
> Key: BEAM-4702
> URL: https://issues.apache.org/jira/browse/BEAM-4702
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Kenneth Knowles
> Assignee: Kenneth Knowles
> Priority: Major
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)