[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2562


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r83009444
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyRead.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * Base class for reading a window property. The property will be extracted
+  * once and can be read multiple times.
+  */
+trait PropertyRead[T] extends Serializable {
+
+  def extract(window: Window): Unit
--- End diff --

Yes, every aggregation should only happen once; we should definitely do that. By 
the way, we can also get rid of `AvgAggregate` once `AggregateReduceFunctionsRule` 
is enabled. So there are many open issues with the current aggregate 
implementation.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r83008188
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{LongComparator, LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+  * TypeInformation for row intervals.
+  */
+@SerialVersionUID(-1306179424364925258L)
+class RowIntervalTypeInfo[T](
--- End diff --

That's a good idea. I will do that.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r83000595
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/windows/Session.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.windows;
+
+import org.apache.flink.api.table.SessionWindow;
+
+/**
+ * Helper class for creating a session window. Session windows are ideal for
+ * cases where the window boundaries need to adjust to the incoming data. In a
+ * session window it is possible to have windows that start at individual points
+ * in time for each key and that end once there has been a certain period of
+ * inactivity.
+ */
+public class Session {
--- End diff --

I see, that makes sense. 

I think we should convert the Java classes into Scala classes, because they are 
in the Scala source folder. 




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82999720
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{LongComparator, LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+  * TypeInformation for row intervals.
+  */
+@SerialVersionUID(-1306179424364925258L)
+class RowIntervalTypeInfo[T](
--- End diff --

OK, I understand the issue with the instance checks (although I would argue those 
checks are not correct if they fail in such cases).

However, providing a full implementation for each internal type does not seem 
right. How about we create a special abstract TypeInfo for types that are not 
required at execution time and implement all irrelevant methods (arity, 
serializer, comparator, etc.) with `UnsupportedOperationException`? 
`RowIntervalTypeInfo` and `TimeIntervalTypeInfo` would extend it and just 
provide the type.
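A hedged sketch of that suggestion (the names and signatures below are illustrative stand-ins, not Flink's actual `TypeInformation` API; real code would extend `org.apache.flink.api.common.typeinfo.TypeInformation[T]`):

```scala
// Simplified stand-in for a validation-only TypeInformation base class.
abstract class ValidationOnlyTypeInfo[T](val typeClass: Class[T]) extends Serializable {

  private def unsupported(what: String): Nothing =
    throw new UnsupportedOperationException(
      s"${getClass.getSimpleName} is only used during validation; $what is not supported.")

  // Execution-time methods fail fast instead of pretending to work.
  def getArity: Int = unsupported("arity")
  def createSerializer(): Nothing = unsupported("serialization")
  def createComparator(): Nothing = unsupported("comparison")
}

// Concrete interval types only have to name the type they represent.
class RowIntervalTypeInfo extends ValidationOnlyTypeInfo[java.lang.Long](classOf[java.lang.Long])
class TimeIntervalTypeInfo extends ValidationOnlyTypeInfo[java.lang.Long](classOf[java.lang.Long])
```

The fail-fast methods make accidental use at execution time an immediate, explicit error rather than a silent misbehavior.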




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82993938
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyRead.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * Base class for reading a window property. The property will be extracted
+  * once and can be read multiple times.
+  */
+trait PropertyRead[T] extends Serializable {
+
+  def extract(window: Window): Unit
--- End diff --

In general I agree with your solution, but right now we support having the same 
operation multiple times in a query, e.g.:

```
.window(Session withGap 3.milli on 'rowtime as 'w)
.select('string, 'w.end, 'w.end)
```

Your code would fail in that case. Let's leave the aggregation as it is for now 
and rework it later. The aggregations have to be reworked for efficiency anyway.
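The "extract once, read many times" contract that makes a duplicate `'w.end` projection safe could be sketched as follows (self-contained stand-ins: `DemoWindow` replaces Flink's `Window`, and a `read()` accessor is assumed from the trait's documentation, since only `extract` is visible in the quoted diff):

```scala
// Stand-in window type carrying only its boundary timestamps.
final case class DemoWindow(start: Long, end: Long)

trait PropertyRead[T] extends Serializable {
  def extract(window: DemoWindow): Unit  // called once per window
  def read(): T                          // may be called any number of times
}

// Reads the window end timestamp. Extraction caches the value, so a query
// projecting 'w.end twice simply reads the cached result a second time.
class WindowEndRead extends PropertyRead[Long] {
  private var cached: Long = 0L
  override def extract(window: DemoWindow): Unit = cached = window.end
  override def read(): Long = cached
}
```

Under this contract, repeated properties in a `select` cost one extraction plus cheap repeated reads.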




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82991170
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
 ---
@@ -23,6 +23,8 @@ import org.apache.flink.api.table.Row
 import java.math.BigDecimal
 import java.math.BigInteger
 
+import org.apache.flink.streaming.api.windowing.windows.Window
--- End diff --

Done.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82990475
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
 ---
@@ -27,48 +27,65 @@ import scala.collection.mutable.ListBuffer
 object RexNodeTranslator {
--- End diff --

Good point. I renamed it to `ProjectionTranslator`.
Done.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82990268
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/properties.scala
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.api.table.FlinkRelBuilder.NamedProperty
+import org.apache.flink.api.table.validate.{ValidationFailure, ValidationSuccess}
+
+abstract class Property(child: Expression) extends UnaryExpression {
+
+  override def toString = s"Property($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    throw new UnsupportedOperationException("Property cannot be transformed to RexNode.")
+
+  override private[flink] def validateInput() =
+if (child.isInstanceOf[WindowReference]) {
--- End diff --

I just tried to keep the names short, because the Scala line-length limits are 
pretty strict.
Done.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82989243
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/windows/Session.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.windows;
+
+import org.apache.flink.api.table.SessionWindow;
+
+/**
+ * Helper class for creating a session window. Session windows are ideal for
+ * cases where the window boundaries need to adjust to the incoming data. In a
+ * session window it is possible to have windows that start at individual points
+ * in time for each key and that end once there has been a certain period of
+ * inactivity.
+ */
+public class Session {
--- End diff --

Another reason was that, for a Scala user, it would be an additional import. 
Currently, `import org.apache.flink.api.scala.table._` is enough to use the 
Table API. If I moved it into the general table package, the user would have to 
import it explicitly and could no longer use the fluent API.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82988652
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/windows/Session.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.windows;
+
+import org.apache.flink.api.table.SessionWindow;
+
+/**
+ * Helper class for creating a session window. Session windows are ideal for
+ * cases where the window boundaries need to adjust to the incoming data. In a
+ * session window it is possible to have windows that start at individual points
+ * in time for each key and that end once there has been a certain period of
+ * inactivity.
+ */
+public class Session {
--- End diff --

During development I had the problem that I had to use 
`Session$.MODULE$.withGap(10)`, see [1]. But now it is not a companion object 
anymore, so it works. I will change that.

[1] 
http://stackoverflow.com/questions/3282653/how-do-you-call-a-scala-singleton-method-from-java
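A small illustration of the interop concern (hypothetical names, not the PR's exact code): methods on a Scala `object` live on the generated `Session$` class, and Java callers have to go through its `MODULE$` singleton whenever the compiler does not emit static forwarders.

```scala
// Companion class + object pair. From Scala, Session.withGap(10L) just works.
// From Java, when no static forwarders are generated, the same call must be
// written as Session$.MODULE$.withGap(10L).
class Session private (val gap: Long)

object Session {
  def withGap(gap: Long): Session = new Session(gap)
}
```

Replacing the object with a plain class exposing static-style factory methods (as done in the PR's Java `Session` helper) gives Java users the natural `Session.withGap(...)` form.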




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82884874
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala
 ---
@@ -18,12 +18,15 @@
 package org.apache.flink.api.table.expressions
 
 import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeName}
--- End diff --

Clean up imports




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82899030
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala
 ---
@@ -19,6 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.Row
+import org.apache.flink.streaming.api.windowing.windows.Window
--- End diff --

Remove change




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82964364
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{LongComparator, LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+  * TypeInformation for row intervals.
+  */
+@SerialVersionUID(-1306179424364925258L)
+class RowIntervalTypeInfo[T](
--- End diff --

Can we define this class as follows?
```
class RowIntervalTypeInfo extends IntegerTypeInfo[java.lang.Long](
  classOf[java.lang.Long],
  Array[Class[_]](classOf[Float], classOf[Double], classOf[Character]),
  LongSerializer.INSTANCE,
  classOf[LongComparator]) {}

object RowIntervalTypeInfo {
  val INTERVAL_ROWS = new RowIntervalTypeInfo()
}
```




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82901298
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala
 ---
@@ -0,0 +1,873 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark
+import org.apache.flink.api.scala.stream.utils.StreamITCase
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.{Row, _}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert._
+import org.junit.{Ignore, Test}
+
+import scala.collection.mutable
+
+class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
--- End diff --

#2595 has been merged. Please convert these into lightweight unit tests.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82899800
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala
 ---
@@ -23,6 +23,8 @@ import org.apache.flink.api.table.Row
 import java.math.BigDecimal
 import java.math.BigInteger
 
+import org.apache.flink.streaming.api.windowing.windows.Window
--- End diff --

Please undo the reformatting changes and remove the unnecessary import in this 
file. Please check the other `Aggregate` implementations as well.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82964905
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{LongComparator, LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+  * TypeInformation for row intervals.
+  */
+@SerialVersionUID(-1306179424364925258L)
+class RowIntervalTypeInfo[T](
--- End diff --

I know why we need this class, but IMO it doesn't look like a clean design 
to require a `TypeInformation` for things that will never be serialized or 
compared during execution. Did we consider using Calcite's type system during 
validation?




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82971868
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyCollector.scala
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Collector
+
+/**
+  * Adds properties to the end of a row before it emits it to the final collector.
+  * The collector assumes that the row has placeholders at the end that can be filled.
+  */
+class PropertyCollector(properties: Array[PropertyRead[_ <: Any]]) extends Collector[Row] {
--- End diff --

Please see suggestion on `PropertyRead`




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82886893
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/properties.scala
 ---
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.api.table.FlinkRelBuilder.NamedProperty
+import org.apache.flink.api.table.validate.{ValidationFailure, ValidationSuccess}
+
+abstract class Property(child: Expression) extends UnaryExpression {
+
+  override def toString = s"Property($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    throw new UnsupportedOperationException("Property cannot be transformed to RexNode.")
+
+  override private[flink] def validateInput() =
+    if (child.isInstanceOf[WindowReference]) {
--- End diff --

Validation requires a window reference as child. Rename `Property` to `WindowProperty`?




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82880181
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/windows/Session.java
 ---
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.table.windows;
+
+import org.apache.flink.api.table.SessionWindow;
+
+/**
+ * Helper class for creating a session window. Session windows are ideal 
for cases where the
+ * window boundaries need to adjust to the incoming data. In a session 
window it is possible to
+ * have windows that start at individual points in time for each key and 
that end once there has
+ * been a certain period of inactivity.
+ */
+public class Session {
--- End diff --

Why dedicated Java classes for the Java API windows and not additional 
methods in the Scala API windows (same as in table.scala)? Might lead to mixed 
up imports because Java and Scala classes have the same names.
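
For context, the session window discussed above would be used from the Table API roughly as follows. This is a sketch only; the `Session withGap ...` builder, the `window(...)` clause, and the window alias all follow the API proposed in this PR and may not match what was finally merged:

```
// Sketch against the API proposed in this PR (names are assumptions,
// not the merged interface): a session window closes a group once no
// element has arrived for the given gap.
val result = table
  .groupBy('key)
  .window(Session withGap 10.minutes as 'w)   // 10 minutes of inactivity ends the window
  .select('key, 'value.sum, 'w.start, 'w.end) // 'w.start / 'w.end are window properties
```

The import ambiguity raised above would arise exactly here: `Session` could resolve to either the Java or the Scala helper class, depending on which package is imported.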




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82968570
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TimeIntervalTypeInfo.scala
 ---
@@ -24,14 +24,14 @@ import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.base.{IntComparator, IntSerializer, LongComparator, LongSerializer}
 import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
-import org.apache.flink.api.table.typeutils.IntervalTypeInfo.instantiateComparator
+import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo.instantiateComparator
 import org.apache.flink.util.Preconditions._
 
 /**
   * TypeInformation for SQL INTERVAL types.
   */
 @SerialVersionUID(-1816179424364825258L)
-class IntervalTypeInfo[T](
+class TimeIntervalTypeInfo[T](
--- End diff --

Define `TimeIntervalTypeInfo` as 
```
class TimeIntervalTypeInfo[T](
    val clazz: Class[T],
    val serializer: TypeSerializer[T],
    val comparatorClazz: Class[_ <: TypeComparator[T]])
  extends IntegerTypeInfo[T](
    clazz,
    Array[Class[_]](classOf[Float], classOf[Double], classOf[Character]),
    serializer,
    comparatorClazz)

object TimeIntervalTypeInfo {

  val INTERVAL_MONTHS = new TimeIntervalTypeInfo(
    classOf[java.lang.Integer],
    IntSerializer.INSTANCE,
    classOf[IntComparator])

  val INTERVAL_MILLIS = new TimeIntervalTypeInfo(
    classOf[java.lang.Long],
    LongSerializer.INSTANCE,
    classOf[LongComparator])

}
```




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82971688
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyRead.scala
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+  * Base class for reading a window property. The property will be extracted once and
+  * can be read multiple times.
+  */
+trait PropertyRead[T] extends Serializable {
+
+  def extract(window: Window): Unit
--- End diff --

I think we do not need a generic interface to extract properties. The 
`extract` method requires a window object but not all properties might be 
present in the window object. So I am not sure how much generalization is 
required here.

I think we can implement a `TimeWindowPropertyCollector` that works as 
follows:
```
class TimeWindowPropertyCollector(start: Int, end: Int) extends Collector[Row] {
  var finalCollector: Collector[Row] = _
  var timeWindow: TimeWindow = _

  override def collect(record: Row): Unit = {

    if (start > -1) {
      record.setField(start, timeWindow.getStart)
    }
    if (end > -1) {
      record.setField(end, timeWindow.getEnd)
    }

    finalCollector.collect(record)
  }

  override def close(): Unit = finalCollector.close()
}
```

If we (ever) need more properties, we can add another wrapping collector or 
change the design. But at this point I think this is sufficient. 

What do you think?
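
To illustrate the wrapping-collector pattern without any Flink dependencies, it can be sketched with simplified stand-ins. `Collector`, `Row`, and `TimeWindow` below are minimal mock-ups for illustration only, not the real Flink types:

```
// Minimal stand-ins for the Flink types referenced above
// (assumptions for illustration only).
trait Collector[T] { def collect(record: T): Unit; def close(): Unit }

class Row(arity: Int) {
  private val fields = new Array[Any](arity)
  def setField(i: Int, value: Any): Unit = fields(i) = value
  def productElement(i: Int): Any = fields(i)
}

case class TimeWindow(getStart: Long, getEnd: Long)

// The wrapper fills the start/end placeholder fields (when their index
// is >= 0) before forwarding the row to the wrapped final collector.
class TimeWindowPropertyCollector(start: Int, end: Int) extends Collector[Row] {
  var finalCollector: Collector[Row] = _
  var timeWindow: TimeWindow = _

  override def collect(record: Row): Unit = {
    if (start > -1) record.setField(start, timeWindow.getStart)
    if (end > -1) record.setField(end, timeWindow.getEnd)
    finalCollector.collect(record)
  }

  override def close(): Unit = finalCollector.close()
}
```

The window function would set `finalCollector` and `timeWindow` once per window evaluation and then emit every row through the wrapper, so the placeholder columns are filled without the aggregate functions knowing about window properties at all.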




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-12 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82887623
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
 ---
@@ -27,48 +27,65 @@ import scala.collection.mutable.ListBuffer
 object RexNodeTranslator {
--- End diff --

This class does not deal with RexNodes anymore and could be renamed.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82186936
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 ---
@@ -325,33 +337,42 @@ trait ImplicitExpressionOperations {
 */
   def day = toMilliInterval(expr, MILLIS_PER_DAY)
 
-/**
+  /**
 * Creates an interval of the given number of hours.
 *
 * @return interval of milliseconds
 */
   def hour = toMilliInterval(expr, MILLIS_PER_HOUR)
 
-/**
+  /**
 * Creates an interval of the given number of minutes.
 *
 * @return interval of milliseconds
 */
   def minute = toMilliInterval(expr, MILLIS_PER_MINUTE)
 
-/**
+  /**
 * Creates an interval of the given number of seconds.
 *
 * @return interval of milliseconds
 */
   def second = toMilliInterval(expr, MILLIS_PER_SECOND)
 
-/**
+  /**
 * Creates an interval of the given number of milliseconds.
 *
 * @return interval of milliseconds
 */
   def milli = toMilliInterval(expr, 1)
+
+  // row type
+
+  /**
+* Creates a number defining an amount of rows.
+*
+* @return number of rows
+*/
+  def rows = expr
--- End diff --

Done.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82177134
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.{CombineFunction, 
RichGroupReduceFunction}
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+
+
+/**
+ * It wraps the aggregate logic inside of
+ * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
+ * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
+ *
+ * @param aggregates   The aggregate functions.
+ * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+ * and output Row.
+ * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+ * index in output Row.
+ */
+class AggregateReduceCombineFunction(
+private val aggregates: Array[Aggregate[_ <: Any]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val aggregateMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)
+extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, 
Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var aggContext: AggContext = _
+
+  /**
+* Sets a new aggregation context used for [[Aggregate.evaluate()]].
+*/
+  def setAggContext(aggContext: AggContext): Unit = {
+this.aggContext = aggContext
+  }
+
+  override def open(config: Configuration): Unit = {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupKeysMapping)
+val finalRowLength: Int = groupKeysMapping.length + 
aggregateMapping.length
+aggregateBuffer = new Row(intermediateRowArity)
+output = new Row(finalRowLength)
+aggContext = new AggContext
+  }
+
+  /**
+   * For grouped intermediate aggregate Rows, merge all of them into 
aggregate buffer,
+   * calculate aggregated values output by aggregate buffer, and set them 
into output
+   * Row based on the mapping relation between intermediate aggregate Row 
and output Row.
+   *
+   * @param records  Grouped intermediate aggregate Rows iterator.
+   * @param out The collector to hand results to.
+   *
+   */
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(aggregateBuffer))
+
+// Merge intermediate aggregate value to buffer.
+var last: Row = null
+records.foreach((record) => {
+  aggregates.foreach(_.merge(record, aggregateBuffer))
+  last = record
+})
+
+// Set group keys value to final output.
+groupKeysMapping.map {
--- End diff --

Done.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82177163
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala
 ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.{CombineFunction, 
RichGroupReduceFunction}
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+
+
+/**
+ * It wraps the aggregate logic inside of
+ * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
+ * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
+ *
+ * @param aggregates   The aggregate functions.
+ * @param groupKeysMapping The index mapping of group keys between 
intermediate aggregate Row
+ * and output Row.
+ * @param aggregateMapping The index mapping between aggregate function 
list and aggregated value
+ * index in output Row.
+ */
+class AggregateReduceCombineFunction(
+private val aggregates: Array[Aggregate[_ <: Any]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val aggregateMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)
+extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, 
Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var aggContext: AggContext = _
+
+  /**
+* Sets a new aggregation context used for [[Aggregate.evaluate()]].
+*/
+  def setAggContext(aggContext: AggContext): Unit = {
+this.aggContext = aggContext
+  }
+
+  override def open(config: Configuration): Unit = {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupKeysMapping)
+val finalRowLength: Int = groupKeysMapping.length + 
aggregateMapping.length
+aggregateBuffer = new Row(intermediateRowArity)
+output = new Row(finalRowLength)
+aggContext = new AggContext
+  }
+
+  /**
+   * For grouped intermediate aggregate Rows, merge all of them into 
aggregate buffer,
+   * calculate aggregated values output by aggregate buffer, and set them 
into output
+   * Row based on the mapping relation between intermediate aggregate Row 
and output Row.
+   *
+   * @param records  Grouped intermediate aggregate Rows iterator.
+   * @param out The collector to hand results to.
+   *
+   */
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = 
{
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(aggregateBuffer))
+
+// Merge intermediate aggregate value to buffer.
+var last: Row = null
+records.foreach((record) => {
+  aggregates.foreach(_.merge(record, aggregateBuffer))
+  last = record
+})
+
+// Set group keys value to final output.
+groupKeysMapping.map {
+  case (after, previous) =>
+output.setField(after, last.productElement(previous))
+}
+
+// Evaluate final aggregate value and set to output.
+aggregateMapping.map {
--- End diff --

Done.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82176556
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.expressions.{Expression, Literal}
+import org.apache.flink.api.table.plan.logical._
+import org.apache.flink.api.table.plan.nodes.FlinkAggregate
+import 
org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream,
 createNonKeyedWindowedStream}
+import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
+import 
org.apache.flink.api.table.runtime.aggregate.{AggregateAllWindowFunction, 
AggregateUtil, AggregateWindowFunction}
+import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, 
RowTypeInfo, TypeCheckUtils, TypeConverter}
+import org.apache.flink.api.table.{FlinkTypeFactory, Row, 
StreamTableEnvironment}
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, 
DataStream, KeyedStream, WindowedStream}
+import 
org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, 
WindowFunction}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window => 
DataStreamWindow}
+
+import scala.collection.JavaConverters._
+
+class DataStreamAggregate(
+window: LogicalWindow,
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkAggregate
+  with DataStreamRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamAggregate(
+  window,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
+  }
+
+  override def toString: String = {
+s"Aggregate(${ if (!grouping.isEmpty) {
+  s"groupBy: (${groupingToString(inputType, grouping)}), "
+} else {
+  ""
+}}window: ($window), " +
+  s"select: (${aggregationToString(inputType, grouping, getRowType, 
namedAggregates)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .itemIf("groupBy", groupingToString(inputType, grouping), 
!grouping.isEmpty)
+  .item("window", window)
+  .item("select", aggregationToString(inputType, grouping, getRowType, 
namedAggregates))
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+val config = tableEnv.getConfig
+
+val groupingKeys = grouping.indices.toArray
+// add grouping fields, position keys in the input, and input type
+val aggregateResult = 
AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
+  inputType, getRowType, grouping, config)
+
+val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82171490
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala
 ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.expressions.{Expression, Literal}
+import org.apache.flink.api.table.plan.logical._
+import org.apache.flink.api.table.plan.nodes.FlinkAggregate
+import 
org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream,
 createNonKeyedWindowedStream}
+import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
+import 
org.apache.flink.api.table.runtime.aggregate.{AggregateAllWindowFunction, 
AggregateUtil, AggregateWindowFunction}
+import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, 
RowTypeInfo, TypeCheckUtils, TypeConverter}
+import org.apache.flink.api.table.{FlinkTypeFactory, Row, 
StreamTableEnvironment}
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, 
DataStream, KeyedStream, WindowedStream}
+import 
org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, 
WindowFunction}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window => 
DataStreamWindow}
+
+import scala.collection.JavaConverters._
+
+class DataStreamAggregate(
+window: LogicalWindow,
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkAggregate
+  with DataStreamRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: 
java.util.List[RelNode]): RelNode = {
+new DataStreamAggregate(
+  window,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
+  }
+
+  override def toString: String = {
+s"Aggregate(${ if (!grouping.isEmpty) {
+  s"groupBy: (${groupingToString(inputType, grouping)}), "
+} else {
+  ""
+}}window: ($window), " +
+  s"select: (${aggregationToString(inputType, grouping, getRowType, 
namedAggregates)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
+  .itemIf("groupBy", groupingToString(inputType, grouping), 
!grouping.isEmpty)
+  .item("window", window)
+  .item("select", aggregationToString(inputType, grouping, getRowType, 
namedAggregates))
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+val config = tableEnv.getConfig
+
+val groupingKeys = grouping.indices.toArray
+// add grouping fields, position keys in the input, and input type
+val aggregateResult = 
AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
--- End diff --

Done.



[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82170972
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
 ---
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.api.table.plan.logical._
+
+/**
+  * Group-window specification. Group-windows allow aggregates which are computed for a group of
+  * elements. A (time or row-count) window is required to bound the infinite input stream into a
+  * finite group. Group-windows are evaluated once per group.
+  */
+trait GroupWindow {
+
+  /**
+    * Converts an API class to a logical window for planning. This is an internal method.
+    */
+  private[flink] def toLogicalWindow: LogicalWindow
+}
+
+/**
+  * A group-window operating on event-time.
+  *
+  * @param timeField defines the time mode for streaming tables and acts as a time attribute for
+  *                  batch tables over which the query is evaluated.
+  */
+abstract class EventTimeWindow(timeField: Expression) extends GroupWindow {
+
+  protected var name: Option[Expression] = None
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: Expression): EventTimeWindow = {
+    this.name = Some(alias)
+    this
+  }
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias))
+}
+
+// ------------------------------------------------------------------------
+// Tumbling group-windows
+// ------------------------------------------------------------------------
+
+/**
+  * Tumbling group-window. By default, it works on processing-time.
+  *
+  * @param size size of the window either as number of rows or interval of milliseconds
+  */
+class TumblingWindow(size: Expression) extends GroupWindow {
+
+  /**
+    * Tumbling group-window. By default, it works on processing-time.
+    *
+    * @param size size of the window either as number of rows or interval of milliseconds
+    */
+  def this(size: String) = this(ExpressionParser.parseExpression(size))
+
+  private var alias: Option[Expression] = None
+
+  /**
+    * Defines the time mode for streaming tables and specifies a time attribute for
+    * batch tables over which the query is evaluated.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a tumbling group-window on event-time
+    */
+  def on(timeField: Expression): TumblingEventTimeWindow =
+    new TumblingEventTimeWindow(alias, timeField, size)
+
+  /**
+    * Defines the time mode for streaming tables and specifies a time attribute for
+    * batch tables over which the query is evaluated.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a tumbling group-window on event-time
+    */
+  def on(timeField: String): TumblingEventTimeWindow =
+    on(ExpressionParser.parseExpression(timeField))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
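The `on(...)`/`as(...)` methods in the quoted diff form a plain builder chain: `on` binds a time attribute and switches to the event-time variant, `as` records an alias for later reference in a `select()`-style clause. A minimal self-contained sketch of the same pattern (hypothetical names, not the Flink Table API):

```scala
// Sketch of the window-builder pattern used in windows.scala; names here are
// hypothetical. `on` turns the processing-time window into an event-time
// window, `as` records an alias for later reference.
case class EventTimeWindowSpec(size: Long, timeField: String, alias: Option[String]) {
  def as(name: String): EventTimeWindowSpec = copy(alias = Some(name))
}

class TumblingWindowSpec(size: Long) {
  // Binding a time attribute switches to the event-time variant.
  def on(timeField: String): EventTimeWindowSpec =
    EventTimeWindowSpec(size, timeField, None)
}

val w = new TumblingWindowSpec(10).on("rowtime").as("w")
println(w) // EventTimeWindowSpec(10,rowtime,Some(w))
```

The immutable `copy`-based variant avoids the mutable `protected var name` the reviewed code uses, at the cost of one allocation per builder step.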

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82170078
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala ---

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82147739
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.expressions.{Expression, Literal}
+import org.apache.flink.api.table.plan.logical._
+import org.apache.flink.api.table.plan.nodes.FlinkAggregate
+import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream, createNonKeyedWindowedStream}
+import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.api.table.runtime.aggregate.{AggregateAllWindowFunction, AggregateUtil, AggregateWindowFunction}
+import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, RowTypeInfo, TypeCheckUtils, TypeConverter}
+import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment}
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
+import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+
+import scala.collection.JavaConverters._
+
+class DataStreamAggregate(
+    window: LogicalWindow,
+    cluster: RelOptCluster,
+    traitSet: RelTraitSet,
+    inputNode: RelNode,
+    namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+    rowRelDataType: RelDataType,
+    inputType: RelDataType,
+    grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkAggregate
+  with DataStreamRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+    new DataStreamAggregate(
+      window,
+      cluster,
+      traitSet,
+      inputs.get(0),
+      namedAggregates,
+      getRowType,
+      inputType,
+      grouping)
+  }
+
+  override def toString: String = {
+    s"Aggregate(${ if (!grouping.isEmpty) {
+      s"groupBy: (${groupingToString(inputType, grouping)}), "
+    } else {
+      ""
+    }}window: ($window), " +
+      s"select: (${aggregationToString(inputType, grouping, getRowType, namedAggregates)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+    super.explainTerms(pw)
+      .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
+      .item("window", window)
+      .item("select", aggregationToString(inputType, grouping, getRowType, namedAggregates))
+  }
+
+  override def translateToPlan(
+      tableEnv: StreamTableEnvironment,
+      expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+    val config = tableEnv.getConfig
+
+    val groupingKeys = grouping.indices.toArray
+    // add grouping fields, position keys in the input, and input type
+    val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
+      inputType, getRowType, grouping, config)
+
+val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan
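The `val groupingKeys = grouping.indices.toArray` line above relies on the grouping fields having been copied to the front of the intermediate row: whatever positions they occupied in the input, they sit at positions 0..n-1 after the projection. A small standalone illustration (not Flink code):

```scala
// Grouping fields sit at arbitrary positions in the input row, but the
// aggregation step projects them to the front of the intermediate row,
// so the physical key positions are simply the indices 0..n-1.
val grouping = Array(3, 1, 4)          // field positions in the input row
val groupingKeys = grouping.indices.toArray

println(groupingKeys.mkString(","))    // 0,1,2
```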

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82151066
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction}
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+
+
+/**
+ * It wraps the aggregate logic inside of
+ * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
+ * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
+ *
+ * @param aggregates       The aggregate functions.
+ * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row
+ *                         and output Row.
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated value
+ *                         index in output Row.
+ */
+class AggregateReduceCombineFunction(
+    private val aggregates: Array[Aggregate[_ <: Any]],
+    private val groupKeysMapping: Array[(Int, Int)],
+    private val aggregateMapping: Array[(Int, Int)],
+    private val intermediateRowArity: Int)
+  extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var aggContext: AggContext = _
+
+  /**
+    * Sets a new aggregation context used for [[Aggregate.evaluate()]].
+    */
+  def setAggContext(aggContext: AggContext): Unit = {
+    this.aggContext = aggContext
+  }
+
+  override def open(config: Configuration): Unit = {
+    Preconditions.checkNotNull(aggregates)
+    Preconditions.checkNotNull(groupKeysMapping)
+    val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
+    aggregateBuffer = new Row(intermediateRowArity)
+    output = new Row(finalRowLength)
+    aggContext = new AggContext
+  }
+
+  /**
+   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
+   * calculate aggregated values output by aggregate buffer, and set them into output
+   * Row based on the mapping relation between intermediate aggregate Row and output Row.
+   *
+   * @param records Grouped intermediate aggregate Rows iterator.
+   * @param out     The collector to hand results to.
+   */
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+    // Initiate intermediate aggregate value.
+    aggregates.foreach(_.initiate(aggregateBuffer))
+
+    // Merge intermediate aggregate value to buffer.
+    var last: Row = null
+    records.foreach((record) => {
+      aggregates.foreach(_.merge(record, aggregateBuffer))
+      last = record
+    })
+
+    // Set group keys value to final output.
+    groupKeysMapping.map {
+      case (after, previous) =>
+        output.setField(after, last.productElement(previous))
+    }
+
+    // Evaluate final aggregate value and set to output.
+    aggregateMapping.map {
--- End diff --

use `foreach` instead of `map` because `setField` returns `Unit`.
(can be fixed in `AggregateReduceGroupFunction` as well)
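The reviewer's point: `map` builds a result collection, which is wasted work when the mapped function is called only for its side effect (`setField` returns `Unit`). A small standalone illustration (hypothetical data, not the Flink code):

```scala
// `map` over a side-effecting function allocates an Array[Unit] that is
// immediately discarded; `foreach` expresses the intent and skips the
// allocation.
val output = new Array[Int](3)
val mapping = Array((0, 10), (1, 20), (2, 30))

// Discouraged: the result of `map` is an unused Array[Unit].
val wasted: Array[Unit] = mapping.map { case (i, v) => output(i) = v }

// Preferred: `foreach` for pure side effects.
mapping.foreach { case (i, v) => output(i) = v }

println(output.mkString(","))  // 10,20,30
```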



[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82144274
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala ---
+val groupingKeys = grouping.indices.toArray
+// add grouping fields, position keys in the input, and input type
+    val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
--- End diff --

Style: break argument list into line-wise args.
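The style being requested: when a call does not fit on one line, put each argument on its own line instead of packing arguments until the margin. A sketch with a hypothetical helper function:

```scala
// Hypothetical helper used only to illustrate the requested formatting.
def createFunctions(
    names: Seq[String],
    inputArity: Int,
    outputArity: Int,
    grouping: Array[Int]): String =
  s"${names.mkString(",")}:$inputArity->$outputArity:${grouping.length} keys"

// Discouraged: arguments packed onto the line until it overflows.
val a = createFunctions(Seq("sum", "count"), 5, 3, Array(0, 1))

// Preferred: break the argument list into line-wise args.
val b = createFunctions(
  Seq("sum", "count"),
  5,
  3,
  Array(0, 1))

println(a == b)  // true
```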



[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82152829
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala ---

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82147669
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala ---

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82147584
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.expressions.{Expression, Literal}
+import org.apache.flink.api.table.plan.logical._
+import org.apache.flink.api.table.plan.nodes.FlinkAggregate
+import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream, createNonKeyedWindowedStream}
+import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.api.table.runtime.aggregate.{AggregateAllWindowFunction, AggregateUtil, AggregateWindowFunction}
+import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, RowTypeInfo, TypeCheckUtils, TypeConverter}
+import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment}
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
+import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+
+import scala.collection.JavaConverters._
+
+class DataStreamAggregate(
+window: LogicalWindow,
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkAggregate
+  with DataStreamRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamAggregate(
+  window,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
+  }
+
+  override def toString: String = {
+s"Aggregate(${ if (!grouping.isEmpty) {
+  s"groupBy: (${groupingToString(inputType, grouping)}), "
+} else {
+  ""
+}}window: ($window), " +
+  s"select: (${aggregationToString(inputType, grouping, getRowType, namedAggregates)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
  .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
  .item("window", window)
  .item("select", aggregationToString(inputType, grouping, getRowType, namedAggregates))
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+val config = tableEnv.getConfig
+
+val groupingKeys = grouping.indices.toArray
+// add grouping fields, position keys in the input, and input type
+val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
+  inputType, getRowType, grouping, config)
+
+val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82151050
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction}
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+
+
+/**
+ * It wraps the aggregate logic inside of
+ * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
+ * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
+ *
+ * @param aggregates   The aggregate functions.
+ * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row and output Row.
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated value index in output Row.
+ */
+class AggregateReduceCombineFunction(
+private val aggregates: Array[Aggregate[_ <: Any]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val aggregateMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)
+extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var aggContext: AggContext = _
+
+  /**
+* Sets a new aggregation context used for [[Aggregate.evaluate()]].
+*/
+  def setAggContext(aggContext: AggContext): Unit = {
+this.aggContext = aggContext
+  }
+
+  override def open(config: Configuration): Unit = {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupKeysMapping)
+val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
+aggregateBuffer = new Row(intermediateRowArity)
+output = new Row(finalRowLength)
+aggContext = new AggContext
+  }
+
+  /**
+   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
+   * calculate aggregated values output by aggregate buffer, and set them into output
+   * Row based on the mapping relation between intermediate aggregate Row and output Row.
+   *
+   * @param records  Grouped intermediate aggregate Rows iterator.
+   * @param out The collector to hand results to.
+   *
+   */
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(aggregateBuffer))
+
+// Merge intermediate aggregate value to buffer.
+var last: Row = null
+records.foreach((record) => {
+  aggregates.foreach(_.merge(record, aggregateBuffer))
+  last = record
+})
+
+// Set group keys value to final output.
+groupKeysMapping.map {
--- End diff --

Use `foreach` instead of `map` because `setField` returns `Unit`. (This can be fixed in `AggregateReduceGroupFunction` as well.)
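The point is easy to demonstrate: calling `map` over an array of index pairs whose body only performs `setField` builds a throwaway `Array[Unit]`, while `foreach` runs the same side effect without producing a result. A minimal sketch with a hypothetical stand-in for Flink's `Row` (not the real `org.apache.flink.api.table.Row`):

```scala
// Hypothetical stand-in for Flink's Row: a mutable, index-addressed record.
class Row(arity: Int) {
  private val fields = new Array[Any](arity)
  def setField(i: Int, v: Any): Unit = fields(i) = v // returns Unit
  def productElement(i: Int): Any = fields(i)
}

// (outputIndex, inputIndex) pairs, in the spirit of groupKeysMapping
val mapping = Array((0, 1), (1, 0))

val last = new Row(2)
last.setField(0, "key")
last.setField(1, 42)

val output = new Row(2)
// `map` here would allocate an Array[Unit] that is immediately discarded;
// `foreach` expresses the pure side effect without the useless result.
mapping.foreach { case (after, previous) =>
  output.setField(after, last.productElement(previous))
}
```

The same substitution applies to the `aggregateMapping` loop in the diff above, since its body is also a `Unit`-returning `setField` call.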




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82147461
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala ---
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes.datastream
+
+import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.AggregateCall
+import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.tuple.Tuple
+import org.apache.flink.api.table.expressions.{Expression, Literal}
+import org.apache.flink.api.table.plan.logical._
+import org.apache.flink.api.table.plan.nodes.FlinkAggregate
+import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream, createNonKeyedWindowedStream}
+import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._
+import org.apache.flink.api.table.runtime.aggregate.{AggregateAllWindowFunction, AggregateUtil, AggregateWindowFunction}
+import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval
+import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, RowTypeInfo, TypeCheckUtils, TypeConverter}
+import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment}
+import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream}
+import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.windowing.assigners._
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
+
+import scala.collection.JavaConverters._
+
+class DataStreamAggregate(
+window: LogicalWindow,
+cluster: RelOptCluster,
+traitSet: RelTraitSet,
+inputNode: RelNode,
+namedAggregates: Seq[CalcitePair[AggregateCall, String]],
+rowRelDataType: RelDataType,
+inputType: RelDataType,
+grouping: Array[Int])
+  extends SingleRel(cluster, traitSet, inputNode)
+  with FlinkAggregate
+  with DataStreamRel {
+
+  override def deriveRowType() = rowRelDataType
+
+  override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
+new DataStreamAggregate(
+  window,
+  cluster,
+  traitSet,
+  inputs.get(0),
+  namedAggregates,
+  getRowType,
+  inputType,
+  grouping)
+  }
+
+  override def toString: String = {
+s"Aggregate(${ if (!grouping.isEmpty) {
+  s"groupBy: (${groupingToString(inputType, grouping)}), "
+} else {
+  ""
+}}window: ($window), " +
+  s"select: (${aggregationToString(inputType, grouping, getRowType, namedAggregates)}))"
+  }
+
+  override def explainTerms(pw: RelWriter): RelWriter = {
+super.explainTerms(pw)
  .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty)
  .item("window", window)
  .item("select", aggregationToString(inputType, grouping, getRowType, namedAggregates))
+  }
+
+  override def translateToPlan(
+  tableEnv: StreamTableEnvironment,
+  expectedType: Option[TypeInformation[Any]]): DataStream[Any] = {
+val config = tableEnv.getConfig
+
+val groupingKeys = grouping.indices.toArray
+// add grouping fields, position keys in the input, and input type
+val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
+  inputType, getRowType, grouping, config)
+
+val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82151146
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala ---
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction}
+import org.apache.flink.api.table.Row
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.windowing.windows.Window
+import org.apache.flink.util.{Collector, Preconditions}
+
+import scala.collection.JavaConversions._
+
+
+/**
+ * It wraps the aggregate logic inside of
+ * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and
+ * [[org.apache.flink.api.java.operators.GroupCombineOperator]]
+ *
+ * @param aggregates   The aggregate functions.
+ * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row and output Row.
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated value index in output Row.
+ */
+class AggregateReduceCombineFunction(
+private val aggregates: Array[Aggregate[_ <: Any]],
+private val groupKeysMapping: Array[(Int, Int)],
+private val aggregateMapping: Array[(Int, Int)],
+private val intermediateRowArity: Int)
+extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] {
+
+  private var aggregateBuffer: Row = _
+  private var output: Row = _
+  private var aggContext: AggContext = _
+
+  /**
+* Sets a new aggregation context used for [[Aggregate.evaluate()]].
+*/
+  def setAggContext(aggContext: AggContext): Unit = {
+this.aggContext = aggContext
+  }
+
+  override def open(config: Configuration): Unit = {
+Preconditions.checkNotNull(aggregates)
+Preconditions.checkNotNull(groupKeysMapping)
+val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length
+aggregateBuffer = new Row(intermediateRowArity)
+output = new Row(finalRowLength)
+aggContext = new AggContext
+  }
+
+  /**
+   * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
+   * calculate aggregated values output by aggregate buffer, and set them into output
+   * Row based on the mapping relation between intermediate aggregate Row and output Row.
+   *
+   * @param records  Grouped intermediate aggregate Rows iterator.
+   * @param out The collector to hand results to.
+   *
+   */
+  override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = {
+
+// Initiate intermediate aggregate value.
+aggregates.foreach(_.initiate(aggregateBuffer))
+
+// Merge intermediate aggregate value to buffer.
+var last: Row = null
+records.foreach((record) => {
+  aggregates.foreach(_.merge(record, aggregateBuffer))
+  last = record
+})
+
+// Set group keys value to final output.
+groupKeysMapping.map {
+  case (after, previous) =>
+output.setField(after, last.productElement(previous))
+}
+
+// Evaluate final aggregate value and set to output.
+aggregateMapping.map {
+  case (after, previous) =>
+output.setField(after, aggregates(previous).evaluate(aggregateBuffer, aggContext))
+}
+
+out.collect(output)
+  }
+
+  /**
+   * For sub-grouped intermediate aggregate Rows, merge all of them into aggregate buffer,
+   *
+   * @param records  Sub-grouped intermediate aggregate Rows iterator.
+   * @return Combined intermediate aggregate Row.
+   *
+

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82150960
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala ---
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, TypeCoercion}
+import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+abstract class EventTimeGroupWindow(
+name: Option[Expression],
+time: Expression)
+  extends LogicalWindow(name) {
+
+  override def validate(tableEnv: TableEnvironment): ValidationResult = {
+val valid = super.validate(tableEnv)
+if (valid.isFailure) {
+return valid
+}
+
+if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+  val valid = time match {
+case RowtimeAttribute() =>
+  ValidationSuccess
+case _ =>
+  ValidationFailure("Event-time window expects a 'rowtime' time field.")
+  }
+  if (valid.isFailure) {
+return valid
+  }
+}
+if (!TypeCoercion.canCast(time.resultType, BasicTypeInfo.LONG_TYPE_INFO)) {
+  ValidationFailure(s"Event-time window expects a time field that can be safely cast " +
+s"to Long, but is ${time.resultType}")
+} else {
+  ValidationSuccess
+}
+  }
+}
+
+abstract class ProcessingTimeGroupWindow(name: Option[Expression]) extends LogicalWindow(name)
+
+// ------------------------------------------------------------------------------------------------
+// Tumbling group windows
+// ------------------------------------------------------------------------------------------------
+
+object TumblingGroupWindow {
+  def validate(tableEnv: TableEnvironment, size: Expression): ValidationResult = size match {
+case Literal(_, IntervalTypeInfo.INTERVAL_MILLIS) =>
+  ValidationSuccess
+case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) | Literal(_, BasicTypeInfo.INT_TYPE_INFO) =>
+  ValidationSuccess
+case _ =>
+  ValidationFailure(
+"Tumbling window expects size literal of type Interval of Milliseconds or Long/Integer.")
+  }
+}
+
+case class ProcessingTimeTumblingGroupWindow(
+name: Option[Expression],
+size: Expression)
+  extends ProcessingTimeGroupWindow(name) {
+
+  override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
+ProcessingTimeTumblingGroupWindow(
+  name.map(resolve),
+  resolve(size))
+
+  override def validate(tableEnv: TableEnvironment): ValidationResult =
+super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, size))
+
+  override def toString: String = s"ProcessingTimeTumblingGroupWindow($name, $size)"
+}
+
+case class EventTimeTumblingGroupWindow(
+name: Option[Expression],
+timeField: Expression,
+size: Expression)
+  extends EventTimeGroupWindow(
+name,
+timeField) {
+
+  override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow =
+EventTimeTumblingGroupWindow(
+  name.map(resolve),
+  resolve(timeField),
+  resolve(size))
+
+  override def validate(tableEnv: TableEnvironment): ValidationResult =
+super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, size))
+
+  override def toString: String = s"EventTimeTumblingGroupWindow($name, $timeField, $size)"
+}
+
+// ------------------------------------------------------------------------------------------------

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82150850
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala ---
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, TypeCoercion}
+import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+abstract class EventTimeGroupWindow(
+name: Option[Expression],
+time: Expression)
+  extends LogicalWindow(name) {
+
+  override def validate(tableEnv: TableEnvironment): ValidationResult = {
+val valid = super.validate(tableEnv)
+if (valid.isFailure) {
+return valid
+}
+
+if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+  val valid = time match {
+case RowtimeAttribute() =>
+  ValidationSuccess
+case _ =>
+  ValidationFailure("Event-time window expects a 'rowtime' time field.")
+  }
+  if (valid.isFailure) {
+return valid
+  }
+}
+if (!TypeCoercion.canCast(time.resultType, BasicTypeInfo.LONG_TYPE_INFO)) {
--- End diff --

Done.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82149847
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala ---
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.flink.api.table.expressions.Expression
+
+trait Resolvable[T <: AnyRef] {
--- End diff --

Done.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82149863
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala ---
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.flink.api.table.expressions.Expression
+
+trait Resolvable[T <: AnyRef] {
+
+  def resolveExpressions(resolver: (Expression) => Expression): T
--- End diff --

Done.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82146791
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
@@ -497,3 +501,95 @@ case class LogicalRelNode(
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = this
 }
+
+case class WindowAggregate(
+groupingExpressions: Seq[Expression],
+window: LogicalWindow,
+aggregateExpressions: Seq[NamedExpression],
+child: LogicalNode)
+  extends UnaryNode {
+
+  override def output: Seq[Attribute] = {
+(groupingExpressions ++ aggregateExpressions) map {
+  case ne: NamedExpression => ne.toAttribute
+  case e => Alias(e, e.toString).toAttribute
+}
+  }
+
+  override def resolveReference(
--- End diff --

Done.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82141352
  
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala ---
@@ -55,28 +56,27 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
 val resolvedProject = super.validate(tableEnv).asInstanceOf[Project]
+val names: mutable.Set[String] = mutable.Set()
 
-def checkUniqueNames(exprs: Seq[Expression]): Unit = {
-  val names: mutable.Set[String] = mutable.Set()
-  exprs.foreach {
-case n: Alias =>
-  // explicit name
-  if (names.contains(n.name)) {
-throw ValidationException(s"Duplicate field name ${n.name}.")
-  } else {
-names.add(n.name)
-  }
-case r: ResolvedFieldReference =>
-  // simple field forwarding
-  if (names.contains(r.name)) {
-throw ValidationException(s"Duplicate field name ${r.name}.")
-  } else {
-names.add(r.name)
-  }
-case _ => // Do nothing
+def checkName(name: String): Unit = {
+  if (names.contains(name)) {
+throw ValidationException(s"Duplicate field name $name.")
--- End diff --

Done.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82140542
  
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala ---
@@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark
+import org.apache.flink.api.scala.stream.utils.StreamITCase
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.{Row, _}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidBatchWindow(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Session withGap 10.rows as 'string)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidRowtime1(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'rowtime, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Tumble over 50.milli)
+  .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime2(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Tumble over 50.milli)
+  .select('string, 'int.count as 'rowtime)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime3(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table.as('rowtime, 'myint, 'mystring)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime4(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Tumble over 50.milli on 'string)
+  .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTumblingSize(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+   

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82133014
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala
 ---
@@ -0,0 +1,777 @@

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82132367
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala
 ---
@@ -0,0 +1,777 @@

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82132349
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala
 ---
@@ -0,0 +1,777 @@

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82131749
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala
 ---
@@ -0,0 +1,777 @@
+class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidBatchWindow(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val data = new mutable.MutableList[(Long, Int, String)]
+    val stream = env.fromCollection(data)
--- End diff --

Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-06 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82129916
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 ---
@@ -325,33 +337,42 @@ trait ImplicitExpressionOperations {
 */
   def day = toMilliInterval(expr, MILLIS_PER_DAY)
 
-/**
+  /**
 * Creates an interval of the given number of hours.
 *
 * @return interval of milliseconds
 */
   def hour = toMilliInterval(expr, MILLIS_PER_HOUR)
 
-/**
+  /**
 * Creates an interval of the given number of minutes.
 *
 * @return interval of milliseconds
 */
   def minute = toMilliInterval(expr, MILLIS_PER_MINUTE)
 
-/**
+  /**
 * Creates an interval of the given number of seconds.
 *
 * @return interval of milliseconds
 */
   def second = toMilliInterval(expr, MILLIS_PER_SECOND)
 
-/**
+  /**
 * Creates an interval of the given number of milliseconds.
 *
 * @return interval of milliseconds
 */
   def milli = toMilliInterval(expr, 1)
+
+  // row type
+
+  /**
+    * Creates a number defining an amount of rows.
+    *
+    * @return number of rows
+    */
+  def rows = expr
--- End diff --

I'm still not sure if we need an additional data type for that. Actually we 
could also just allow `Int` and `Long` and remove the `rows` completely. It is 
just syntactic sugar without additional advantage. It might also be good if we 
would allow constant arithmetic in future, such as `10*10.rows`. But if you 
think a row interval makes sense, I will add it.
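The trade-off raised in the comment above — a dedicated row-interval type versus plain `Int`/`Long` — can be sketched in isolation. This is a hypothetical illustration (the `RowInterval` and `RowOps` names are invented, not Flink's API): a distinct type would let window definitions reject a time interval where a row count is expected, while still allowing constant arithmetic like `(10 * 10).rows`.

```scala
// Hypothetical sketch, not Flink code: a dedicated row-interval type
// behind the `10.rows` syntax discussed above.
case class RowInterval(rows: Long) {
  require(rows > 0, "row interval must be positive")
}

object RowIntervalDsl {
  // postfix-style constructor mirroring `10.rows`
  implicit class RowOps(val n: Long) extends AnyVal {
    def rows: RowInterval = RowInterval(n)
  }
}

import RowIntervalDsl._

// Constant arithmetic still composes naturally with the wrapper type:
val hundred = (10L * 10L).rows
```

With plain `Int`/`Long` the syntax is lighter, but a `window(...)` overload could no longer distinguish a row count from any other number at compile time.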




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82009076
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/windowDsl.scala
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.{SessionWindow, SlideWithSize, TumblingWindow}
+
+/**
+  * Helper object for creating a tumbling window. In a tumbling window elements are assigned to
+  * fixed length, non-overlapping windows of a specified window size. For example, if you specify a
+  * window size of 5 minutes, the following operation will get 5 minutes worth of elements in
+  * each invocation.
+  */
+object Tumble {
+
+  /**
+    * Creates a tumbling window. In a tumbling window elements are assigned to fixed length,
+    * non-overlapping windows of a specified window size. For example, if you specify a window
+    * size of 5 minutes, the following operation will get 5 minutes worth of elements in
--- End diff --

Done.
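The fixed-length, non-overlapping assignment described in the Scaladoc above can be illustrated with a small stand-alone sketch. The `tumblingWindowStart` helper is invented for illustration of the semantics; it is not Flink's implementation.

```scala
// Assigns a timestamp (ms) to the start of its tumbling window of the
// given size (ms); windows cover [start, start + sizeMs) and never overlap.
def tumblingWindowStart(timestampMs: Long, sizeMs: Long): Long = {
  require(sizeMs > 0, "window size must be positive")
  require(timestampMs >= 0, "sketch assumes non-negative timestamps")
  timestampMs - (timestampMs % sizeMs)
}

// With a 5-minute window (300000 ms), an element at t = 301000 ms belongs
// to the window starting at 300000 ms.
val start = tumblingWindowStart(301000L, 300000L)
```

Because the windows partition the time axis, every element falls into exactly one window — which is what makes "5 minutes worth of elements per invocation" well defined.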




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82008309
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/windowDsl.scala
 ---
@@ -0,0 +1,90 @@
+object Tumble {
+
+  /**
+    * Creates a tumbling window. In a tumbling window elements are assigned to fixed length,
+    * non-overlapping windows of a specified window size. For example, if you specify a window
--- End diff --

Done.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82006797
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
 ---
@@ -50,6 +50,10 @@ object WordCountTable {
   .filter('frequency === 2)
   .toDataSet[WC]
 
+val w = Slide.over(23).every(23).toLogicalWindow.toString
--- End diff --

Sorry. I haven't seen that. Done.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82006252
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
 ---
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.api.table.plan.logical._
+
+/**
+  * Group-window specification. Group-windows allow aggregates which are computed for a group of
+  * elements. A (time or row-count) window is required to bound the infinite input stream into a
+  * finite group. Group-windows are evaluated once per group.
+  */
+trait GroupWindow {
+
+  /**
+    * Converts an API class to a logical window for planning. This is an internal method.
+    */
+  private[flink] def toLogicalWindow: LogicalWindow
+}
+
+/**
+  * A group-window operating on event-time.
+  *
+  * @param timeField defines the time mode for streaming tables and acts as a time attribute for
+  *  batch tables over which the query is evaluated.
+  */
+abstract class EventTimeWindow(timeField: Expression) extends GroupWindow {
+
+  protected var name: Option[Expression] = None
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: Expression): EventTimeWindow = {
+    this.name = Some(alias)
+    this
+  }
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias))
+}
+
+// ------------------------------------------------------------------------------------------------
+// Tumbling group-windows
+// ------------------------------------------------------------------------------------------------
+
+/**
+  * Tumbling group-window. By default, it works on processing-time.
+  *
+  * @param size size of the window either as number of rows or interval of milliseconds
+  */
+class TumblingWindow(size: Expression) extends GroupWindow {
+
+  /**
+    * Tumbling group-window. By default, it works on processing-time.
+    *
+    * @param size size of the window either as number of rows or interval of milliseconds
+    */
+  def this(size: String) = this(ExpressionParser.parseExpression(size))
+
+  private var alias: Option[Expression] = None
+
+  /**
+    * Defines the time mode for streaming tables and specifies a time attribute for
+    * batch tables over which the query is evaluated.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a tumbling group-window on event-time
+    */
+  def on(timeField: Expression): TumblingEventTimeWindow =
+    new TumblingEventTimeWindow(alias, timeField, size)
+
+  /**
+    * Defines the time mode for streaming tables and specifies a time attribute for
+    * batch tables over which the query is evaluated.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a tumbling group-window on event-time
+    */
+  def on(timeField: String): TumblingEventTimeWindow =
+    on(ExpressionParser.parseExpression(timeField))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties suc
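The fluent `as(alias)` pattern in the quoted `windows.scala` — store the alias, return the window so calls keep chaining — can be reduced to a minimal stand-alone sketch. The class names here are invented for illustration; the quoted Flink code returns the base `EventTimeWindow` type rather than `this.type`.

```scala
// Minimal sketch of a fluent window-alias builder.
abstract class AliasableWindow {
  protected var windowName: Option[String] = None

  // stores the alias and returns this window so calls can keep chaining
  def as(alias: String): this.type = {
    windowName = Some(alias)
    this
  }

  def aliasOption: Option[String] = windowName
}

class ExampleSessionWindow(val gapMs: Long) extends AliasableWindow

val w = new ExampleSessionWindow(1000L).as("w")
```

Returning `this.type` keeps the concrete window type after `as`, so `w.gapMs` remains accessible; widening to a base type, as the quoted code does, trades that precision for a simpler class hierarchy.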

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r82002196
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
 ---
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.api.table.plan.logical._
+
+/**
+  * Group-window specification. Group-windows allow aggregates which are computed for a group of
+  * elements. A (time or row-count) window is required to bound the infinite input stream into a
+  * finite group. Group-windows are evaluated once per group.
+  */
+trait GroupWindow {
+
+  /**
+    * Converts an API class to a logical window for planning. This is an internal method.
+    */
+  private[flink] def toLogicalWindow: LogicalWindow
+}
+
+/**
+  * A group-window operating on event-time.
+  *
+  * @param timeField defines the time mode for streaming tables and acts as a time attribute for
+  *                  batch tables over which the query is evaluated.
+  */
+abstract class EventTimeWindow(timeField: Expression) extends GroupWindow {
+
+  protected var name: Option[Expression] = None
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: Expression): EventTimeWindow = {
+    this.name = Some(alias)
+    this
+  }
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
+    *
+    * @param alias alias for this window
+    * @return this window
+    */
+  def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias))
+}
+
+// ===================================================================
+// Tumbling group-windows
+// ===================================================================
+
+/**
+  * Tumbling group-window. By default, it works on processing-time.
+  *
+  * @param size size of the window either as number of rows or interval of milliseconds
+  */
+class TumblingWindow(size: Expression) extends GroupWindow {
+
+  /**
+    * Tumbling group-window. By default, it works on processing-time.
+    *
+    * @param size size of the window either as number of rows or interval of milliseconds
+    */
+  def this(size: String) = this(ExpressionParser.parseExpression(size))
+
+  private var alias: Option[Expression] = None
+
+  /**
+    * Defines the time mode for streaming tables and specifies a time attribute for
+    * batch tables over which the query is evaluated.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a tumbling group-window on event-time
+    */
+  def on(timeField: Expression): TumblingEventTimeWindow =
+    new TumblingEventTimeWindow(alias, timeField, size)
+
+  /**
+    * Defines the time mode for streaming tables and specifies a time attribute for
+    * batch tables over which the query is evaluated.
+    *
+    * @param timeField time mode for streaming tables and time attribute for batch tables
+    * @return a tumbling group-window on event-time
+    */
+  def on(timeField: String): TumblingEventTimeWindow =
+    on(ExpressionParser.parseExpression(timeField))
+
+  /**
+    * Assigns an alias for this window that the following `select()` clause can refer to in order
+    * to access window properties such as window start or end time.
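The two-stage builder in the quoted diff (a processing-time `TumblingWindow` that `on(...)` upgrades to an event-time window, carrying along any alias set earlier) can be modeled with a small self-contained sketch. `Expr`, `Tumble`, and `EventTimeTumble` below are simplified stand-ins, not the real Flink classes:

```scala
// Simplified stand-ins for the Table API expression type and window classes.
case class Expr(name: String)

// Event-time variant produced by `on(...)`; it receives the alias collected so far.
class EventTimeTumble(val alias: Option[Expr], val timeField: Expr, val size: Expr)

// Processing-time tumbling window; `as` mutates and returns `this` so calls chain,
// and `on` switches to event-time while preserving the alias.
class Tumble(val size: Expr) {
  private var alias: Option[Expr] = None

  def as(a: Expr): Tumble = { alias = Some(a); this }

  def on(timeField: Expr): EventTimeTumble = new EventTimeTumble(alias, timeField, size)
}
```

With this model, `new Tumble(Expr("50.milli")).as(Expr("w")).on(Expr("rowtime"))` yields an event-time window whose alias is `w`, mirroring how the real builder threads the alias through `on`.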

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81998716
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
 ---
@@ -0,0 +1,340 @@

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81997213
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
 ---
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.flink.api.table.expressions.{Expression, 
ExpressionParser}
+import org.apache.flink.api.table.plan.logical._
+
+/**
+  * Group-window specification. Group-windows allow aggregates which are 
computed for a group of
+  * elements. A (time or row-count) window is required to bound the 
infinite input stream into a
+  * finite group. Group-windows are evaluated once per group.
+  */
+trait GroupWindow {
+
+  /**
+* Converts an API class to a logical window for planning. This is an 
internal method.
+*/
+  private[flink] def toLogicalWindow: LogicalWindow
+}
+
+/**
+  * A group-window operating on event-time.
+  *
+  * @param timeField defines the time mode for streaming tables and acts 
as a time attribute for
+  *  batch tables over which the query is evaluated.
+  */
+abstract class EventTimeWindow(timeField: Expression) extends GroupWindow {
+
+  protected var name: Option[Expression] = None
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to in order
+* to access window properties such as window start or end time.
+*
+* @param alias alias for this window
+* @return this window
+*/
+  def as(alias: Expression): EventTimeWindow = {
+this.name = Some(alias)
+this
+  }
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to in order
+* to access window properties such as window start or end time.
+*
+* @param alias alias for this window
+* @return this window
+*/
+  def as(alias: String): EventTimeWindow = 
as(ExpressionParser.parseExpression(alias))
+}
+
+// 

+// Tumbling group-windows
+// 

+
+/**
+  * Tumbling group-window. By default, it works on processing-time.
--- End diff --

Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81858699
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala
 ---
@@ -0,0 +1,777 @@
+
+package org.apache.flink.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark
+import org.apache.flink.api.scala.stream.utils.StreamITCase
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.{Row, _}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test(expected = classOf[ValidationException])
--- End diff --

I think it is a good idea to add a brief comment explaining why a test is expected to fail.
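A minimal sketch of that suggestion, in plain Scala with hypothetical names rather than the actual JUnit test — the point is that the comment states *why* the failure is expected:

```scala
// Stand-in for org.apache.flink.api.table.ValidationException.
class ValidationException(msg: String) extends RuntimeException(msg)

// Hypothetical validator: session windows defined by a row-count gap are
// rejected for batch tables, so this call always fails validation.
def validateBatchSessionWindow(): Unit =
  throw new ValidationException("Session window with row-count gap is not allowed on batch tables")

// Expected to fail: batch tables do not support session group-windows with a
// row-count gap, so validation must raise a ValidationException.
def testInvalidBatchWindow(): Boolean =
  try { validateBatchSessionWindow(); false }
  catch { case _: ValidationException => true }
```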




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81851322
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
 ---
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.flink.api.table.expressions.{Expression, 
ExpressionParser}
+import org.apache.flink.api.table.plan.logical._
+
+/**
+  * Group-window specification. Group-windows allow aggregates which are 
computed for a group of
+  * elements. A (time or row-count) window is required to bound the 
infinite input stream into a
+  * finite group. Group-windows are evaluated once per group.
+  */
+trait GroupWindow {
+
+  /**
+* Converts an API class to a logical window for planning. This is an 
internal method.
+*/
+  private[flink] def toLogicalWindow: LogicalWindow
+}
+
+/**
+  * A group-window operating on event-time.
+  *
+  * @param timeField defines the time mode for streaming tables and acts 
as a time attribute for
+  *  batch tables over which the query is evaluated.
+  */
+abstract class EventTimeWindow(timeField: Expression) extends GroupWindow {
+
+  protected var name: Option[Expression] = None
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to in order
+* to access window properties such as window start or end time.
+*
+* @param alias alias for this window
+* @return this window
+*/
+  def as(alias: Expression): EventTimeWindow = {
+this.name = Some(alias)
+this
+  }
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to in order
+* to access window properties such as window start or end time.
+*
+* @param alias alias for this window
+* @return this window
+*/
+  def as(alias: String): EventTimeWindow = 
as(ExpressionParser.parseExpression(alias))
+}
+
+// 

+// Tumbling group-windows
+// 

+
+/**
+  * Tumbling group-window. By default, it works on processing-time.
+  *
+  * @param size size of the window either as number of rows or interval of 
milliseconds
+  */
+class TumblingWindow(size: Expression) extends GroupWindow {
+
+  /**
+* Tumbling group-window. By default, it works on processing-time.
+*
+* @param size size of the window either as number of rows or interval 
of milliseconds
+*/
+  def this(size: String) = this(ExpressionParser.parseExpression(size))
+
+  private var alias: Option[Expression] = None
+
+  /**
+    * Defines the time mode for streaming tables and specifies a time attribute for
--- End diff --

Use event-time mode for streaming tables by calling `on('rowtime)`.




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81862041
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala
 ---
@@ -0,0 +1,777 @@
+class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidBatchWindow(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Session withGap 10.rows as 'string)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidRowtime1(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'rowtime, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Tumble over 50.milli)
+  .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime2(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Tumble over 50.milli)
+  .select('string, 'int.count as 'rowtime)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime3(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table.as('rowtime, 'myint, 'mystring)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime4(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Tumble over 50.milli on 'string)
+  .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTumblingSize(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+   

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81857880
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 ---
@@ -325,33 +337,42 @@ trait ImplicitExpressionOperations {
 */
   def day = toMilliInterval(expr, MILLIS_PER_DAY)
 
-/**
+  /**
 * Creates an interval of the given number of hours.
 *
 * @return interval of milliseconds
 */
   def hour = toMilliInterval(expr, MILLIS_PER_HOUR)
 
-/**
+  /**
 * Creates an interval of the given number of minutes.
 *
 * @return interval of milliseconds
 */
   def minute = toMilliInterval(expr, MILLIS_PER_MINUTE)
 
-/**
+  /**
 * Creates an interval of the given number of seconds.
 *
 * @return interval of milliseconds
 */
   def second = toMilliInterval(expr, MILLIS_PER_SECOND)
 
-/**
+  /**
 * Creates an interval of the given number of milliseconds.
 *
 * @return interval of milliseconds
 */
   def milli = toMilliInterval(expr, 1)
+
+  // row type
+
+  /**
+* Creates a number defining an amount of rows.
+*
+* @return number of rows
+*/
+  def rows = expr
--- End diff --

Doesn't this allow using `Int.rows` wherever `Int` is allowed? Should we return a `RowInterval` expression?
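One way to address this concern — with hypothetical `RowInterval`/`MilliInterval` wrappers rather than the actual Flink expression classes — is to give `rows` its own type, so a row count can no longer be passed where a time interval (or plain number) is expected:

```scala
// Distinct wrapper types: the compiler now rejects a RowInterval where a
// MilliInterval is required, instead of silently accepting a plain number.
case class RowInterval(rows: Long)
case class MilliInterval(millis: Long)

// Fluent builders on Long, analogous to the DSL's `rows`/`milli` methods.
implicit class IntervalBuilder(val n: Long) extends AnyVal {
  def rows: RowInterval = RowInterval(n)
  def milli: MilliInterval = MilliInterval(n)
}

// A window size constrained to row counts only accepts RowInterval.
def tumbleOverRows(size: RowInterval): String = s"tumble over ${size.rows} rows"
```

Calling `tumbleOverRows(5L.milli)` would then fail to compile, which is exactly the misuse the reviewer worries about.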




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81971150
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala
 ---
@@ -0,0 +1,26 @@
+
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.flink.api.table.expressions.Expression
+
+trait Resolvable[T <: AnyRef] {
--- End diff --

Add docs to explain the purpose of `Resolvable`
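A sketch of the requested documentation, using a stub `Expression` and a hypothetical `Project` node to show the intended contract:

```scala
// Stub standing in for org.apache.flink.api.table.expressions.Expression.
case class Expression(name: String)

/**
 * A logical operator whose expressions can be rewritten during plan resolution.
 * `resolveExpressions` applies `resolver` to every expression held by this node
 * and returns a copy of the node containing the resolved expressions.
 */
trait Resolvable[T <: AnyRef] {
  def resolveExpressions(resolver: Expression => Expression): T
}

// Hypothetical node: resolution maps the resolver over all field expressions.
case class Project(fields: Seq[Expression]) extends Resolvable[Project] {
  override def resolveExpressions(resolver: Expression => Expression): Project =
    Project(fields.map(resolver))
}
```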




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81852219
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
 ---
@@ -0,0 +1,340 @@

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81860022
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala
 ---
@@ -0,0 +1,777 @@

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81971046
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala
 ---
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.flink.api.table.expressions.Expression
+
+trait Resolvable[T <: AnyRef] {
+
+  def resolveExpressions(resolver: (Expression) => Expression): T
--- End diff --

Add docs what this method is supposed to do.
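For illustration, the requested doc comment plus a minimal stand-alone implementation might look like the following. This is a sketch with simplified stand-in types (`Expr`, `Filter` are not the actual Flink `Expression`/`LogicalNode` classes):

```scala
// Simplified stand-ins for illustration only (not Flink's classes).
sealed trait Expr
case class UnresolvedField(name: String) extends Expr
case class ResolvedField(name: String, index: Int) extends Expr

trait Resolvable[T <: AnyRef] {
  /**
    * An operator implementing this trait can resolve its expressions:
    * the given resolver is applied to every expression the operator
    * holds, and a copy with the resolved expressions is returned.
    */
  def resolveExpressions(resolver: Expr => Expr): T
}

// Example implementation: a filter that resolves its predicate.
case class Filter(predicate: Expr) extends Resolvable[Filter] {
  def resolveExpressions(resolver: Expr => Expr): Filter =
    Filter(resolver(predicate))
}

val resolver: Expr => Expr = {
  case UnresolvedField(n) => ResolvedField(n, 0) // toy resolution rule
  case e                  => e
}
val resolved = Filter(UnresolvedField("int")).resolveExpressions(resolver)
// resolved == Filter(ResolvedField("int", 0))
```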


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81945161
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -55,28 +56,27 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = {
 val resolvedProject = super.validate(tableEnv).asInstanceOf[Project]
+val names: mutable.Set[String] = mutable.Set()
 
-def checkUniqueNames(exprs: Seq[Expression]): Unit = {
-  val names: mutable.Set[String] = mutable.Set()
-  exprs.foreach {
-case n: Alias =>
-  // explicit name
-  if (names.contains(n.name)) {
-throw ValidationException(s"Duplicate field name ${n.name}.")
-  } else {
-names.add(n.name)
-  }
-case r: ResolvedFieldReference =>
-  // simple field forwarding
-  if (names.contains(r.name)) {
-throw ValidationException(s"Duplicate field name ${r.name}.")
-  } else {
-names.add(r.name)
-  }
-case _ => // Do nothing
+def checkName(name: String): Unit = {
+  if (names.contains(name)) {
+throw ValidationException(s"Duplicate field name $name.")
--- End diff --

use `failValidation()`
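The suggestion can be sketched stand-alone. Note the `ValidationException` and `failValidation` below are simplified stand-ins mirroring the helpers on Flink's `LogicalNode`, not the real definitions:

```scala
import scala.collection.mutable

// Simplified stand-ins: the real code throws Flink's ValidationException
// via a failValidation() helper defined on LogicalNode.
case class ValidationException(msg: String) extends RuntimeException(msg)

def failValidation(msg: String): Nothing = throw ValidationException(msg)

// Duplicate-name check as in Project.validate, using failValidation().
def checkUniqueNames(names: Seq[String]): Unit = {
  val seen: mutable.Set[String] = mutable.Set()
  names.foreach { name =>
    if (seen.contains(name)) {
      failValidation(s"Duplicate field name $name.")
    } else {
      seen.add(name)
    }
  }
}
```

`checkUniqueNames(Seq("a", "b"))` passes, while `checkUniqueNames(Seq("a", "a"))` throws the exception.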




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81970900
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalWindow.scala
 ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions.{Expression, WindowReference}
+import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
+
+abstract class LogicalWindow(val alias: Option[Expression]) extends Resolvable[LogicalWindow] {
+  def resolveExpressions(resolver: (Expression) => Expression): LogicalWindow = this
--- End diff --

add newline




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81957188
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
 ---
@@ -497,3 +501,95 @@ case class LogicalRelNode(
 
   override def validate(tableEnv: TableEnvironment): LogicalNode = this
 }
+
+case class WindowAggregate(
+groupingExpressions: Seq[Expression],
+window: LogicalWindow,
+aggregateExpressions: Seq[NamedExpression],
+child: LogicalNode)
+  extends UnaryNode {
+
+  override def output: Seq[Attribute] = {
+(groupingExpressions ++ aggregateExpressions) map {
+  case ne: NamedExpression => ne.toAttribute
+  case e => Alias(e, e.toString).toAttribute
+}
+  }
+
+  override def resolveReference(
--- End diff --

please add inline comments to make the code easier to follow
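The kind of inline commenting asked for can be illustrated with a heavily simplified, Flink-independent sketch of the resolution logic (hypothetical `Attr` type and signature; the real `resolveReference` works on Flink's expression trees):

```scala
// Hypothetical sketch of the resolution order in WindowAggregate:
// a reference is checked against the window alias first, then
// resolved against the fields of the child node.
case class Attr(name: String)

def resolveReference(
    windowAlias: Option[String],
    childFields: Map[String, Attr])(name: String): Option[Attr] = {
  // 1) a reference to the window alias resolves to the window itself
  if (windowAlias.contains(name)) {
    Some(Attr(name))
  } else {
    // 2) otherwise resolve against the fields of the child node
    childFields.get(name)
  }
}

resolveReference(Some("w"), Map("int" -> Attr("int")))("w")    // Some(Attr("w"))
resolveReference(Some("w"), Map("int" -> Attr("int")))("int")  // Some(Attr("int"))
```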




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81861775
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala
 ---
@@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark
+import org.apache.flink.api.scala.stream.utils.StreamITCase
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.{Row, _}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidBatchWindow(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Session withGap 10.rows as 'string)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidRowtime1(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'rowtime, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Tumble over 50.milli)
+  .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime2(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Tumble over 50.milli)
+  .select('string, 'int.count as 'rowtime)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime3(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table.as('rowtime, 'myint, 'mystring)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime4(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Tumble over 50.milli on 'string)
+  .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTumblingSize(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+   

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81850803
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
 ---
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.api.table.plan.logical._
+
+/**
+  * Group-window specification. Group-windows allow aggregates which are 
computed for a group of
+  * elements. A (time or row-count) window is required to bound the 
infinite input stream into a
+  * finite group. Group-windows are evaluated once per group.
+  */
+trait GroupWindow {
+
+  /**
+* Converts an API class to a logical window for planning. This is an 
internal method.
+*/
+  private[flink] def toLogicalWindow: LogicalWindow
+}
+
+/**
+  * A group-window operating on event-time.
+  *
+  * @param timeField defines the time mode for streaming tables and acts 
as a time attribute for
+  *  batch tables over which the query is evaluated.
+  */
+abstract class EventTimeWindow(timeField: Expression) extends GroupWindow {
+
+  protected var name: Option[Expression] = None
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to in order
+* to access window properties such as window start or end time.
+*
+* @param alias alias for this window
+* @return this window
+*/
+  def as(alias: Expression): EventTimeWindow = {
+this.name = Some(alias)
+this
+  }
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to in order
+* to access window properties such as window start or end time.
+*
+* @param alias alias for this window
+* @return this window
+*/
+  def as(alias: String): EventTimeWindow = 
as(ExpressionParser.parseExpression(alias))
+}
+
+// ------------------------------------------------------------------------------------------------
+// Tumbling group-windows
+// ------------------------------------------------------------------------------------------------
+
+/**
+  * Tumbling group-window. By default, it works on processing-time.
--- End diff --

Add hint for how to create event time window.
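A hint along those lines could show both modes side by side. This is a sketch based on the API used elsewhere in this PR; the window-property accessors `'w.start`/`'w.end` are assumed from the `as()` Scaladoc, not confirmed by the diff:

```scala
// Processing-time tumbling window (the default):
table
  .groupBy('string)
  .window(Tumble over 50.milli)
  .select('string, 'int.count)

// Event-time tumbling window: bind a time attribute with on(), and
// optionally alias the window with as() to read window properties:
table
  .groupBy('string)
  .window(Tumble over 50.milli on 'rowtime as 'w)
  .select('string, 'int.count, 'w.start, 'w.end)
```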




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81857075
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/windowDsl.scala
 ---
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.table
+
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.{SessionWindow, SlideWithSize, TumblingWindow}
+
+/**
+  * Helper object for creating a tumbling window. In a tumbling window 
elements are assigned to
+  * fixed length, non-overlapping windows of a specified window size. For 
example, if you specify a
+  * window size of 5 minutes, the following operation will get 5 minutes 
worth of elements in
+  * each invocation.
+  */
+object Tumble {
+
+  /**
+* Creates a tumbling window. In a tumbling window elements are 
assigned to fixed length,
+* non-overlapping windows of a specified window size. For example, if 
you specify a window
+* size of 5 minutes, the following operation will get 5 minutes worth 
of elements in
--- End diff --

I think "following operation" is not well defined. How about "For example, if
you specify a tumbling window of 5 minutes size, elements will be grouped in 5
minute intervals."?
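The "fixed length, non-overlapping" semantics can be made concrete with a small, Flink-independent assignment function:

```scala
// A timestamp ts falls into exactly one tumbling window of the given
// size: windows cover [k * size, (k + 1) * size) and never overlap.
def windowStart(ts: Long, size: Long): Long = ts - (ts % size)

// With 5-minute (300000 ms) tumbling windows:
windowStart(0L, 300000L)       // 0       (first window)
windowStart(299999L, 300000L)  // 0       (same window)
windowStart(300000L, 300000L)  // 300000  (next window)
```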




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81857293
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/windowDsl.scala
 ---
@@ -0,0 +1,90 @@
+/**
+  * Helper object for creating a tumbling window. In a tumbling window 
elements are assigned to
+  * fixed length, non-overlapping windows of a specified window size. For 
example, if you specify a
+  * window size of 5 minutes, the following operation will get 5 minutes 
worth of elements in
+  * each invocation.
+  */
+object Tumble {
+
+  /**
+* Creates a tumbling window. In a tumbling window elements are 
assigned to fixed length,
+* non-overlapping windows of a specified window size. For example, if 
you specify a window
--- End diff --

"fixed length ... of a specified window size" is duplicate. How about 
"non-overlapping windows of a specified fixed length"?




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81861356
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala
 ---

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81852394
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
 ---
@@ -0,0 +1,340 @@
+/**
+  * Tumbling group-window. By default, it works on processing-time.
+  *
+  * @param size size of the window either as number of rows or interval of 
milliseconds
+  */
+class TumblingWindow(size: Expression) extends GroupWindow {
+
+  /**
+* Tumbling group-window. By default, it works on processing-time.
+*
+* @param size size of the window either as number of rows or interval 
of milliseconds
+*/
+  def this(size: String) = this(ExpressionParser.parseExpression(size))
+
+  private var alias: Option[Expression] = None
+
+  /**
+* Defines the time mode for streaming tables and specifies a time 
attribute for
+* batch tables over which the query is evaluated.
+*
+* @param timeField time mode for streaming tables and time attribute 
for batch tables
+* @return a tumbling group-window on event-time
+*/
+  def on(timeField: Expression): TumblingEventTimeWindow =
+new TumblingEventTimeWindow(alias, timeField, size)
+
+  /**
+* Defines the time mode for streaming tables and specifies a time 
attribute for
+* batch tables over which the query is evaluated.
+*
+* @param timeField time mode for streaming tables and time attribute 
for batch tables
+* @return a tumbling group-window on event-time
+*/
+  def on(timeField: String): TumblingEventTimeWindow =
+on(ExpressionParser.parseExpression(timeField))
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to in order
+* to access window properties suc

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81862223
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala
 ---

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81970778
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalWindow.scala
 ---
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.flink.api.table.TableEnvironment
+import org.apache.flink.api.table.expressions.{Expression, WindowReference}
+import org.apache.flink.api.table.validate.{ValidationFailure, 
ValidationResult, ValidationSuccess}
+
+abstract class LogicalWindow(val alias: Option[Expression]) extends 
Resolvable[LogicalWindow] {
--- End diff --

rename to `LogicalGroupWindow` and merge with `groupWindows.scala`? 
Or is this class supposed to be used for row windows as well?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81860061
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala
 ---
@@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import 
org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark
+import org.apache.flink.api.scala.stream.utils.StreamITCase
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.{Row, _}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidBatchWindow(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Session withGap 10.rows as 'string)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testInvalidRowtime1(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'rowtime, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Tumble over 50.milli)
+  .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime2(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Tumble over 50.milli)
+  .select('string, 'int.count as 'rowtime)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime3(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table.as('rowtime, 'myint, 'mystring)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime4(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
+val table = stream.toTable(tEnv, 'long, 'int, 'string)
+
+table
+  .groupBy('string)
+  .window(Tumble over 50.milli on 'string)
+  .select('string, 'int.count)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTumblingSize(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+   

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81847736
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
 ---
@@ -675,11 +692,77 @@ class GroupedTable(
 * Example:
 *
 * {{{
-*   tab.groupBy("key").select("key, value.avg + " The average" as 
average")
+*   tab.groupBy("key").select("key, value.avg + ' The average' as 
average")
+* }}}
+*/
+  def select(fields: String): Table = {
+val fieldExprs = ExpressionParser.parseExpressionList(fields)
+select(fieldExprs: _*)
+  }
+
+  /**
+* Windows a table to divide a (potentially) infinite stream of records 
into finite slices
+* based on the timestamps of elements or other criteria. This division 
is required when
+* working with infinite data and performing transformations that 
aggregate elements.
+*
+* @param groupWindow group-window specification required to bound the 
infinite input stream
+*into a finite group
+* @return group-windowed table
+*/
+  def window(groupWindow: GroupWindow): GroupWindowedTable = {
+if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
+  throw new ValidationException(s"Windows on batch tables are 
currently not supported.")
+}
+new GroupWindowedTable(table, groupKey, groupWindow)
+  }
+}
+
+class GroupWindowedTable(
+private[flink] val table: Table,
+private[flink] val groupKey: Seq[Expression],
+private[flink] val window: GroupWindow) {
+
+  /**
+* Performs a selection operation on a group-windowed table. Similar to 
an SQL SELECT statement.
+* The field expressions can contain complex expressions and 
aggregations.
+*
+* Example:
+*
+* {{{
+*   groupWindowTable.select('key, 'window.start, 'value.avg + " The 
average" as 'average)
+* }}}
+*/
+  def select(fields: Expression*): Table = {
+val projectionOnAggregates = fields.map(extractAggregations(_, 
table.tableEnv))
+val aggregations = projectionOnAggregates.flatMap(_._2)
+
+val groupWindow = window.toLogicalWindow
+
+val logical = if (aggregations.nonEmpty) {
+  Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
+WindowAggregate(groupKey, groupWindow, aggregations, 
table.logicalPlan)
+  .validate(table.tableEnv))
+} else {
+  Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)),
+WindowAggregate(groupKey, groupWindow, Nil, 
table.logicalPlan).validate(table.tableEnv))
+}
+
+new Table(table.tableEnv, logical.validate(table.tableEnv))
--- End diff --

Wasn't `logical` validated before?


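The reviewer's two observations — `logical` already ends in `.validate(...)` inside each branch, and the two branches differ only in the aggregation list — can be sketched with a toy stand-in (the `buildPlan` helper below is hypothetical, not Flink code): passing `Nil` for the no-aggregation case lets one construction and one validation suffice.

```scala
// Hypothetical stand-in for Project(WindowAggregate(...)): the two original
// branches collapse into one call because the empty case is simply `Nil`.
def buildPlan(groupKey: Seq[String], aggregations: Seq[String]): String =
  s"Project(WindowAggregate(groupBy=[${groupKey.mkString(", ")}], aggs=[${aggregations.mkString(", ")}]))"

// With aggregations extracted from the select() fields:
val withAggs = buildPlan(Seq("string"), Seq("count(int)"))
// Without aggregations -- no separate branch needed:
val withoutAggs = buildPlan(Seq("string"), Nil)

println(withAggs)
println(withoutAggs)
```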


[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81983925
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala
 ---
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, 
TypeCoercion}
+import org.apache.flink.api.table.validate.{ValidationFailure, 
ValidationResult, ValidationSuccess}
+
+abstract class EventTimeGroupWindow(
+name: Option[Expression],
+time: Expression)
+  extends LogicalWindow(name) {
+
+  override def validate(tableEnv: TableEnvironment): ValidationResult = {
+val valid = super.validate(tableEnv)
+if (valid.isFailure) {
+return valid
+}
+
+if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+  val valid = time match {
+case RowtimeAttribute() =>
+  ValidationSuccess
+case _ =>
+  ValidationFailure("Event-time window expects a 'rowtime' time 
field.")
+  }
+  if (valid.isFailure) {
+return valid
+  }
+}
+if (!TypeCoercion.canCast(time.resultType, 
BasicTypeInfo.LONG_TYPE_INFO)) {
+  ValidationFailure(s"Event-time window expects a time field that can 
be safely cast " +
+s"to Long, but is ${time.resultType}")
+} else {
+  ValidationSuccess
+}
+  }
+}
+
+abstract class ProcessingTimeGroupWindow(name: Option[Expression]) extends 
LogicalWindow(name)
+
+// ------------------------------------------------------------------------------------------------
+// Tumbling group windows
+// ------------------------------------------------------------------------------------------------
+
+object TumblingGroupWindow {
+  def validate(tableEnv: TableEnvironment, size: Expression): 
ValidationResult = size match {
+case Literal(_, IntervalTypeInfo.INTERVAL_MILLIS) =>
+  ValidationSuccess
+case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) | Literal(_, 
BasicTypeInfo.INT_TYPE_INFO) =>
+  ValidationSuccess
+case _ =>
+  ValidationFailure(
+"Tumbling window expects size literal of type Interval of 
Milliseconds or Long/Integer.")
+  }
+}
+
+case class ProcessingTimeTumblingGroupWindow(
+name: Option[Expression],
+size: Expression)
+  extends ProcessingTimeGroupWindow(name) {
+
+  override def resolveExpressions(resolve: (Expression) => Expression): 
LogicalWindow =
+ProcessingTimeTumblingGroupWindow(
+  name.map(resolve),
+  resolve(size))
+
+  override def validate(tableEnv: TableEnvironment): ValidationResult =
+super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, 
size))
+
+  override def toString: String = 
s"ProcessingTimeTumblingGroupWindow($name, $size)"
+}
+
+case class EventTimeTumblingGroupWindow(
+name: Option[Expression],
+timeField: Expression,
+size: Expression)
+  extends EventTimeGroupWindow(
+name,
+timeField) {
+
+  override def resolveExpressions(resolve: (Expression) => Expression): 
LogicalWindow =
+EventTimeTumblingGroupWindow(
+  name.map(resolve),
+  resolve(timeField),
+  resolve(size))
+
+  override def validate(tableEnv: TableEnvironment): ValidationResult =
+super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, 
size))
+
+  override def toString: String = s"EventTimeTumblingGroupWindow($name, 
$timeField, $size)"
+}
+
+// 
-

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81976580
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala
 ---
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.logical
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.table.{StreamTableEnvironment, 
TableEnvironment}
+import org.apache.flink.api.table.expressions._
+import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, 
TypeCoercion}
+import org.apache.flink.api.table.validate.{ValidationFailure, 
ValidationResult, ValidationSuccess}
+
+abstract class EventTimeGroupWindow(
+name: Option[Expression],
+time: Expression)
+  extends LogicalWindow(name) {
+
+  override def validate(tableEnv: TableEnvironment): ValidationResult = {
+val valid = super.validate(tableEnv)
+if (valid.isFailure) {
+return valid
+}
+
+if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
+  val valid = time match {
+case RowtimeAttribute() =>
+  ValidationSuccess
+case _ =>
+  ValidationFailure("Event-time window expects a 'rowtime' time 
field.")
+  }
+  if (valid.isFailure) {
+return valid
+  }
+}
+if (!TypeCoercion.canCast(time.resultType, 
BasicTypeInfo.LONG_TYPE_INFO)) {
--- End diff --

Can you put this in an `else` branch (for `BatchTableEnvironment`)? 
`RowtimeAttribute`'s result type is `Long`. So it will always pass the check. 
The `valid.isFailure` check could also be removed.


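The short-circuiting `orElse` chaining that the `validate` methods above rely on can be sketched in plain Scala. The `ValidationResult` hierarchy below is a simplified stand-in for the one in `org.apache.flink.api.table.validate` (assumption: `orElse` returns the receiver when it is already a failure), not Flink's actual code.

```scala
// Simplified stand-ins for Flink's ValidationResult hierarchy.
sealed trait ValidationResult {
  def isFailure: Boolean
  // Keep an existing failure; otherwise evaluate the next check.
  def orElse(other: => ValidationResult): ValidationResult =
    if (isFailure) this else other
}
case object ValidationSuccess extends ValidationResult {
  val isFailure = false
}
case class ValidationFailure(message: String) extends ValidationResult {
  val isFailure = true
}

// Window size check in the spirit of TumblingGroupWindow.validate:
// Long/Int values (standing in for interval and row-count literals) pass.
def validateSize(size: Any): ValidationResult = size match {
  case _: Long | _: Int => ValidationSuccess
  case other            => ValidationFailure(s"Tumbling window expects a Long/Int size, got: $other")
}

// Chained like super.validate(tableEnv).orElse(TumblingGroupWindow.validate(...)).
val ok  = ValidationSuccess.orElse(validateSize(50L))
val bad = ValidationFailure("no rowtime field").orElse(validateSize(50L))

println(ok)
println(bad)
```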


[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81852306
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala
 ---
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table
+
+import org.apache.flink.api.table.expressions.{Expression, 
ExpressionParser}
+import org.apache.flink.api.table.plan.logical._
+
+/**
+  * Group-window specification. Group-windows allow aggregates which are 
computed for a group of
+  * elements. A (time or row-count) window is required to bound the 
infinite input stream into a
+  * finite group. Group-windows are evaluated once per group.
+  */
+trait GroupWindow {
+
+  /**
+* Converts an API class to a logical window for planning. This is an 
internal method.
+*/
+  private[flink] def toLogicalWindow: LogicalWindow
+}
+
+/**
+  * A group-window operating on event-time.
+  *
+  * @param timeField defines the time mode for streaming tables and acts 
as a time attribute for
+  *  batch tables over which the query is evaluated.
+  */
+abstract class EventTimeWindow(timeField: Expression) extends GroupWindow {
+
+  protected var name: Option[Expression] = None
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to in order
+* to access window properties such as window start or end time.
+*
+* @param alias alias for this window
+* @return this window
+*/
+  def as(alias: Expression): EventTimeWindow = {
+this.name = Some(alias)
+this
+  }
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to in order
+* to access window properties such as window start or end time.
+*
+* @param alias alias for this window
+* @return this window
+*/
+  def as(alias: String): EventTimeWindow = 
as(ExpressionParser.parseExpression(alias))
+}
+
+// ------------------------------------------------------------------------------------------------
+// Tumbling group-windows
+// ------------------------------------------------------------------------------------------------
+
+/**
+  * Tumbling group-window. By default, it works on processing-time.
+  *
+  * @param size size of the window either as number of rows or interval of 
milliseconds
+  */
+class TumblingWindow(size: Expression) extends GroupWindow {
+
+  /**
+* Tumbling group-window. By default, it works on processing-time.
+*
+* @param size size of the window either as number of rows or interval 
of milliseconds
+*/
+  def this(size: String) = this(ExpressionParser.parseExpression(size))
+
+  private var alias: Option[Expression] = None
+
+  /**
+* Defines the time mode for streaming tables and specifies a time 
attribute for
+* batch tables over which the query is evaluated.
+*
+* @param timeField time mode for streaming tables and time attribute 
for batch tables
+* @return a tumbling group-window on event-time
+*/
+  def on(timeField: Expression): TumblingEventTimeWindow =
+new TumblingEventTimeWindow(alias, timeField, size)
+
+  /**
+* Defines the time mode for streaming tables and specifies a time 
attribute for
+* batch tables over which the query is evaluated.
+*
+* @param timeField time mode for streaming tables and time attribute 
for batch tables
+* @return a tumbling group-window on event-time
+*/
+  def on(timeField: String): TumblingEventTimeWindow =
+on(ExpressionParser.parseExpression(timeField))
+
+  /**
+* Assigns an alias for this window that the following `select()` 
clause can refer to in order
+* to access window properties suc
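The fluent shape of this builder API (a size, an optional time field via `on`, an optional alias via `as`) can be illustrated with a self-contained toy. The case classes below only mimic the call chain `Tumble over … on … as …` from the PR's tests; they are not Flink's implementation (which returns distinct processing-time and event-time window types).

```scala
// Toy stand-in for the tumbling-window builder: `over` fixes the size,
// `on` selects an event-time field, `as` assigns an alias for select().
case class TumblingWindow(sizeMillis: Long,
                          timeField: Option[String] = None,
                          alias: Option[String] = None) {
  def on(field: String): TumblingWindow = copy(timeField = Some(field))
  def as(name: String): TumblingWindow  = copy(alias = Some(name))
}

object Tumble {
  def over(sizeMillis: Long): TumblingWindow = TumblingWindow(sizeMillis)
}

// Mirrors `Tumble over 50.milli on 'rowtime as 'w` from the Table API tests.
val w = Tumble.over(50).on("rowtime").as("w")
println(w)
```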

[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81854254
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
 ---
@@ -50,6 +50,10 @@ object WordCountTable {
   .filter('frequency === 2)
   .toDataSet[WC]
 
+val w = Slide.over(23).every(23).toLogicalWindow.toString
--- End diff --

please remove changes




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-10-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2562#discussion_r81859596
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala
 ---
@@ -0,0 +1,777 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import 
org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark
+import org.apache.flink.api.scala.stream.utils.StreamITCase
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.{Row, _}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import 
org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidBatchWindow(): Unit = {
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val data = new mutable.MutableList[(Long, Int, String)]
+val stream = env.fromCollection(data)
--- End diff --

not a stream




[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...

2016-09-28 Thread twalthr
GitHub user twalthr opened a pull request:

https://github.com/apache/flink/pull/2562

[FLINK-4691] [table] Add group-windows for streaming tables

This PR implements Tumble, Slide, Session group-windows for streaming 
tables as described in FLIP-11. It adds API, validation, logical 
representation, and runtime components.

Some additional comments:

I have not implemented the 'systemtime' keyword yet, as it would cause 
more problems than it solves; integrating it into the validation layer in 
particular would be tricky. The resolution of those special fields happens 
within a WindowAggregate; however, the logical type of a window should already 
be known at this point. We are mixing logical operators and expressions, which 
is not very nice. Furthermore, what happens in a batch environment if 
'systemtime' is used? It could also be an existing column but does not have to 
be one. That is not specified in the FLIP yet.

The aggregations are not very efficient yet. Currently this PR uses window 
functions that wrap the GroupReduce functions. We have to rework the 
aggregations first. Maybe we could use `WindowedStream#apply(R, 
FoldFunction, WindowFunction, TypeInformation)` which means 
that `R` has to be created in the translation phase.

The tests are mainly ITCases for now; we might want to change them to unit 
tests once we have the means (like new test bases) to do so.

The website documentation is still missing.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/twalthr/flink FLINK-4691

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2562.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2562


commit edbfe00cb0fd7ea8362c90988eb0860eb9ce6078
Author: twalthr 
Date:   2016-08-25T07:19:53Z

[FLINK-4691] [table] Add group-windows for streaming tables




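As background for the Tumble/Slide/Session windows this PR describes: a tumbling window partitions the time axis into fixed, non-overlapping slices, so every timestamp maps to exactly one window. A minimal sketch of that assignment arithmetic (epoch-aligned, assuming non-negative timestamps; this is the standard calculation, not code from the PR):

```scala
// Start of the sizeMillis-wide tumbling window containing `timestamp`.
def windowStart(timestamp: Long, sizeMillis: Long): Long =
  timestamp - (timestamp % sizeMillis)

// With 50 ms tumbling windows: timestamps 0..49 fall into window [0, 50),
// timestamp 50 starts the next window [50, 100).
println(windowStart(49L, 50L))  // 0
println(windowStart(50L, 50L))  // 50

// By contrast, a sliding window of size `size` sliding by `slide` assigns each
// timestamp to size/slide overlapping windows, and a session window has no
// fixed boundaries at all -- it closes after a gap with no elements.
```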