[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

2017-02-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3252


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

2017-02-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r101239149
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/WindowAggregateITCase.java
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.java.stream.sql;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import com.google.common.collect.ImmutableList;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.types.Row;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+public class WindowAggregateITCase extends StreamingMultipleProgramsTestBase {
--- End diff --

I don't think we need integration tests for this feature. We try to keep those to a minimum so as not to blow up the build time. Since window aggregates are already tested for the Table API, it should be sufficient to test the resulting execution plan.

We have the `TableTestBase`, which can be extended to validate the result of the optimization. This should be enough to test this feature. Since these tests are quite cheap, we can also test more queries, including ones with differently ordered grouping expressions.
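One way to cover "differently ordered grouping expressions" cheaply is to generate every ordering of the GROUP BY list and run each resulting query through a plan test. The sketch below is a hypothetical, self-contained helper (`GroupingOrderVariants` and its methods are illustrative names, not part of Flink); how the generated queries are handed to `TableTestBase` is deliberately left out, since that API is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not part of Flink): builds one SQL query per ordering
// of the GROUP BY expressions so that each variant can be checked by a plan test.
class GroupingOrderVariants {

    // Returns every permutation of the given expressions.
    static List<List<String>> permutations(List<String> exprs) {
        List<List<String>> result = new ArrayList<>();
        permute(new ArrayList<>(exprs), 0, result);
        return result;
    }

    // Classic swap-based permutation: fix position k, recurse on the rest.
    private static void permute(List<String> exprs, int k, List<List<String>> out) {
        if (k == exprs.size()) {
            out.add(new ArrayList<>(exprs));
            return;
        }
        for (int i = k; i < exprs.size(); i++) {
            Collections.swap(exprs, k, i);
            permute(exprs, k + 1, out);
            Collections.swap(exprs, k, i); // restore order for the next branch
        }
    }

    // Builds "SELECT <select> FROM <table> GROUP BY <ordering>" for each ordering.
    static List<String> queries(String select, String table, List<String> groupExprs) {
        List<String> queries = new ArrayList<>();
        for (List<String> ordering : permutations(groupExprs)) {
            queries.add("SELECT " + select + " FROM " + table
                + " GROUP BY " + String.join(", ", ordering));
        }
        return queries;
    }
}
```

With two grouping expressions this yields both `GROUP BY FLOOR(rowtime() TO HOUR), id` and `GROUP BY id, FLOOR(rowtime() TO HOUR)`, so an ordering-dependent planning bug shows up in at least one variant.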



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

2017-02-15 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r101248097
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala
 ---
@@ -0,0 +1,139 @@
+package org.apache.flink.table.plan.rules.datastream
+
+import java.util.Calendar
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.avatica.util.TimeUnitRange
+import org.apache.calcite.plan._
+import org.apache.calcite.plan.hep.HepRelVertex
+import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
+import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
+import org.apache.calcite.sql.fun.SqlFloorFunction
+import org.apache.calcite.util.ImmutableBitSet
+import org.apache.flink.table.api.scala.Tumble
+import org.apache.flink.table.api.{TableException, TumblingWindow, Window}
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.EventTimeExtractor
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+
+import scala.collection.JavaConversions._
+
+class LogicalWindowAggregateRule
+  extends RelOptRule(
+LogicalWindowAggregateRule.LOGICAL_WINDOW_PREDICATE,
+"LogicalWindowAggregateRule") {
+
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg = call.rel(0).asInstanceOf[LogicalAggregate]
+
+val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
+val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
+
+val windowClause = recognizeWindow(agg)
+!distinctAggs && !groupSets && !agg.indicator && windowClause.isDefined
+  }
+
+  /**
+* Transform LogicalAggregate with windowing expression to LogicalProject
+* + LogicalWindowAggregate + LogicalProject.
+*
+* The transformation adds an additional LogicalProject at the top to ensure
+* that the types are equivalent.
+*/
+  override def onMatch(call: RelOptRuleCall): Unit = {
+val agg = call.rel[LogicalAggregate](0)
+val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
+val (windowExprIdx, window) = recognizeWindow(agg).get
+val newGroupSet = agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx))
+
+val builder = call.builder()
+val rexBuilder = builder.getRexBuilder
+val zero = rexBuilder.makeTimestampLiteral(LogicalWindowAggregateRule.TIMESTAMP_ZERO, 3)
+
+val newAgg = builder
+  .push(project.getInput)
+  .project(project.getChildExps.updated(windowExprIdx, zero))
+  .aggregate(builder.groupKey(
+newGroupSet,
+agg.indicator, ImmutableList.of(newGroupSet)), agg.getAggCallList)
+  .build().asInstanceOf[LogicalAggregate]
+
+// Create an additional project to conform with types
+val transformed = call.builder()
+transformed.push(LogicalWindowAggregate.create(
+  window.toLogicalWindow,
+  Seq[NamedWindowProperty](),
+  newAgg))
+  .project(List(zero) ++ transformed.fields())
--- End diff --

The `zero` element must be injected at the position of the window attribute in the grouping set.
If you change the order of the grouping attributes in the SQL query in `testMultiGroup()` to `GROUP BY id, FLOOR(rowtime() TO HOUR)`, planning fails.
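To make the comment concrete: `List(zero) ++ transformed.fields()` is only correct when the window attribute is the first group key. The sketch below computes where the placeholder belongs instead. It models expressions as plain strings, uses hypothetical names (`WindowZeroPosition`, `topProjection`), and assumes that the aggregate emits its group keys in ascending field-index order followed by its aggregate calls, as a Calcite `Aggregate` does.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the column layout of the top LogicalProject in the rule.
// Assumption: the aggregate emits group keys in ascending field-index order,
// followed by the aggregate calls.
class WindowZeroPosition {

    // Slot of the window attribute among the original group keys:
    // the number of group keys whose input index is smaller than the window's.
    static int zeroPosition(List<Integer> sortedGroupSet, int windowExprIdx) {
        int pos = 0;
        for (int idx : sortedGroupSet) {
            if (idx < windowExprIdx) {
                pos++;
            }
        }
        return pos;
    }

    // "$i" references field i of the new aggregate (window key removed);
    // "ZERO" re-creates the window column in its original slot.
    static List<String> topProjection(List<Integer> sortedGroupSet, int windowExprIdx, int numAggCalls) {
        int newFieldCount = (sortedGroupSet.size() - 1) + numAggCalls;
        List<String> exprs = new ArrayList<>();
        for (int i = 0; i < newFieldCount; i++) {
            exprs.add("$" + i);
        }
        exprs.add(zeroPosition(sortedGroupSet, windowExprIdx), "ZERO");
        return exprs;
    }
}
```

With group keys at input indices 0 and 1, the window expression at index 1, and one aggregate call, `topProjection` yields `[$0, ZERO, $1]`, whereas prepending the literal would give `[ZERO, $0, $1]` and mismatch the original row type.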



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

2017-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r100990608
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala
 ---
@@ -33,10 +34,18 @@ object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
 SqlMonotonicity.INCREASING
 }
 
-case class RowTime() extends LeafExpression {
+case class TimeIndicator() extends LeafExpression {
--- End diff --

Should be `RowTime` (`ProcTime` will be added later).



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

2017-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r100995825
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala
 ---
@@ -159,9 +152,25 @@ class LogicalWindowAggregateRule
 }
 false
   }
+
+  private def rewriteTimeIndicatorOperators(agg: LogicalAggregate, groupExprIdx: Int) = {
+val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
+val newProjectExpr = mutable.ArrayBuffer[RexNode]()
+newProjectExpr.appendAll(project.getChildExps)
+val rexBuilder = agg.getCluster.getRexBuilder
+newProjectExpr(groupExprIdx) = rexBuilder.makeTimestampLiteral(
+  DataStreamAggregateRule.TIMESTAMP_ZERO, 3)
+val newProject = project.copy(project.getTraitSet, project.getInput,
+  newProjectExpr, project.getRowType)
+
+agg.copy(agg.getTraitSet, List(newProject)).asInstanceOf[LogicalAggregate]
--- End diff --

Can we create a `LogicalAggregate` with an adapted `groupSet` here?
I think adding the `windowingGroupSet` to the `LogicalWindowAggregate` is not a very nice solution. It would be better if we could keep the existing code as it is, without introducing workarounds for the SQL case.
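A minimal sketch of the suggested alternative: derive the new aggregate's group set by dropping the window expression's field index, so no extra `windowingGroupSet` has to travel with the `LogicalWindowAggregate`. It uses `java.util.BitSet` as a stand-in for Calcite's `ImmutableBitSet`, and `AdaptedGroupSet` is a hypothetical name. The `LogicalWindowAggregateRule` diff quoted in the 2017-02-15 comment does the equivalent with `agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx))`.

```java
import java.util.BitSet;

// Hypothetical helper: computes the group set of the new aggregate by removing
// the window expression's field index from the original group set.
class AdaptedGroupSet {

    static BitSet withoutWindowKey(BitSet groupSet, int windowExprIdx) {
        if (!groupSet.get(windowExprIdx)) {
            throw new IllegalArgumentException(
                "field " + windowExprIdx + " is not a group key");
        }
        BitSet adapted = (BitSet) groupSet.clone(); // leave the input untouched
        adapted.clear(windowExprIdx);
        return adapted;
    }
}
```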



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

2017-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r100991050
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -225,12 +231,29 @@ abstract class StreamTableEnvironment(
 // decorrelate
 val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
 
+val prePlanner = createHepPlanner
--- End diff --

I'll merge PR #3101 later today, which adds a normalization phase before optimization by adding a HepPlanner.
Could you integrate your changes with #3101?



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

2017-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r100918614
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 ---
@@ -290,6 +291,15 @@ object FunctionGenerator {
 Seq(),
 new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
 
+  // Make ROWTIME() return the local timestamp
+  // The function has to be executable as in windowed queries it is used
+  // in the GroupBy expression. The results of the function, however, does
+  // not matter.
+  addSqlFunction(
+EventTimeExtractor,
+Seq(),
+new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
--- End diff --

Ah, yes, you are right. It is still called in the DataStreamCalc and cannot easily be removed, as you noted.

Alright, then I'd suggest just emitting a cast `null`. This is not very nice, as the function might also be called in other places, but since we will remove the marker function soon, it should not be a big issue.
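For illustration, emitting a cast `null` means the generator returns a typed null literal as the expression code, so the generated DataStreamCalc still compiles while the value carries no meaning. The interface below is a hypothetical, string-based stand-in, not Flink's actual `CallGenerator` trait (which works on generated-expression objects).

```java
// Hypothetical, simplified stand-in for a call generator: it returns a Java
// expression, as source text, to be spliced into the generated code.
interface ExpressionCodeGen {
    String generate();
}

// Emits a typed null for the ROWTIME() marker function. The cast keeps the
// generated expression's static type TIMESTAMP-compatible (java.sql.Timestamp).
class NullTimestampGen implements ExpressionCodeGen {
    @Override
    public String generate() {
        return "((java.sql.Timestamp) null)";
    }
}
```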



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

2017-02-13 Thread haohui
Github user haohui commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r100917108
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 ---
@@ -290,6 +291,15 @@ object FunctionGenerator {
 Seq(),
 new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
 
+  // Make ROWTIME() return the local timestamp
+  // The function has to be executable as in windowed queries it is used
+  // in the GroupBy expression. The results of the function, however, does
+  // not matter.
+  addSqlFunction(
+EventTimeExtractor,
+Seq(),
+new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
--- End diff --

It turns out that the function is used all the way to runtime. The translated plan looks like the following:

```
LogicalAggregate(group={0}, ...)
  LogicalProject($0=FLOOR(ROWTIME() TO HOURS))
```

The expression is used in the projection. Unfortunately, there is no trivial way to exclude it in Calcite, as mentioned in the previous comments.

The result of the expression is not used in the query, though.



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

2017-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r100845516
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/streaming.scala
 ---
@@ -0,0 +1,42 @@
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeName}
+import org.apache.calcite.sql.validate.SqlMonotonicity
+import org.apache.calcite.sql._
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.api.TableException
+
+object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
+  ReturnTypes.explicit(SqlTypeName.TIMESTAMP), null, OperandTypes.NILADIC,
+  SqlFunctionCategory.SYSTEM) {
+  override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
+
+  override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity =
+SqlMonotonicity.INCREASING
+}
+
+case class RowTime() extends LeafExpression {
--- End diff --

Use `CurrentTimestamp` like PR #3271



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

2017-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r100844487
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/streaming.scala
 ---
@@ -0,0 +1,42 @@
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeName}
+import org.apache.calcite.sql.validate.SqlMonotonicity
+import org.apache.calcite.sql._
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.table.api.TableException
+
+object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
--- End diff --

Can you move this to a `TimeModeIndicatorFunctions` class, as suggested for `FlinkStreamingFunctionCatalog` in PR #3271?



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

2017-02-13 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/3252#discussion_r100837291
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala
 ---
@@ -290,6 +291,15 @@ object FunctionGenerator {
 Seq(),
 new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
 
+  // Make ROWTIME() return the local timestamp
+  // The function has to be executable as in windowed queries it is used
+  // in the GroupBy expression. The results of the function, however, does
+  // not matter.
+  addSqlFunction(
+EventTimeExtractor,
+Seq(),
+new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
--- End diff --

This function should not be called. So I would suggest creating a `CallGenerator` that throws an exception, ideally when the code is generated, or alternatively in the generated code. PR #3271 will need the same call generator.
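A minimal sketch of the preferred variant, failing while the code is being generated so that a stray `ROWTIME()` call is reported during translation rather than at runtime. `SimpleCallGenerator` and `FailingCallGenerator` are hypothetical, string-based stand-ins; Flink's real `CallGenerator` trait has a different signature.

```java
import java.util.List;

// Hypothetical, simplified call-generator contract: receives the result terms
// of the operands and returns the generated expression code as a string.
interface SimpleCallGenerator {
    String generate(List<String> operandTerms);
}

// Refuses to generate code at all: any attempt to translate the marker
// function into executable code fails immediately with a descriptive error.
class FailingCallGenerator implements SimpleCallGenerator {
    private final String functionName;

    FailingCallGenerator(String functionName) {
        this.functionName = functionName;
    }

    @Override
    public String generate(List<String> operandTerms) {
        throw new UnsupportedOperationException(
            functionName + "() is a marker function and must not be code-generated.");
    }
}
```

The fallback mentioned in the comment (throwing from within the generated code) would instead return code that raises the exception when executed, which only surfaces the problem once a record reaches the operator.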



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

2017-02-01 Thread haohui
GitHub user haohui opened a pull request:

https://github.com/apache/flink/pull/3252

[FLINK-5624] Support tumbling window on streaming tables in the SQL API.

This is a POC to add tumbling window support for streaming tables in SQL.

Essentially, it recognizes the `LogicalWindow` construct in Calcite and transforms it into the `LogicalWindowAggregate` in Flink.
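The recognition rests on the fact that flooring a timestamp to a fixed time unit, as in the `FLOOR(rowtime() TO HOUR)` grouping expressions discussed in the review, assigns it to the epoch-aligned tumbling window of that length. The Java sketch below illustrates this equivalence outside of Calcite. `FloorAsTumble` and its methods are hypothetical names, and restricting it to units with exact durations (which excludes `DAYS` and longer in `java.time`) is an assumption of the sketch, not something the PR states.

```java
import java.time.temporal.ChronoUnit;

// Sketch (not the PR's code): FLOOR(ts TO unit) equals the start of the
// epoch-aligned tumbling window of size 1 unit that contains ts.
class FloorAsTumble {

    // Window size in milliseconds; rejects units whose length can vary.
    static long windowSizeMillis(ChronoUnit unit) {
        if (unit.isDurationEstimated()) {
            throw new IllegalArgumentException("no fixed-size tumbling window for " + unit);
        }
        return unit.getDuration().toMillis();
    }

    // Start of the tumbling window containing tsMillis (epoch milliseconds, UTC).
    static long windowStart(long tsMillis, ChronoUnit unit) {
        long size = windowSizeMillis(unit);
        return tsMillis - Math.floorMod(tsMillis, size);
    }
}
```

For example, a row with timestamp 2017-02-01T22:03:44Z falls into the one-hour window starting at 2017-02-01T22:00:00Z, which is exactly the value `FLOOR(... TO HOUR)` yields for it.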

Feedback is highly appreciated.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/haohui/flink FLINK-5624

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3252.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3252


commit a8d4b5042e8bcd1b149f8915118c116e419690e0
Author: Haohui Mai 
Date:   2017-02-01T22:03:44Z

[FLINK-5624] Support tumbling window on streaming tables in the SQL API.



