[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/2562

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r83009444

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyRead.scala ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+ * Base class for reading a window property. The property will be extracted once and
+ * can be read multiple times.
+ */
+trait PropertyRead[T] extends Serializable {
+
+  def extract(window: Window): Unit
--- End diff --

Yes, every aggregation should only happen once. We should definitely do that. By the way, we can also get rid of `AvgAggregate` once `AggregateReduceFunctionsRule` is enabled. So there are many open issues with the current aggregate implementation.
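The `PropertyRead` scaladoc describes an extract-once, read-many contract. A minimal sketch of that contract in plain Scala, assuming a hypothetical `SimpleWindow` stand-in instead of Flink's `Window` class; `PropertyReadSketch` and `WindowEndRead` are illustrative names, not the PR's classes:

```scala
// Hypothetical stand-in for a time window; NOT Flink's TimeWindow.
final case class SimpleWindow(start: Long, end: Long)

// Sketch of the extract-once / read-many contract (hypothetical trait).
trait PropertyReadSketch[T] extends java.io.Serializable {
  // Called once per fired window; caches the property value.
  def extract(window: SimpleWindow): Unit
  // May be called any number of times after extract().
  def get: T
}

// Reads the window end; counts extractions only to illustrate the contract.
final class WindowEndRead extends PropertyReadSketch[Long] {
  private var end: Long = 0L
  var extractCalls: Int = 0

  override def extract(window: SimpleWindow): Unit = {
    extractCalls += 1
    end = window.end
  }

  override def get: Long = end
}
```

Reading `get` twice after a single `extract` returns the cached value without touching the window again.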
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r83008188

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala ---
@@ -0,0 +1,108 @@
+package org.apache.flink.api.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{LongComparator, LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+ * TypeInformation for row intervals.
+ */
+@SerialVersionUID(-1306179424364925258L)
+class RowIntervalTypeInfo[T](
--- End diff --

That's a good idea. I will do that.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r83000595

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/windows/Session.java ---
@@ -0,0 +1,48 @@
+package org.apache.flink.api.java.table.windows;
+
+import org.apache.flink.api.table.SessionWindow;
+
+/**
+ * Helper class for creating a session window. Session windows are ideal for cases where the
+ * window boundaries need to adjust to the incoming data. In a session window it is possible to
+ * have windows that start at individual points in time for each key and that end once there has
+ * been a certain period of inactivity.
+ */
+public class Session {
--- End diff --

I see, that makes sense. I think we should convert the Java classes into Scala classes because they are in the Scala source folder.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82999720

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala ---
@@ -0,0 +1,108 @@
+package org.apache.flink.api.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{LongComparator, LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+ * TypeInformation for row intervals.
+ */
+@SerialVersionUID(-1306179424364925258L)
+class RowIntervalTypeInfo[T](
--- End diff --

OK, I understand the issue with the instance checks (although I would think these checks are not correct if they fail in such cases). However, providing a full implementation for each internal type does not seem right.

How about we create a special abstract TypeInfo for types that are not required at execution time and implement all irrelevant methods (arity, serializer, comparator, etc.) by throwing an `UnsupportedOperationException`? `RowIntervalTypeInfo` and `TimeIntervalTypeInfo` would extend it and only provide the type.
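The proposal of an abstract type info for compile-time-only types can be sketched without Flink. This is a minimal illustration under stated assumptions: `TypeInfoLike` is a hypothetical, stripped-down stand-in for the `TypeInformation` surface (not Flink's API), and both interval sketches are assumed to be backed by `java.lang.Long` purely for illustration:

```scala
// Hypothetical, stripped-down stand-in for the TypeInformation surface.
trait TypeInfoLike[T] {
  def getTypeClass: Class[T]
  def getArity: Int
  def isBasicType: Boolean
  def createSerializer(): AnyRef
}

// Base class for types that only exist during validation and planning.
// Every execution-related method throws UnsupportedOperationException.
abstract class CompileTimeOnlyTypeInfo[T](clazz: Class[T]) extends TypeInfoLike[T] {
  override def getTypeClass: Class[T] = clazz

  private def unsupported(op: String): Nothing =
    throw new UnsupportedOperationException(
      s"$op is not supported: ${getClass.getName} is not needed at execution time.")

  override def getArity: Int = unsupported("getArity")
  override def isBasicType: Boolean = unsupported("isBasicType")
  override def createSerializer(): AnyRef = unsupported("createSerializer")
}

// Hypothetical interval types: they only provide their type class.
object RowIntervalTypeInfoSketch
  extends CompileTimeOnlyTypeInfo[java.lang.Long](classOf[java.lang.Long])

object TimeIntervalTypeInfoSketch
  extends CompileTimeOnlyTypeInfo[java.lang.Long](classOf[java.lang.Long])
```

The design choice is that accidental use at execution time fails loudly instead of silently serializing a type that should never reach the runtime.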
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82993938

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyRead.scala ---
@@ -0,0 +1,33 @@
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.streaming.api.windowing.windows.Window
+
+/**
+ * Base class for reading a window property. The property will be extracted once and
+ * can be read multiple times.
+ */
+trait PropertyRead[T] extends Serializable {
+
+  def extract(window: Window): Unit
--- End diff --

In general I agree with your solution, but right now we support having the same operation multiple times in a query, e.g.

```
.window(Session withGap 3.milli on 'rowtime as 'w)
.select('string, 'w.end, 'w.end)
```

Your code would fail for such a query. Let's leave the aggregation as it is for now and rework it later. The aggregations have to be reworked anyway for efficiency.
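Supporting a query such as `.select('string, 'w.end, 'w.end)` while still computing each property only once can be done by deduplicating the requested properties and then writing each cached value to every output position that asks for it. A minimal sketch, not the PR's implementation; `SimpleWindow`, `PropertyDedupSketch`, and the string property names `"start"`/`"end"` are hypothetical:

```scala
// Hypothetical stand-in for a time window; NOT Flink's TimeWindow.
final case class SimpleWindow(start: Long, end: Long)

object PropertyDedupSketch {
  // Number of times a property was actually computed, for illustration only.
  var evaluations: Int = 0

  private def evaluate(name: String, w: SimpleWindow): Long = {
    evaluations += 1
    name match {
      case "start" => w.start
      case "end"   => w.end
      case other   => throw new IllegalArgumentException(s"unknown property: $other")
    }
  }

  // Writes requested(i) into row(positions(i)), evaluating each distinct
  // property only once even if it is selected several times.
  def fill(row: Array[Any], requested: Seq[String], positions: Seq[Int], w: SimpleWindow): Unit = {
    val cache: Map[String, Long] = requested.distinct.map(p => p -> evaluate(p, w)).toMap
    requested.zip(positions).foreach { case (p, pos) => row(pos) = cache(p) }
  }
}
```

With `'w.end` selected twice, the end timestamp is computed once and copied into both output fields.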
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82991170

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala ---
@@ -23,6 +23,8 @@ import org.apache.flink.api.table.Row
 import java.math.BigDecimal
 import java.math.BigInteger
+import org.apache.flink.streaming.api.windowing.windows.Window
--- End diff --

Done.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82990475

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala ---
@@ -27,48 +27,65 @@ import scala.collection.mutable.ListBuffer
 object RexNodeTranslator {
--- End diff --

Good point. I renamed it to `ProjectionTranslator`. Done.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82990268

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/properties.scala ---
@@ -0,0 +1,57 @@
+package org.apache.flink.api.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.api.table.FlinkRelBuilder.NamedProperty
+import org.apache.flink.api.table.validate.{ValidationFailure, ValidationSuccess}
+
+abstract class Property(child: Expression) extends UnaryExpression {
+
+  override def toString = s"Property($child)"
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
+    throw new UnsupportedOperationException("Property cannot be transformed to RexNode.")
+
+  override private[flink] def validateInput() =
+    if (child.isInstanceOf[WindowReference]) {
--- End diff --

I just tried to keep the names short because the Scala line-length limit is pretty strict. Done.
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82989243

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/windows/Session.java ---
@@ -0,0 +1,48 @@
+package org.apache.flink.api.java.table.windows;
+
+import org.apache.flink.api.table.SessionWindow;
+
+/**
+ * Helper class for creating a session window. Session windows are ideal for cases where the
+ * window boundaries need to adjust to the incoming data. In a session window it is possible to
+ * have windows that start at individual points in time for each key and that end once there has
+ * been a certain period of inactivity.
+ */
+public class Session {
--- End diff --

Another reason was that for a Scala user it would be an additional import. Currently, `import org.apache.flink.api.scala.table._` is enough for using the Table API. If I move it into the general table package, the user has to import it separately and cannot use the fluent API anymore.
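The point about a single import can be illustrated with plain Scala: if the window helper and the implicit conversions live in the same importable scope, one wildcard import enables the fluent syntax. A minimal sketch; `tableapi`, `Millis`, `SessionWindowSpec`, and `IntervalLiteral` are hypothetical names and not Flink's API, and only the shape `Session withGap 3.milli` mirrors the query syntax used in this thread:

```scala
// Hypothetical importable scope standing in for a package like
// org.apache.flink.api.scala.table; all names are illustrative only.
object tableapi {
  final case class Millis(value: Long)
  final case class SessionWindowSpec(gapMillis: Long)

  // Enables `3.milli` once `tableapi._` is imported.
  implicit class IntervalLiteral(val n: Int) {
    def milli: Millis = Millis(n.toLong)
  }

  // The window helper lives next to the implicits, so the same
  // wildcard import brings it into scope as well.
  object Session {
    def withGap(gap: Millis): SessionWindowSpec = SessionWindowSpec(gap.value)
  }
}
```

Usage: after `import tableapi._`, the expression `Session withGap 3.milli` compiles without any further import. Moving `Session` into a different scope would require a second import for the same expression.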
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82988652

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/windows/Session.java ---
@@ -0,0 +1,48 @@
+package org.apache.flink.api.java.table.windows;
+
+import org.apache.flink.api.table.SessionWindow;
+
+/**
+ * Helper class for creating a session window. Session windows are ideal for cases where the
+ * window boundaries need to adjust to the incoming data. In a session window it is possible to
+ * have windows that start at individual points in time for each key and that end once there has
+ * been a certain period of inactivity.
+ */
+public class Session {
--- End diff --

During development I had the problem that I had to use `Session$.MODULE$.withGap(10)` (see [1]). But now it is not a companion object anymore, so it works. I will change that.

[1] http://stackoverflow.com/questions/3282653/how-do-you-call-a-scala-singleton-method-from-java
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82884874

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/aggregations.scala ---
@@ -18,12 +18,15 @@ package org.apache.flink.api.table.expressions
 import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeName}
--- End diff --

Clean up imports
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82899030

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/Aggregate.scala ---
@@ -19,6 +19,7 @@ package org.apache.flink.api.table.runtime.aggregate
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.table.Row
+import org.apache.flink.streaming.api.windowing.windows.Window
--- End diff --

Remove change
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82964364

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala ---
@@ -0,0 +1,108 @@
+package org.apache.flink.api.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{LongComparator, LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+ * TypeInformation for row intervals.
+ */
+@SerialVersionUID(-1306179424364925258L)
+class RowIntervalTypeInfo[T](
--- End diff --

Can we define this class as

```
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo
import org.apache.flink.api.common.typeutils.base.{LongComparator, LongSerializer}

// Cast targets use the boxed classes; classOf[Float] would be the primitive float class.
class RowIntervalTypeInfo extends IntegerTypeInfo[java.lang.Long](
  classOf[java.lang.Long],
  Array[Class[_]](classOf[java.lang.Float], classOf[java.lang.Double], classOf[java.lang.Character]),
  LongSerializer.INSTANCE,
  classOf[LongComparator]) {}

object RowIntervalTypeInfo {
  val INTERVAL_ROWS = new RowIntervalTypeInfo()
}
```
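One detail to check in the suggested `IntegerTypeInfo` subclass: in Scala, `classOf[Float]` denotes the primitive `float` class (`java.lang.Float.TYPE`), not the boxed `java.lang.Float` that Java's `Float.class` refers to. If the cast-target check compares against boxed type classes (an assumption here; verify how `IntegerTypeInfo`'s cast targets are consumed), an array built from `classOf[Float]` would never match. The sketch only demonstrates the `classOf` distinction with plain JDK classes; `ClassOfCheck` and `canCastTo` are hypothetical:

```scala
object ClassOfCheck {
  // In Scala, classOf[Float] is the primitive class `float` (java.lang.Float.TYPE).
  val primitiveFloat: Class[_] = classOf[Float]
  // The boxed wrapper class, which is what Java's `Float.class` means.
  val boxedFloat: Class[_] = classOf[java.lang.Float]

  // Hypothetical lookup in the style of a "possible cast targets" array.
  def canCastTo(targets: Array[Class[_]], to: Class[_]): Boolean = targets.contains(to)
}
```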
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82901298

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala ---
@@ -0,0 +1,873 @@
+package org.apache.flink.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark
+import org.apache.flink.api.scala.stream.utils.StreamITCase
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.{Row, _}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert._
+import org.junit.{Ignore, Test}
+
+import scala.collection.mutable
+
+class GroupWindowITCase extends StreamingMultipleProgramsTestBase {
--- End diff --

#2595 has been merged. Please convert these into lightweight unit tests.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82899800

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AvgAggregate.scala ---
@@ -23,6 +23,8 @@ import org.apache.flink.api.table.Row
 import java.math.BigDecimal
 import java.math.BigInteger
+import org.apache.flink.streaming.api.windowing.windows.Window
--- End diff --

Please undo the reformatting changes and the unnecessary import in this file. Please check the other `Aggregate` implementations as well.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82964905

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/RowIntervalTypeInfo.scala ---
@@ -0,0 +1,108 @@
+package org.apache.flink.api.table.typeutils
+
+import java.util.Objects
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
+import org.apache.flink.api.common.typeutils.base.{LongComparator, LongSerializer}
+import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer}
+import org.apache.flink.api.table.typeutils.RowIntervalTypeInfo.instantiateComparator
+import org.apache.flink.util.Preconditions._
+
+/**
+ * TypeInformation for row intervals.
+ */
+@SerialVersionUID(-1306179424364925258L)
+class RowIntervalTypeInfo[T](
--- End diff --

I know why we need this class, but IMO it doesn't look like a clean design to require a `TypeInformation` for things that will never be serialized or compared during execution. Did we consider using Calcite's type system during validation?
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/2562#discussion_r82971868

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyCollector.scala ---
@@ -0,0 +1,42 @@
+package org.apache.flink.api.table.runtime.aggregate
+
+import org.apache.flink.api.table.Row
+import org.apache.flink.util.Collector
+
+/**
+ * Adds properties to the end of a row before it emits it to the final collector.
+ * The collector assumes that the row has placeholders at the end that can be filled.
+ */
+class PropertyCollector(properties: Array[PropertyRead[_ <: Any]]) extends Collector[Row] {
--- End diff --

Please see my suggestion on `PropertyRead`.
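The `PropertyCollector` scaladoc describes a wrapper that fills trailing placeholder fields of a row and then forwards it. A minimal sketch of that pattern, assuming rows are modeled as `Array[Any]` and using a hypothetical `SimpleCollector` interface that only mirrors the shape of Flink's `Collector`; `SimpleWindow` and `WindowPropertyCollector` are also hypothetical:

```scala
// Hypothetical minimal collector interface; NOT org.apache.flink.util.Collector.
trait SimpleCollector[T] {
  def collect(record: T): Unit
  def close(): Unit
}

// Hypothetical stand-in for a time window.
final case class SimpleWindow(start: Long, end: Long)

// Fills the last `properties.length` fields of each row with window
// properties, then forwards the row to the downstream collector.
final class WindowPropertyCollector(
    downstream: SimpleCollector[Array[Any]],
    properties: Seq[SimpleWindow => Any]) extends SimpleCollector[Array[Any]] {

  // Set by the window function before emitting rows for that window.
  var window: SimpleWindow = null

  override def collect(row: Array[Any]): Unit = {
    val offset = row.length - properties.length
    require(offset >= 0, "row has fewer fields than property placeholders")
    properties.zipWithIndex.foreach { case (prop, i) => row(offset + i) = prop(window) }
    downstream.collect(row)
  }

  override def close(): Unit = downstream.close()
}
```

Keeping the window in a field set once per fired window lets every emitted row reuse the same properties without recomputing the window.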
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82886893 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/properties.scala --- @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.expressions + +import org.apache.calcite.rex.RexNode +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo +import org.apache.flink.api.table.FlinkRelBuilder.NamedProperty +import org.apache.flink.api.table.validate.{ValidationFailure, ValidationSuccess} + +abstract class Property(child: Expression) extends UnaryExpression { + + override def toString = s"Property($child)" + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = +throw new UnsupportedOperationException("Property cannot be transformed to RexNode.") + + override private[flink] def validateInput() = +if (child.isInstanceOf[WindowReference]) { --- End diff -- Validation requires a `WindowReference` as child. Rename `Property` to `WindowProperty`?
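To illustrate the rename, here is a minimal, self-contained sketch in which the window-reference check lives in a `WindowProperty` base class. `Expression`, `WindowReference`, `FieldReference`, the validation result types, and the concrete `WindowStart`/`WindowEnd` properties are simplified, hypothetical stand-ins, not the Table API classes.

```scala
// Hypothetical stand-ins for the Table API validation results.
sealed trait ValidationResult
case object ValidationSuccess extends ValidationResult
final case class ValidationFailure(message: String) extends ValidationResult

// Hypothetical stand-ins for expressions: a window alias vs. an ordinary field.
trait Expression
final case class WindowReference(name: String) extends Expression
final case class FieldReference(name: String) extends Expression

// A property is only meaningful for a window, so the base class checks that
// its child refers to a window alias.
abstract class WindowProperty extends Expression {
  def child: Expression

  def validateInput(): ValidationResult = child match {
    case _: WindowReference => ValidationSuccess
    case other => ValidationFailure(s"A window property requires a window reference, but got: $other")
  }
}

// Hypothetical concrete properties for the window start and end time.
final case class WindowStart(child: Expression) extends WindowProperty
final case class WindowEnd(child: Expression) extends WindowProperty
```

With the check in the base class, every concrete property inherits the same validation, and the name states the restriction.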
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82880181 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/windows/Session.java --- @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.table.windows; + +import org.apache.flink.api.table.SessionWindow; + +/** + * Helper class for creating a session window. Session windows are ideal for cases where the + * window boundaries need to adjust to the incoming data. In a session window it is possible to + * have windows that start at individual points in time for each key and that end once there has + * been a certain period of inactivity. + */ +public class Session { --- End diff -- Why do we need dedicated Java classes for the Java API windows instead of additional methods in the Scala API windows (as in table.scala)? This might lead to mixed-up imports because the Java and Scala classes have the same names.
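One way to follow this suggestion is to use a single Scala object that offers an `Expression` overload for the Scala DSL and a `String` overload for Java users. A top-level Scala object without a companion class gets static forwarder methods, so Java code could call such a method directly on `Session`. This is a sketch under assumptions: `withGap`, `ParsedExpression`, `ExpressionParserStub`, and the `gap` field are hypothetical placeholders, not the Table API.

```scala
// Hypothetical stand-ins: an expression type and a parser stub for string expressions.
trait Expression
final case class ParsedExpression(text: String) extends Expression

object ExpressionParserStub {
  // The real ExpressionParser handles the full expression grammar; this stub only trims.
  def parseExpression(text: String): Expression = ParsedExpression(text.trim)
}

final class SessionWindow(val gap: Expression)

// One helper for both APIs: the Expression overload serves the Scala DSL and the
// String overload serves Java users, so no same-named Java class is needed.
object Session {
  def withGap(gap: Expression): SessionWindow = new SessionWindow(gap)

  def withGap(gap: String): SessionWindow =
    withGap(ExpressionParserStub.parseExpression(gap))
}
```

The `String` overload simply parses and delegates, which keeps both entry points consistent.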
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82968570 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeutils/TimeIntervalTypeInfo.scala --- @@ -24,14 +24,14 @@ import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.base.{IntComparator, IntSerializer, LongComparator, LongSerializer} import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} -import org.apache.flink.api.table.typeutils.IntervalTypeInfo.instantiateComparator +import org.apache.flink.api.table.typeutils.TimeIntervalTypeInfo.instantiateComparator import org.apache.flink.util.Preconditions._ /** * TypeInformation for SQL INTERVAL types. */ @SerialVersionUID(-1816179424364825258L) -class IntervalTypeInfo[T]( +class TimeIntervalTypeInfo[T]( --- End diff -- Define `TimeIntervalTypeInfo` as:

```
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo

class TimeIntervalTypeInfo[T](
    val clazz: Class[T],
    val serializer: TypeSerializer[T],
    val comparatorClazz: Class[_ <: TypeComparator[T]])
  extends IntegerTypeInfo[T](
    clazz,
    // boxed classes: Scala's classOf[Float] would be the primitive float class
    Array[Class[_]](classOf[java.lang.Float], classOf[java.lang.Double], classOf[java.lang.Character]),
    serializer,
    comparatorClazz)

object TimeIntervalTypeInfo {

  val INTERVAL_MONTHS = new TimeIntervalTypeInfo(
    classOf[java.lang.Integer],
    IntSerializer.INSTANCE,
    classOf[IntComparator])

  val INTERVAL_MILLIS = new TimeIntervalTypeInfo(
    classOf[java.lang.Long],
    LongSerializer.INSTANCE,
    classOf[LongComparator])
}
```
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82971688 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/PropertyRead.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime.aggregate + +import org.apache.flink.streaming.api.windowing.windows.Window + +/** + * Base class for reading a window property. The property will be extracted once and + * can be read multiple times. + */ +trait PropertyRead[T] extends Serializable { + + def extract(window: Window): Unit --- End diff -- I think we do not need a generic interface to extract properties. The `extract` method requires a window object, but not all properties may be available from the window object. So I am not sure how much generalization is required here.
I think we can implement a `TimeWindowPropertyCollector` that works as follows:

```
import org.apache.flink.api.table.Row
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class TimeWindowPropertyCollector(start: Int, end: Int) extends Collector[Row] {

  var finalCollector: Collector[Row] = _
  var timeWindow: TimeWindow = _

  override def collect(record: Row): Unit = {
    // a position of -1 means that the property was not requested
    if (start > -1) {
      record.setField(start, timeWindow.getStart)
    }
    if (end > -1) {
      record.setField(end, timeWindow.getEnd)
    }
    finalCollector.collect(record)
  }

  override def close(): Unit = finalCollector.close()
}
```

If we (ever) need more properties, we can add another wrapping collector or change the design. But at this point I think this is sufficient. What do you think?
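The placeholder-filling collector can be exercised in isolation. In this self-contained sketch, `Row`, `Collector`, and `TimeWindow` are minimal stand-ins for the Flink classes of the same names, implementing only the members the collector touches. `ListCollector` is a hypothetical test sink. The collector body mirrors the suggestion.

```scala
import scala.collection.mutable.ArrayBuffer

// Minimal stand-ins for Flink's Row, Collector and TimeWindow (only the members used here).
final class Row(arity: Int) {
  private val fields = new Array[Any](arity)
  def setField(i: Int, value: Any): Unit = fields(i) = value
  def productElement(i: Int): Any = fields(i)
}

trait Collector[T] {
  def collect(record: T): Unit
  def close(): Unit
}

final class TimeWindow(start: Long, end: Long) {
  def getStart: Long = start
  def getEnd: Long = end
}

// Fills the placeholder fields at positions `start` and `end` with the window bounds
// (-1 means the property was not requested) and forwards the row.
class TimeWindowPropertyCollector(start: Int, end: Int) extends Collector[Row] {
  var finalCollector: Collector[Row] = _
  var timeWindow: TimeWindow = _

  override def collect(record: Row): Unit = {
    if (start > -1) {
      record.setField(start, timeWindow.getStart)
    }
    if (end > -1) {
      record.setField(end, timeWindow.getEnd)
    }
    finalCollector.collect(record)
  }

  override def close(): Unit = finalCollector.close()
}

// Hypothetical sink that buffers emitted rows so they can be inspected.
final class ListCollector extends Collector[Row] {
  val rows: ArrayBuffer[Row] = ArrayBuffer.empty[Row]
  override def collect(record: Row): Unit = rows += record
  override def close(): Unit = ()
}
```

The window function would set `timeWindow` before emitting the rows of each window. A query that selects only the window start would pass `end = -1`, so that slot is left untouched.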
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82887623 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala --- @@ -27,48 +27,65 @@ import scala.collection.mutable.ListBuffer object RexNodeTranslator { --- End diff -- This class does not deal with RexNodes anymore and could be renamed.
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82186936 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -325,33 +337,42 @@ trait ImplicitExpressionOperations { */ def day = toMilliInterval(expr, MILLIS_PER_DAY) -/** + /** * Creates an interval of the given number of hours. * * @return interval of milliseconds */ def hour = toMilliInterval(expr, MILLIS_PER_HOUR) -/** + /** * Creates an interval of the given number of minutes. * * @return interval of milliseconds */ def minute = toMilliInterval(expr, MILLIS_PER_MINUTE) -/** + /** * Creates an interval of the given number of seconds. * * @return interval of milliseconds */ def second = toMilliInterval(expr, MILLIS_PER_SECOND) -/** + /** * Creates an interval of the given number of milliseconds. * * @return interval of milliseconds */ def milli = toMilliInterval(expr, 1) + + // row type + + /** +* Creates a number defining an amount of rows. +* +* @return number of rows +*/ + def rows = expr --- End diff -- Done.
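The interval helpers multiply the receiver by a fixed number of milliseconds per unit. By contrast, `rows` passes the count through unchanged. The following plain-Scala sketch models that arithmetic using `Int` receivers. `MillisInterval`, `RowCount`, and the local constants are hypothetical stand-ins for the DSL's `Expression` values, `toMilliInterval`, and the `MILLIS_PER_*` constants.

```scala
// Hypothetical, simplified model of the interval helpers in ImplicitExpressionOperations.
// The real DSL builds Expression trees; plain numbers are used here instead.
object IntervalDslSketch {
  val MillisPerSecond: Long = 1000L
  val MillisPerMinute: Long = 60L * MillisPerSecond
  val MillisPerHour: Long = 60L * MillisPerMinute
  val MillisPerDay: Long = 24L * MillisPerHour

  final case class MillisInterval(millis: Long)
  final case class RowCount(rows: Long)

  implicit class IntervalOps(val value: Int) {
    def day: MillisInterval = MillisInterval(value * MillisPerDay)
    def hour: MillisInterval = MillisInterval(value * MillisPerHour)
    def minute: MillisInterval = MillisInterval(value * MillisPerMinute)
    def second: MillisInterval = MillisInterval(value * MillisPerSecond)
    def milli: MillisInterval = MillisInterval(value.toLong)
    // A row count is not converted: the number itself is the window size.
    def rows: RowCount = RowCount(value.toLong)
  }
}
```

Under this model, `10.minute` is an interval of 600000 milliseconds, while `10.rows` stays a count of 10.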
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82177134 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction} +import org.apache.flink.api.table.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.{Collector, Preconditions} + +import scala.collection.JavaConversions._ + + +/** + * It wraps the aggregate logic inside of + * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and + * [[org.apache.flink.api.java.operators.GroupCombineOperator]] + * + * @param aggregates The aggregate functions. + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row + * and output Row. 
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated value + * index in output Row. + */ +class AggregateReduceCombineFunction( +private val aggregates: Array[Aggregate[_ <: Any]], +private val groupKeysMapping: Array[(Int, Int)], +private val aggregateMapping: Array[(Int, Int)], +private val intermediateRowArity: Int) +extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] { + + private var aggregateBuffer: Row = _ + private var output: Row = _ + private var aggContext: AggContext = _ + + /** +* Sets a new aggregation context used for [[Aggregate.evaluate()]]. +*/ + def setAggContext(aggContext: AggContext): Unit = { +this.aggContext = aggContext + } + + override def open(config: Configuration): Unit = { +Preconditions.checkNotNull(aggregates) +Preconditions.checkNotNull(groupKeysMapping) +val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length +aggregateBuffer = new Row(intermediateRowArity) +output = new Row(finalRowLength) +aggContext = new AggContext + } + + /** + * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer, + * calculate aggregated values output by aggregate buffer, and set them into output + * Row based on the mapping relation between intermediate aggregate Row and output Row. + * + * @param records Grouped intermediate aggregate Rows iterator. + * @param out The collector to hand results to. + * + */ + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { + +// Initiate intermediate aggregate value. +aggregates.foreach(_.initiate(aggregateBuffer)) + +// Merge intermediate aggregate value to buffer. +var last: Row = null +records.foreach((record) => { + aggregates.foreach(_.merge(record, aggregateBuffer)) + last = record +}) + +// Set group keys value to final output. +groupKeysMapping.map { --- End diff -- Done.
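The steps of `reduce` can be modelled without Flink types: initiate the buffer, merge each intermediate row, copy the group keys from the last row, then evaluate the aggregates. In this sketch, rows are `Array[Any]`. `PartialAggregate` and `SumAggregate` are hypothetical stand-ins for the `Aggregate` interface. Both mapping arrays are read as `(outputIndex, sourceIndex)` pairs, following `groupKeysMapping` in the quoted code. For `aggregateMapping` this order is an assumption.

```scala
// Hypothetical stand-in for the Aggregate interface: partial values live in an
// intermediate row, and evaluate() turns the merged buffer into the final value.
trait PartialAggregate {
  def initiate(buffer: Array[Any]): Unit
  def merge(partial: Array[Any], buffer: Array[Any]): Unit
  def evaluate(buffer: Array[Any]): Any
}

// Sums a Long partial value stored at position `pos` of the intermediate row.
final class SumAggregate(pos: Int) extends PartialAggregate {
  def initiate(buffer: Array[Any]): Unit = buffer(pos) = 0L
  def merge(partial: Array[Any], buffer: Array[Any]): Unit =
    buffer(pos) = buffer(pos).asInstanceOf[Long] + partial(pos).asInstanceOf[Long]
  def evaluate(buffer: Array[Any]): Any = buffer(pos)
}

object ReduceSketch {
  def reduce(
      records: Iterable[Array[Any]],
      aggregates: Array[PartialAggregate],
      groupKeysMapping: Array[(Int, Int)],
      aggregateMapping: Array[(Int, Int)],
      intermediateRowArity: Int): Array[Any] = {
    val buffer = new Array[Any](intermediateRowArity)
    val output = new Array[Any](groupKeysMapping.length + aggregateMapping.length)

    aggregates.foreach(_.initiate(buffer))

    var last: Array[Any] = null
    records.foreach { record =>
      aggregates.foreach(_.merge(record, buffer))
      last = record
    }

    // Group keys are identical within a group, so any record (here: the last) will do.
    groupKeysMapping.foreach { case (after, previous) =>
      output(after) = last(previous)
    }
    aggregateMapping.foreach { case (after, previous) =>
      output(after) = aggregates(previous).evaluate(buffer)
    }
    output
  }
}
```

The mapping loops run only for their side effect of writing into `output`. For that reason the sketch uses `foreach`, since `map` would also build and discard a result array.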
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82177163 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction} +import org.apache.flink.api.table.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.{Collector, Preconditions} + +import scala.collection.JavaConversions._ + + +/** + * It wraps the aggregate logic inside of + * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and + * [[org.apache.flink.api.java.operators.GroupCombineOperator]] + * + * @param aggregates The aggregate functions. + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row + * and output Row. 
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated value + * index in output Row. + */ +class AggregateReduceCombineFunction( +private val aggregates: Array[Aggregate[_ <: Any]], +private val groupKeysMapping: Array[(Int, Int)], +private val aggregateMapping: Array[(Int, Int)], +private val intermediateRowArity: Int) +extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] { + + private var aggregateBuffer: Row = _ + private var output: Row = _ + private var aggContext: AggContext = _ + + /** +* Sets a new aggregation context used for [[Aggregate.evaluate()]]. +*/ + def setAggContext(aggContext: AggContext): Unit = { +this.aggContext = aggContext + } + + override def open(config: Configuration): Unit = { +Preconditions.checkNotNull(aggregates) +Preconditions.checkNotNull(groupKeysMapping) +val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length +aggregateBuffer = new Row(intermediateRowArity) +output = new Row(finalRowLength) +aggContext = new AggContext + } + + /** + * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer, + * calculate aggregated values output by aggregate buffer, and set them into output + * Row based on the mapping relation between intermediate aggregate Row and output Row. + * + * @param records Grouped intermediate aggregate Rows iterator. + * @param out The collector to hand results to. + * + */ + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { + +// Initiate intermediate aggregate value. +aggregates.foreach(_.initiate(aggregateBuffer)) + +// Merge intermediate aggregate value to buffer. +var last: Row = null +records.foreach((record) => { + aggregates.foreach(_.merge(record, aggregateBuffer)) + last = record +}) + +// Set group keys value to final output. 
+groupKeysMapping.map { + case (after, previous) => +output.setField(after, last.productElement(previous)) +} + +// Evaluate final aggregate value and set to output. +aggregateMapping.map { --- End diff -- Done.
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82176556 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.table.expressions.{Expression, Literal} +import org.apache.flink.api.table.plan.logical._ +import org.apache.flink.api.table.plan.nodes.FlinkAggregate +import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream, createNonKeyedWindowedStream} +import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._ +import org.apache.flink.api.table.runtime.aggregate.{AggregateAllWindowFunction, AggregateUtil, AggregateWindowFunction} +import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval +import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, RowTypeInfo, TypeCheckUtils, TypeConverter} +import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment} +import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} +import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction} +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} + +import scala.collection.JavaConverters._ + +class DataStreamAggregate( +window: LogicalWindow, +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +rowRelDataType: RelDataType, +inputType: RelDataType, +grouping: Array[Int]) + extends SingleRel(cluster, traitSet, inputNode) + with FlinkAggregate + with DataStreamRel { 
+ + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamAggregate( + window, + cluster, + traitSet, + inputs.get(0), + namedAggregates, + getRowType, + inputType, + grouping) + } + + override def toString: String = { +s"Aggregate(${ if (!grouping.isEmpty) { + s"groupBy: (${groupingToString(inputType, grouping)}), " +} else { + "" +}}window: ($window), " + + s"select: (${aggregationToString(inputType, grouping, getRowType, namedAggregates)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty) + .item("window", window) + .item("select", aggregationToString(inputType, grouping, getRowType, namedAggregates)) + } + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { +val config = tableEnv.getConfig + +val groupingKeys = grouping.indices.toArray +// add grouping fields, position keys in the input, and input type +val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates, + inputType, getRowType, grouping, config) + +val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan
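`DataStreamAggregate` imports both `createKeyedWindowedStream` and `createNonKeyedWindowedStream`. This suggests that a grouped aggregation is windowed per key, while an ungrouped one is windowed over the whole stream. The following in-memory sketch illustrates that difference for a tumbling event-time window with zero offset, using the window-start arithmetic of tumbling assigners. `countPerWindow` and the `(key, timestamp)` event shape are hypothetical. This shows the grouping semantics only, not how the DataStream operators are implemented.

```scala
object TumblingWindowSketch {
  // Start of the tumbling window of length `size` that contains `timestamp` (zero offset).
  def windowStart(timestamp: Long, size: Long): Long =
    timestamp - (timestamp + size) % size

  // Counts events per window. With keyed = true each (key, window) pair is its own
  // group, as for a grouped aggregation; with keyed = false all keys share a window.
  def countPerWindow[K](
      events: Seq[(K, Long)],
      size: Long,
      keyed: Boolean): Map[(Option[K], Long), Int] =
    events
      .groupBy { case (key, ts) => (if (keyed) Some(key) else None, windowStart(ts, size)) }
      .map { case (window, members) => window -> members.size }
}
```

For the events `("a", 1000)`, `("b", 1500)`, `("a", 2500)` and a size of 2000, the keyed variant yields three groups of one event each. The non-keyed variant yields two windows, starting at 0 and 2000.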
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82171490 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.table.expressions.{Expression, Literal} +import org.apache.flink.api.table.plan.logical._ +import org.apache.flink.api.table.plan.nodes.FlinkAggregate +import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream, createNonKeyedWindowedStream} +import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._ +import org.apache.flink.api.table.runtime.aggregate.{AggregateAllWindowFunction, AggregateUtil, AggregateWindowFunction} +import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval +import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, RowTypeInfo, TypeCheckUtils, TypeConverter} +import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment} +import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} +import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction} +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} + +import scala.collection.JavaConverters._ + +class DataStreamAggregate( +window: LogicalWindow, +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +rowRelDataType: RelDataType, +inputType: RelDataType, +grouping: Array[Int]) + extends SingleRel(cluster, traitSet, inputNode) + with FlinkAggregate + with DataStreamRel { 
+ + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamAggregate( + window, + cluster, + traitSet, + inputs.get(0), + namedAggregates, + getRowType, + inputType, + grouping) + } + + override def toString: String = { +s"Aggregate(${ if (!grouping.isEmpty) { + s"groupBy: (${groupingToString(inputType, grouping)}), " +} else { + "" +}}window: ($window), " + + s"select: (${aggregationToString(inputType, grouping, getRowType, namedAggregates)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty) + .item("window", window) + .item("select", aggregationToString(inputType, grouping, getRowType, namedAggregates)) + } + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { +val config = tableEnv.getConfig + +val groupingKeys = grouping.indices.toArray +// add grouping fields, position keys in the input, and input type +val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates, --- End diff -- Done.
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82170972 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala --- @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +import org.apache.flink.api.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.api.table.plan.logical._ + +/** + * Group-window specification. Group-windows allow aggregates which are computed for a group of + * elements. A (time or row-count) window is required to bound the infinite input stream into a + * finite group. Group-windows are evaluated once per group. + */ +trait GroupWindow { + + /** +* Converts an API class to a logical window for planning. This is an internal method. +*/ + private[flink] def toLogicalWindow: LogicalWindow +} + +/** + * A group-window operating on event-time. + * + * @param timeField defines the time mode for streaming tables and acts as a time attribute for + * batch tables over which the query is evaluated. 
+ */ +abstract class EventTimeWindow(timeField: Expression) extends GroupWindow { + + protected var name: Option[Expression] = None + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: Expression): EventTimeWindow = { +this.name = Some(alias) +this + } + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias)) +} + +// +// Tumbling group-windows +// + +/** + * Tumbling group-window. By default, it works on processing-time. + * + * @param size size of the window either as number of rows or interval of milliseconds + */ +class TumblingWindow(size: Expression) extends GroupWindow { + + /** +* Tumbling group-window. By default, it works on processing-time. +* +* @param size size of the window either as number of rows or interval of milliseconds +*/ + def this(size: String) = this(ExpressionParser.parseExpression(size)) + + private var alias: Option[Expression] = None + + /** +* Defines the time mode for streaming tables and specifies a time attribute for +* batch tables over which the query is evaluated. +* +* @param timeField time mode for streaming tables and time attribute for batch tables +* @return a tumbling group-window on event-time +*/ + def on(timeField: Expression): TumblingEventTimeWindow = +new TumblingEventTimeWindow(alias, timeField, size) + + /** +* Defines the time mode for streaming tables and specifies a time attribute for +* batch tables over which the query is evaluated. 
+* +* @param timeField time mode for streaming tables and time attribute for batch tables +* @return a tumbling group-window on event-time +*/ + def on(timeField: String): TumblingEventTimeWindow = +on(ExpressionParser.parseExpression(timeField)) + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties suc
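The quoted `TumblingWindow` works on processing time by default. Calling `on(timeField)` turns it into an event-time window and passes along any alias that was already set. Calling `as(alias)` names the window so that a following `select()` can read window properties such as start or end time. Below is a minimal sketch of that builder shape. It uses hypothetical, simplified classes (`Tumble`, `EventTimeTumbleBuilder`, `WindowSpec`) and treats sizes and fields as opaque strings. These are not the Flink Table API types.

```scala
// Hypothetical, simplified model of the fluent group-window builder quoted above.
// NOT the Flink Table API classes; sizes and time fields are opaque strings here.
sealed trait WindowSpec
final case class ProcTimeTumble(size: String, alias: Option[String]) extends WindowSpec
final case class EventTimeTumble(size: String, timeField: String, alias: Option[String])
  extends WindowSpec

class Tumble(size: String) {
  private var alias: Option[String] = None

  // Assign an alias; returns this builder, like the quoted `as`.
  def as(name: String): Tumble = { alias = Some(name); this }

  // Switch from the default (processing time) to event time on `timeField`,
  // carrying over any alias already assigned, like the quoted `on`.
  def on(timeField: String): EventTimeTumbleBuilder =
    new EventTimeTumbleBuilder(size, timeField, alias)

  // Without `on`, the window stays on processing time.
  def build: WindowSpec = ProcTimeTumble(size, alias)
}

class EventTimeTumbleBuilder(size: String, timeField: String, private var alias: Option[String]) {
  def as(name: String): EventTimeTumbleBuilder = { alias = Some(name); this }
  def build: WindowSpec = EventTimeTumble(size, timeField, alias)
}
```

Because the alias is carried over, `as` can be called either before or after `on` with the same result.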
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82170078 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala --- @@ -0,0 +1,340 @@
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82147739 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.table.expressions.{Expression, Literal} +import org.apache.flink.api.table.plan.logical._ +import org.apache.flink.api.table.plan.nodes.FlinkAggregate +import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream, createNonKeyedWindowedStream} +import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._ +import org.apache.flink.api.table.runtime.aggregate.{AggregateAllWindowFunction, AggregateUtil, AggregateWindowFunction} +import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval +import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, RowTypeInfo, TypeCheckUtils, TypeConverter} +import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment} +import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} +import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction} +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} + +import scala.collection.JavaConverters._ + +class DataStreamAggregate( +window: LogicalWindow, +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +rowRelDataType: RelDataType, +inputType: RelDataType, +grouping: Array[Int]) + extends SingleRel(cluster, traitSet, inputNode) + with FlinkAggregate + with DataStreamRel { 
+ + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamAggregate( + window, + cluster, + traitSet, + inputs.get(0), + namedAggregates, + getRowType, + inputType, + grouping) + } + + override def toString: String = { +s"Aggregate(${ if (!grouping.isEmpty) { + s"groupBy: (${groupingToString(inputType, grouping)}), " +} else { + "" +}}window: ($window), " + + s"select: (${aggregationToString(inputType, grouping, getRowType, namedAggregates)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty) + .item("window", window) + .item("select", aggregationToString(inputType, grouping, getRowType, namedAggregates)) + } + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { +val config = tableEnv.getConfig + +val groupingKeys = grouping.indices.toArray +// add grouping fields, position keys in the input, and input type +val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates, + inputType, getRowType, grouping, config) + +val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan
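The quoted `toString` emits the `groupBy: (...)` part only when `grouping` is non-empty, which mirrors the `itemIf("groupBy", ...)` call in `explainTerms`. Below is a minimal sketch of that conditional formatting. It assumes plain strings in place of the Calcite-derived output of `groupingToString` and `aggregationToString`. The name `AggregateExplain.describe` and the `", "` separator are hypothetical.

```scala
// Hypothetical sketch of the explain string built by DataStreamAggregate.toString.
// The real code derives the groupBy/select text from Calcite row types; here they
// are plain strings joined with ", " (an assumption).
object AggregateExplain {
  def describe(groupBy: Seq[String], window: String, select: Seq[String]): String = {
    // The "groupBy: (...), " prefix appears only for grouped (keyed) aggregates.
    val groupByPart =
      if (groupBy.nonEmpty) s"groupBy: (${groupBy.mkString(", ")}), " else ""
    s"Aggregate(${groupByPart}window: ($window), select: (${select.mkString(", ")}))"
  }
}
```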
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82151066 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction} +import org.apache.flink.api.table.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.{Collector, Preconditions} + +import scala.collection.JavaConversions._ + + +/** + * It wraps the aggregate logic inside of + * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and + * [[org.apache.flink.api.java.operators.GroupCombineOperator]] + * + * @param aggregates The aggregate functions. + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row + * and output Row. 
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated value + * index in output Row. + */ +class AggregateReduceCombineFunction( +private val aggregates: Array[Aggregate[_ <: Any]], +private val groupKeysMapping: Array[(Int, Int)], +private val aggregateMapping: Array[(Int, Int)], +private val intermediateRowArity: Int) +extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] { + + private var aggregateBuffer: Row = _ + private var output: Row = _ + private var aggContext: AggContext = _ + + /** +* Sets a new aggregation context used for [[Aggregate.evaluate()]]. +*/ + def setAggContext(aggContext: AggContext): Unit = { +this.aggContext = aggContext + } + + override def open(config: Configuration): Unit = { +Preconditions.checkNotNull(aggregates) +Preconditions.checkNotNull(groupKeysMapping) +val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length +aggregateBuffer = new Row(intermediateRowArity) +output = new Row(finalRowLength) +aggContext = new AggContext + } + + /** + * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer, + * calculate aggregated values output by aggregate buffer, and set them into output + * Row based on the mapping relation between intermediate aggregate Row and output Row. + * + * @param records Grouped intermediate aggregate Rows iterator. + * @param out The collector to hand results to. + * + */ + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { + +// Initiate intermediate aggregate value. +aggregates.foreach(_.initiate(aggregateBuffer)) + +// Merge intermediate aggregate value to buffer. +var last: Row = null +records.foreach((record) => { + aggregates.foreach(_.merge(record, aggregateBuffer)) + last = record +}) + +// Set group keys value to final output. 
+groupKeysMapping.map { + case (after, previous) => +output.setField(after, last.productElement(previous)) +} + +// Evaluate final aggregate value and set to output. +aggregateMapping.map { --- End diff -- use `foreach` instead of `map` because `setField` returns `Unit`. (can be fixed in `AggregateReduceGroupFunction` as well)
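The review point is that `map` builds a collection of `Unit` values and then discards it when the body only performs a side effect such as `setField`. `foreach` states the intent directly and avoids that allocation. Below is a simplified sketch of the quoted `reduce` logic with the change applied. It assumes `Array[Any]` rows instead of Flink's `Row`, and a hypothetical Long-sum aggregate (`SumAgg`) in place of `Aggregate[_]`. The aggregate-mapping step assumes the truncated code evaluates `aggregates(previous)` into output slot `after`.

```scala
// Simplified, hypothetical model of AggregateReduceCombineFunction.reduce:
// rows are Array[Any] rather than Flink's Row, and SumAgg stands in for Aggregate[_].
object ReduceSketch {
  // Hypothetical aggregate: buffer slot `bufIdx` holds a running Long sum.
  final case class SumAgg(bufIdx: Int) {
    def initiate(buf: Array[Any]): Unit = { buf(bufIdx) = 0L }
    def merge(in: Array[Any], buf: Array[Any]): Unit = {
      buf(bufIdx) = buf(bufIdx).asInstanceOf[Long] + in(bufIdx).asInstanceOf[Long]
    }
    def evaluate(buf: Array[Any]): Any = buf(bufIdx)
  }

  def reduce(
      records: Iterable[Array[Any]],
      aggregates: Array[SumAgg],
      groupKeysMapping: Array[(Int, Int)], // (output index, intermediate index)
      aggregateMapping: Array[(Int, Int)], // (output index, aggregate index)
      intermediateArity: Int): Array[Any] = {
    val buffer = new Array[Any](intermediateArity)
    val output = new Array[Any](groupKeysMapping.length + aggregateMapping.length)

    // Initiate the buffer, then merge every intermediate row into it.
    aggregates.foreach(_.initiate(buffer))
    var last: Array[Any] = null
    records.foreach { record =>
      aggregates.foreach(_.merge(record, buffer))
      last = record
    }

    // foreach, not map: each body is a side-effecting write returning Unit,
    // so map would only build a throwaway Array[Unit].
    groupKeysMapping.foreach { case (after, previous) =>
      output(after) = last(previous)
    }
    aggregateMapping.foreach { case (after, previous) =>
      output(after) = aggregates(previous).evaluate(buffer)
    }
    output
  }
}
```

As in the quoted code, the group keys are read from the last merged record, so the sketch assumes a non-empty group.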
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82144274 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.table.expressions.{Expression, Literal} +import org.apache.flink.api.table.plan.logical._ +import org.apache.flink.api.table.plan.nodes.FlinkAggregate +import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream, createNonKeyedWindowedStream} +import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._ +import org.apache.flink.api.table.runtime.aggregate.{AggregateAllWindowFunction, AggregateUtil, AggregateWindowFunction} +import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval +import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, RowTypeInfo, TypeCheckUtils, TypeConverter} +import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment} +import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} +import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction} +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} + +import scala.collection.JavaConverters._ + +class DataStreamAggregate( +window: LogicalWindow, +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +rowRelDataType: RelDataType, +inputType: RelDataType, +grouping: Array[Int]) + extends SingleRel(cluster, traitSet, inputNode) + with FlinkAggregate + with DataStreamRel { 
+ + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamAggregate( + window, + cluster, + traitSet, + inputs.get(0), + namedAggregates, + getRowType, + inputType, + grouping) + } + + override def toString: String = { +s"Aggregate(${ if (!grouping.isEmpty) { + s"groupBy: (${groupingToString(inputType, grouping)}), " +} else { + "" +}}window: ($window), " + + s"select: (${aggregationToString(inputType, grouping, getRowType, namedAggregates)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty) + .item("window", window) + .item("select", aggregationToString(inputType, grouping, getRowType, namedAggregates)) + } + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { +val config = tableEnv.getConfig + +val groupingKeys = grouping.indices.toArray +// add grouping fields, position keys in the input, and input type +val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates, --- End diff -- Style: break argument list into line-wise args.
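Applied to the quoted `createOperatorFunctionsForAggregates` call, the style suggestion opens the parenthesis and puts each argument on its own line. The sketch below uses a hypothetical stand-in (`createOperatorFunctions`, with simplified parameter types) so that the snippet compiles on its own.

```scala
// Hypothetical stand-in with the same five-argument shape as the quoted
// AggregateUtil.createOperatorFunctionsForAggregates; types are simplified.
object LineWiseArgs {
  def createOperatorFunctions(
      namedAggregates: Seq[String],
      inputType: String,
      outputType: String,
      grouping: Array[Int],
      config: Map[String, String]): String =
    s"${namedAggregates.size} aggregates over $inputType -> $outputType, " +
      s"${grouping.length} grouping keys, ${config.size} config entries"

  // Line-wise style: open the parenthesis, then one argument per line.
  val aggregateResult: String = createOperatorFunctions(
    Seq("cnt", "avg"),
    "InputRow",
    "OutputRow",
    Array(0),
    Map.empty)
}
```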
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82152829 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala --- @@ -0,0 +1,261 @@
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82147669 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala --- @@ -0,0 +1,261 @@
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82147584 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala --- @@ -0,0 +1,261 @@
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82151050 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction} +import org.apache.flink.api.table.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.{Collector, Preconditions} + +import scala.collection.JavaConversions._ + + +/** + * It wraps the aggregate logic inside of + * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and + * [[org.apache.flink.api.java.operators.GroupCombineOperator]] + * + * @param aggregates The aggregate functions. + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row + * and output Row. 
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated value + * index in output Row. + */ +class AggregateReduceCombineFunction( +private val aggregates: Array[Aggregate[_ <: Any]], +private val groupKeysMapping: Array[(Int, Int)], +private val aggregateMapping: Array[(Int, Int)], +private val intermediateRowArity: Int) +extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] { + + private var aggregateBuffer: Row = _ + private var output: Row = _ + private var aggContext: AggContext = _ + + /** +* Sets a new aggregation context used for [[Aggregate.evaluate()]]. +*/ + def setAggContext(aggContext: AggContext): Unit = { +this.aggContext = aggContext + } + + override def open(config: Configuration): Unit = { +Preconditions.checkNotNull(aggregates) +Preconditions.checkNotNull(groupKeysMapping) +val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length +aggregateBuffer = new Row(intermediateRowArity) +output = new Row(finalRowLength) +aggContext = new AggContext + } + + /** + * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer, + * calculate aggregated values output by aggregate buffer, and set them into output + * Row based on the mapping relation between intermediate aggregate Row and output Row. + * + * @param records Grouped intermediate aggregate Rows iterator. + * @param out The collector to hand results to. + * + */ + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { + +// Initiate intermediate aggregate value. +aggregates.foreach(_.initiate(aggregateBuffer)) + +// Merge intermediate aggregate value to buffer. +var last: Row = null +records.foreach((record) => { + aggregates.foreach(_.merge(record, aggregateBuffer)) + last = record +}) + +// Set group keys value to final output. +groupKeysMapping.map { --- End diff -- use `foreach` instead of `map` because `setField` returns `Unit`. 
(can be fixed in `AggregateReduceGroupFunction` as well)
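fhueske's point about `foreach` versus `map` can be shown without Flink. In the sketch below, `SimpleRow` is a hypothetical stand-in for `org.apache.flink.api.table.Row`. It mirrors only the two methods the diff calls (`setField` and `productElement`) and is not Flink's class. Both helpers perform the same writes. The `map` version also allocates an `Array[Unit]`, which the caller then discards.

```scala
object ForeachVsMap {
  // Hypothetical stand-in for a Row: a fixed-arity array of fields.
  final class SimpleRow(arity: Int) {
    private val fields = new Array[Any](arity)
    def setField(i: Int, value: Any): Unit = fields(i) = value
    def productElement(i: Int): Any = fields(i)
  }

  // Copies group keys from `last` into `output` using (after, previous) pairs.
  // `foreach` signals a pure side effect and builds no result collection.
  def copyGroupKeys(mapping: Array[(Int, Int)], last: SimpleRow, output: SimpleRow): Unit =
    mapping.foreach { case (after, previous) =>
      output.setField(after, last.productElement(previous))
    }

  // Same writes, but `map` also materializes an Array[Unit] holding one `()` per pair.
  def copyGroupKeysWithMap(
      mapping: Array[(Int, Int)],
      last: SimpleRow,
      output: SimpleRow): Array[Unit] =
    mapping.map { case (after, previous) =>
      output.setField(after, last.productElement(previous))
    }
}
```

In `reduce` the result of `map` is never used. `foreach` therefore states the intent directly and skips the allocation.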
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82147461 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamAggregate.scala --- @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.nodes.datastream + +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.table.expressions.{Expression, Literal} +import org.apache.flink.api.table.plan.logical._ +import org.apache.flink.api.table.plan.nodes.FlinkAggregate +import org.apache.flink.api.table.plan.nodes.datastream.DataStreamAggregate.{createKeyedWindowedStream, createNonKeyedWindowedStream} +import org.apache.flink.api.table.runtime.aggregate.AggregateUtil._ +import org.apache.flink.api.table.runtime.aggregate.{AggregateAllWindowFunction, AggregateUtil, AggregateWindowFunction} +import org.apache.flink.api.table.typeutils.TypeCheckUtils.isTimeInterval +import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, RowTypeInfo, TypeCheckUtils, TypeConverter} +import org.apache.flink.api.table.{FlinkTypeFactory, Row, StreamTableEnvironment} +import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} +import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction} +import org.apache.flink.streaming.api.windowing.assigners._ +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} + +import scala.collection.JavaConverters._ + +class DataStreamAggregate( +window: LogicalWindow, +cluster: RelOptCluster, +traitSet: RelTraitSet, +inputNode: RelNode, +namedAggregates: Seq[CalcitePair[AggregateCall, String]], +rowRelDataType: RelDataType, +inputType: RelDataType, +grouping: Array[Int]) + extends SingleRel(cluster, traitSet, inputNode) + with FlinkAggregate + with DataStreamRel { 
+ + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { +new DataStreamAggregate( + window, + cluster, + traitSet, + inputs.get(0), + namedAggregates, + getRowType, + inputType, + grouping) + } + + override def toString: String = { +s"Aggregate(${ if (!grouping.isEmpty) { + s"groupBy: (${groupingToString(inputType, grouping)}), " +} else { + "" +}}window: ($window), " + + s"select: (${aggregationToString(inputType, grouping, getRowType, namedAggregates)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { +super.explainTerms(pw) + .itemIf("groupBy", groupingToString(inputType, grouping), !grouping.isEmpty) + .item("window", window) + .item("select", aggregationToString(inputType, grouping, getRowType, namedAggregates)) + } + + override def translateToPlan( + tableEnv: StreamTableEnvironment, + expectedType: Option[TypeInformation[Any]]): DataStream[Any] = { +val config = tableEnv.getConfig + +val groupingKeys = grouping.indices.toArray +// add grouping fields, position keys in the input, and input type +val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates, + inputType, getRowType, grouping, config) + +val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82151146 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/aggregate/AggregateReduceCombineFunction.scala --- @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime.aggregate + +import java.lang.Iterable + +import org.apache.flink.api.common.functions.{CombineFunction, RichGroupReduceFunction} +import org.apache.flink.api.table.Row +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.{Collector, Preconditions} + +import scala.collection.JavaConversions._ + + +/** + * It wraps the aggregate logic inside of + * [[org.apache.flink.api.java.operators.GroupReduceOperator]] and + * [[org.apache.flink.api.java.operators.GroupCombineOperator]] + * + * @param aggregates The aggregate functions. + * @param groupKeysMapping The index mapping of group keys between intermediate aggregate Row + * and output Row. 
+ * @param aggregateMapping The index mapping between aggregate function list and aggregated value + * index in output Row. + */ +class AggregateReduceCombineFunction( +private val aggregates: Array[Aggregate[_ <: Any]], +private val groupKeysMapping: Array[(Int, Int)], +private val aggregateMapping: Array[(Int, Int)], +private val intermediateRowArity: Int) +extends RichGroupReduceFunction[Row, Row] with CombineFunction[Row, Row] { + + private var aggregateBuffer: Row = _ + private var output: Row = _ + private var aggContext: AggContext = _ + + /** +* Sets a new aggregation context used for [[Aggregate.evaluate()]]. +*/ + def setAggContext(aggContext: AggContext): Unit = { +this.aggContext = aggContext + } + + override def open(config: Configuration): Unit = { +Preconditions.checkNotNull(aggregates) +Preconditions.checkNotNull(groupKeysMapping) +val finalRowLength: Int = groupKeysMapping.length + aggregateMapping.length +aggregateBuffer = new Row(intermediateRowArity) +output = new Row(finalRowLength) +aggContext = new AggContext + } + + /** + * For grouped intermediate aggregate Rows, merge all of them into aggregate buffer, + * calculate aggregated values output by aggregate buffer, and set them into output + * Row based on the mapping relation between intermediate aggregate Row and output Row. + * + * @param records Grouped intermediate aggregate Rows iterator. + * @param out The collector to hand results to. + * + */ + override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { + +// Initiate intermediate aggregate value. +aggregates.foreach(_.initiate(aggregateBuffer)) + +// Merge intermediate aggregate value to buffer. +var last: Row = null +records.foreach((record) => { + aggregates.foreach(_.merge(record, aggregateBuffer)) + last = record +}) + +// Set group keys value to final output. 
+groupKeysMapping.map { + case (after, previous) => +output.setField(after, last.productElement(previous)) +} + +// Evaluate final aggregate value and set to output. +aggregateMapping.map { + case (after, previous) => +output.setField(after, aggregates(previous).evaluate(aggregateBuffer, aggContext)) +} + +out.collect(output) + } + + /** + * For sub-grouped intermediate aggregate Rows, merge all of them into aggregate buffer, + * + * @param records Sub-grouped intermediate aggregate Rows iterator. + * @return Combined intermediate aggregate Row. + * +
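The reduce step quoted above can be sketched in plain Scala. It initiates a buffer, merges every intermediate row into it, copies the group keys from the last row, and then evaluates each aggregate into the output.

Everything in this sketch is a simplified assumption:
- Rows are `Array[Any]`.
- `Agg` only mimics the `initiate`/`merge`/`evaluate` names from the diff and has no `AggContext`.
- The hypothetical `AvgAgg` keeps a partial `(sum, count)` at fixed buffer offsets.

The `(after, previous)` pairs follow the same convention as `groupKeysMapping` and `aggregateMapping`.

```scala
object ReduceSketch {
  type Row = Array[Any]

  // Simplified stand-in for the PR's Aggregate; not Flink's interface.
  trait Agg {
    def initiate(buffer: Row): Unit
    def merge(partial: Row, buffer: Row): Unit
    def evaluate(buffer: Row): Any
  }

  // Hypothetical AVG over Long values, stored as (sum, count) at sumIdx and sumIdx + 1.
  final class AvgAgg(sumIdx: Int) extends Agg {
    def initiate(buffer: Row): Unit = { buffer(sumIdx) = 0L; buffer(sumIdx + 1) = 0L }
    def merge(partial: Row, buffer: Row): Unit = {
      buffer(sumIdx) = buffer(sumIdx).asInstanceOf[Long] + partial(sumIdx).asInstanceOf[Long]
      buffer(sumIdx + 1) =
        buffer(sumIdx + 1).asInstanceOf[Long] + partial(sumIdx + 1).asInstanceOf[Long]
    }
    def evaluate(buffer: Row): Any = {
      val count = buffer(sumIdx + 1).asInstanceOf[Long]
      if (count == 0L) null else buffer(sumIdx).asInstanceOf[Long].toDouble / count
    }
  }

  // Assumes `records` is non-empty, as a group handed to a reduce function is.
  def reduce(
      records: Iterable[Row],
      aggregates: Array[Agg],
      groupKeysMapping: Array[(Int, Int)],
      aggregateMapping: Array[(Int, Int)],
      intermediateArity: Int): Row = {
    val buffer = new Array[Any](intermediateArity)
    aggregates.foreach(_.initiate(buffer))
    var last: Row = null
    records.foreach { record =>
      aggregates.foreach(_.merge(record, buffer)) // fold each partial result into the buffer
      last = record
    }
    val output = new Array[Any](groupKeysMapping.length + aggregateMapping.length)
    // Group keys are identical within a group, so any record (here the last) supplies them.
    groupKeysMapping.foreach { case (after, previous) => output(after) = last(previous) }
    aggregateMapping.foreach { case (after, previous) =>
      output(after) = aggregates(previous).evaluate(buffer)
    }
    output
  }
}
```

Consider intermediate rows `("k", 10, 2)` and `("k", 5, 1)`, with `groupKeysMapping = Array((0, 0))` and `aggregateMapping = Array((1, 0))`. They merge to a sum of 15 over 3 values, so the output row is `("k", 5.0)`.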
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82150960 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala --- @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.logical + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, TypeCoercion} +import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} + +abstract class EventTimeGroupWindow( +name: Option[Expression], +time: Expression) + extends LogicalWindow(name) { + + override def validate(tableEnv: TableEnvironment): ValidationResult = { +val valid = super.validate(tableEnv) +if (valid.isFailure) { +return valid +} + +if (tableEnv.isInstanceOf[StreamTableEnvironment]) { + val valid = time match { +case RowtimeAttribute() => + ValidationSuccess +case _ => + ValidationFailure("Event-time window expects a 'rowtime' time field.") + } + if (valid.isFailure) { +return valid + } +} +if (!TypeCoercion.canCast(time.resultType, BasicTypeInfo.LONG_TYPE_INFO)) { + ValidationFailure(s"Event-time window expects a time field that can be safely cast " + +s"to Long, but is ${time.resultType}") +} else { + ValidationSuccess +} + } +} + +abstract class ProcessingTimeGroupWindow(name: Option[Expression]) extends LogicalWindow(name) + +// +// Tumbling group windows +// + +object TumblingGroupWindow { + def validate(tableEnv: TableEnvironment, size: Expression): ValidationResult = size match { +case Literal(_, IntervalTypeInfo.INTERVAL_MILLIS) => + ValidationSuccess +case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) | Literal(_, BasicTypeInfo.INT_TYPE_INFO) => + ValidationSuccess +case _ => + ValidationFailure( +"Tumbling window expects size literal of type Interval of Milliseconds or Long/Integer.") + } +} + +case class ProcessingTimeTumblingGroupWindow( +name: Option[Expression], +size: Expression) + extends ProcessingTimeGroupWindow(name) { + + override def resolveExpressions(resolve: (Expression) => Expression): 
LogicalWindow = +ProcessingTimeTumblingGroupWindow( + name.map(resolve), + resolve(size)) + + override def validate(tableEnv: TableEnvironment): ValidationResult = +super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, size)) + + override def toString: String = s"ProcessingTimeTumblingGroupWindow($name, $size)" +} + +case class EventTimeTumblingGroupWindow( +name: Option[Expression], +timeField: Expression, +size: Expression) + extends EventTimeGroupWindow( +name, +timeField) { + + override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = +EventTimeTumblingGroupWindow( + name.map(resolve), + resolve(timeField), + resolve(size)) + + override def validate(tableEnv: TableEnvironment): ValidationResult = +super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, size)) + + override def toString: String = s"EventTimeTumblingGroupWindow($name, $timeField, $size)" +} + +// -
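The window validators quoted above chain their checks with `orElse`, so the size check runs only after the base check has succeeded. Below is a minimal sketch of that pattern. It makes two assumptions:
- `ValidationResult` is a simplified version modeled on the names in the diff, not Flink's actual definition.
- The literal types are hypothetical stand-ins for `IntervalTypeInfo.INTERVAL_MILLIS` and the `BasicTypeInfo` constants.

```scala
object ValidationSketch {
  sealed trait ValidationResult {
    def isSuccess: Boolean
    def isFailure: Boolean = !isSuccess
    // Evaluate `other` only if this check passed; otherwise keep the first failure.
    def orElse(other: => ValidationResult): ValidationResult =
      if (isSuccess) other else this
  }
  case object ValidationSuccess extends ValidationResult { val isSuccess: Boolean = true }
  final case class ValidationFailure(message: String) extends ValidationResult {
    val isSuccess: Boolean = false
  }

  // Hypothetical literal types (assumptions, not Flink type infos).
  sealed trait LiteralType
  case object IntervalMillis extends LiteralType
  case object LongType extends LiteralType
  case object IntType extends LiteralType
  case object StringType extends LiteralType
  final case class Literal(value: Any, tpe: LiteralType)

  def validateTumblingSize(size: Literal): ValidationResult = size.tpe match {
    case IntervalMillis | LongType | IntType => ValidationSuccess
    case _ => ValidationFailure(
      "Tumbling window expects size literal of type Interval of Milliseconds or Long/Integer.")
  }

  // Mirrors `super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, size))`.
  def validateWindow(base: ValidationResult, size: Literal): ValidationResult =
    base.orElse(validateTumblingSize(size))
}
```

`other` is passed by name, so a failing base check short-circuits and its message is the one reported.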
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82150850 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala --- @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.logical + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, TypeCoercion} +import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} + +abstract class EventTimeGroupWindow( +name: Option[Expression], +time: Expression) + extends LogicalWindow(name) { + + override def validate(tableEnv: TableEnvironment): ValidationResult = { +val valid = super.validate(tableEnv) +if (valid.isFailure) { +return valid +} + +if (tableEnv.isInstanceOf[StreamTableEnvironment]) { + val valid = time match { +case RowtimeAttribute() => + ValidationSuccess +case _ => + ValidationFailure("Event-time window expects a 'rowtime' time field.") + } + if (valid.isFailure) { +return valid + } +} +if (!TypeCoercion.canCast(time.resultType, BasicTypeInfo.LONG_TYPE_INFO)) { --- End diff -- Done.
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82149847 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala --- @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.logical + +import org.apache.flink.api.table.expressions.Expression + +trait Resolvable[T <: AnyRef] { --- End diff -- Done.
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82149863 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala --- @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.logical + +import org.apache.flink.api.table.expressions.Expression + +trait Resolvable[T <: AnyRef] { + + def resolveExpressions(resolver: (Expression) => Expression): T --- End diff -- Done.
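The `Resolvable` trait lets a logical window rewrite each expression it holds with a caller-supplied resolver and return a fresh copy, as `ProcessingTimeTumblingGroupWindow.resolveExpressions` does in the `groupWindows.scala` diff. The sketch below keeps the trait's shape from the diff. The minimal `Expression` ADT, the `EventTimeTumblingWindow` class and the schema-lookup resolver are hypothetical, added only for illustration.

```scala
object ResolvableSketch {
  // Hypothetical minimal expression tree.
  sealed trait Expression
  final case class UnresolvedField(name: String) extends Expression
  final case class ResolvedField(name: String, index: Int) extends Expression
  final case class Literal(value: Long) extends Expression

  // Same shape as the trait in the diff.
  trait Resolvable[T <: AnyRef] {
    def resolveExpressions(resolver: (Expression) => Expression): T
  }

  // Hypothetical window: applies the resolver to every expression and returns a new copy.
  final case class EventTimeTumblingWindow(
      name: Option[Expression],
      timeField: Expression,
      size: Expression)
    extends Resolvable[EventTimeTumblingWindow] {
    override def resolveExpressions(
        resolver: (Expression) => Expression): EventTimeTumblingWindow =
      EventTimeTumblingWindow(name.map(resolver), resolver(timeField), resolver(size))
  }

  // Resolves field names against a schema; anything else is returned unchanged.
  def fieldResolver(schema: Seq[String]): Expression => Expression = {
    case UnresolvedField(n) if schema.contains(n) => ResolvedField(n, schema.indexOf(n))
    case other => other
  }
}
```

Returning a new instance, rather than mutating the window in place, keeps logical plan nodes immutable. This fits well with case classes.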
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82146791 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -497,3 +501,95 @@ case class LogicalRelNode( override def validate(tableEnv: TableEnvironment): LogicalNode = this } + +case class WindowAggregate( +groupingExpressions: Seq[Expression], +window: LogicalWindow, +aggregateExpressions: Seq[NamedExpression], +child: LogicalNode) + extends UnaryNode { + + override def output: Seq[Attribute] = { +(groupingExpressions ++ aggregateExpressions) map { + case ne: NamedExpression => ne.toAttribute + case e => Alias(e, e.toString).toAttribute +} + } + + override def resolveReference( --- End diff -- Done.
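`WindowAggregate.output` derives one output attribute per grouping expression and per aggregate expression. Unnamed expressions are aliased with their `toString`. The sketch below reproduces that mapping with hypothetical stand-in types:
- `Attribute` here is just a name holder, not Flink's `Attribute`.
- Flink's `toAttribute` is replaced by reading `name` directly.
- `Upper` and `Count` are invented expressions for illustration.

```scala
object OutputSketch {
  sealed trait Expression
  sealed trait NamedExpression extends Expression { def name: String }
  final case class Field(name: String) extends NamedExpression {
    override def toString: String = name
  }
  final case class Alias(child: Expression, name: String) extends NamedExpression
  // Hypothetical unnamed expressions.
  final case class Upper(child: Expression) extends Expression {
    override def toString: String = s"upper($child)"
  }
  final case class Count(child: Expression) extends Expression {
    override def toString: String = s"count($child)"
  }
  final case class Attribute(name: String)

  // Named expressions keep their name; unnamed ones are aliased with their string form.
  def output(grouping: Seq[Expression], aggregates: Seq[NamedExpression]): Seq[Attribute] =
    (grouping ++ aggregates).map {
      case ne: NamedExpression => Attribute(ne.name)
      case e => Attribute(Alias(e, e.toString).name)
    }
}
```

Grouping by `'string` and `upper('string)` with an aggregate `count('int) as 'cnt` therefore yields the attribute names `string`, `upper(string)` and `cnt`.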
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82141352 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -55,28 +56,27 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend override def validate(tableEnv: TableEnvironment): LogicalNode = { val resolvedProject = super.validate(tableEnv).asInstanceOf[Project] +val names: mutable.Set[String] = mutable.Set() -def checkUniqueNames(exprs: Seq[Expression]): Unit = { - val names: mutable.Set[String] = mutable.Set() - exprs.foreach { -case n: Alias => - // explicit name - if (names.contains(n.name)) { -throw ValidationException(s"Duplicate field name ${n.name}.") - } else { -names.add(n.name) - } -case r: ResolvedFieldReference => - // simple field forwarding - if (names.contains(r.name)) { -throw ValidationException(s"Duplicate field name ${r.name}.") - } else { -names.add(r.name) - } -case _ => // Do nothing +def checkName(name: String): Unit = { + if (names.contains(name)) { +throw ValidationException(s"Duplicate field name $name.") --- End diff -- Done.
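The refactored `Project.validate` keeps a single `names` set and calls a local `checkName` for both explicitly aliased fields and forwarded field references. The sketch below is a hedged variant of that check, not the PR's code. It assumes stand-in expression types and a `ValidationException` case class. It also uses the Boolean result of `mutable.Set#add`, which is false when the element is already present, instead of a separate `contains` test.

```scala
import scala.collection.mutable

object DuplicateNameSketch {
  // Assumed stand-ins for the Table API types.
  final case class ValidationException(msg: String) extends RuntimeException(msg)
  sealed trait Expression
  final case class Alias(child: Expression, name: String) extends Expression
  final case class ResolvedFieldReference(name: String) extends Expression
  final case class Literal(value: Any) extends Expression

  def checkUniqueNames(exprs: Seq[Expression]): Unit = {
    val names = mutable.Set[String]()
    // `add` returns false if `name` was already in the set.
    def checkName(name: String): Unit =
      if (!names.add(name)) throw ValidationException(s"Duplicate field name $name.")

    exprs.foreach {
      case Alias(_, name) => checkName(name)               // explicit name
      case ResolvedFieldReference(name) => checkName(name) // simple field forwarding
      case _ =>                                            // other expressions are not checked
    }
  }
}
```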
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82140542 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala --- @@ -0,0 +1,777 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark +import org.apache.flink.api.scala.stream.utils.StreamITCase +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.{Row, _} +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class GroupWindowITCase extends StreamingMultipleProgramsTestBase { + + @Test(expected = classOf[ValidationException]) + def testInvalidBatchWindow(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Session withGap 10.rows as 'string) + } + + @Test(expected = classOf[TableException]) + def testInvalidRowtime1(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'rowtime, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime2(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = 
env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count as 'rowtime) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime3(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table.as('rowtime, 'myint, 'mystring) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime4(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli on 'string) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidTumblingSize(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +
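The tests above define windows such as `Tumble over 50.milli`, that is, fixed, non-overlapping 50 ms ranges. The sketch below is illustrative only and is not the assigner Flink uses internally. It finds the window containing a timestamp by rounding down to a multiple of the size. `Math.floorMod` keeps that rounding correct for negative timestamps as well.

```scala
object TumblingSketch {
  // Start of the tumbling window of length `size` (ms) that contains `timestamp`.
  def windowStart(timestamp: Long, size: Long): Long =
    timestamp - Math.floorMod(timestamp, size)

  // Half-open window [start, start + size).
  def windowFor(timestamp: Long, size: Long): (Long, Long) = {
    val start = windowStart(timestamp, size)
    (start, start + size)
  }
}
```

With a 50 ms size, timestamps 0 through 49 all fall into `[0, 50)`, and 50 opens `[50, 100)`.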
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82133014 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala --- @@ -0,0 +1,777 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark +import org.apache.flink.api.scala.stream.utils.StreamITCase +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.{Row, _} +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class GroupWindowITCase extends StreamingMultipleProgramsTestBase { + + @Test(expected = classOf[ValidationException]) + def testInvalidBatchWindow(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Session withGap 10.rows as 'string) + } + + @Test(expected = classOf[TableException]) + def testInvalidRowtime1(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'rowtime, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime2(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = 
env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count as 'rowtime) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime3(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table.as('rowtime, 'myint, 'mystring) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime4(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli on 'string) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidTumblingSize(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +
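The same test file also references session windows (`Session withGap ...`). The sketch below is a rough illustration of gap-based sessionization on event timestamps. It assumes a new session starts whenever two consecutive sorted timestamps are more than `gap` apart. It does not claim how Flink treats timestamps exactly `gap` apart.

```scala
object SessionSketch {
  // Groups timestamps into sessions. Each session in the accumulator is stored
  // newest-first, and the accumulator itself lists the newest session first.
  def sessions(timestamps: Seq[Long], gap: Long): List[List[Long]] =
    timestamps.sorted
      .foldLeft(List.empty[List[Long]]) {
        case (current :: done, t) if t - current.head <= gap => (t :: current) :: done
        case (acc, t) => List(t) :: acc // first element, or gap exceeded: open a new session
      }
      .map(_.reverse)
      .reverse
}
```

For example, `sessions(Seq(1, 2, 20, 4, 30), 10)` groups into `List(List(1, 2, 4), List(20, 30))`: the jump from 4 to 20 exceeds the gap, while 20 to 30 does not.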
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82132367 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala --- @@ -0,0 +1,777 @@
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82132349 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala --- @@ -0,0 +1,777 @@
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82131749 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala --- @@ -0,0 +1,777 @@ + @Test(expected = classOf[ValidationException]) + def testInvalidBatchWindow(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) --- End diff -- Done.
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82129916 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -325,33 +337,42 @@ trait ImplicitExpressionOperations { */ def day = toMilliInterval(expr, MILLIS_PER_DAY) -/** + /** * Creates an interval of the given number of hours. * * @return interval of milliseconds */ def hour = toMilliInterval(expr, MILLIS_PER_HOUR) -/** + /** * Creates an interval of the given number of minutes. * * @return interval of milliseconds */ def minute = toMilliInterval(expr, MILLIS_PER_MINUTE) -/** + /** * Creates an interval of the given number of seconds. * * @return interval of milliseconds */ def second = toMilliInterval(expr, MILLIS_PER_SECOND) -/** + /** * Creates an interval of the given number of milliseconds. * * @return interval of milliseconds */ def milli = toMilliInterval(expr, 1) + + // row type + + /** +* Creates a number defining an amount of rows. +* +* @return number of rows +*/ + def rows = expr --- End diff -- I'm still not sure if we need an additional data type for that. Actually, we could also just allow `Int` and `Long` and remove `rows` completely. It is just syntactic sugar without any additional advantage. It might also be good if we allow constant arithmetic in the future, such as `10*10.rows`. But if you think a row interval makes sense, I will add it.
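The two options discussed in the comment can be compared using a hypothetical, simplified expression model. `Expr`, `Literal`, `RowInterval` and `MilliInterval` are illustrative names, not Flink's expression classes. With pure sugar, `10.rows` is the same value as the literal `10`. With a dedicated type, the window size carries its meaning explicitly.

```scala
// Hypothetical, simplified expression model; NOT Flink's Expression classes.
object RowsSketch {
  sealed trait Expr
  final case class Literal(value: Long) extends Expr
  final case class RowInterval(count: Long) extends Expr
  final case class MilliInterval(millis: Long) extends Expr

  // Option A: `rows` is pure syntactic sugar and yields a plain literal.
  object SugarDsl {
    implicit class RowsSugar(n: Int) {
      def rows: Expr = Literal(n.toLong)
    }
  }

  // Option B: `rows` yields a dedicated row-interval expression.
  object TypedDsl {
    implicit class RowsTyped(n: Int) {
      def rows: Expr = RowInterval(n.toLong)
      def milli: Expr = MilliInterval(n.toLong)
    }
  }

  // Time sizes already have their own interval type, so under option A a bare
  // number can simply be read as a row count; option B states it explicitly.
  def describeSize(size: Expr): String = size match {
    case RowInterval(n)   => s"$n rows"
    case Literal(n)       => s"$n rows"
    case MilliInterval(m) => s"$m ms"
  }
}
```

Both variants produce the same interpretation here. That matches the comment's point that `rows` adds little beyond readability, unless a separate type turns out to be useful for other validation.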
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82009076 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/windowDsl.scala --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.table + +import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.api.table.{SessionWindow, SlideWithSize, TumblingWindow} + +/** + * Helper object for creating a tumbling window. In a tumbling window elements are assigned to + * fixed length, non-overlapping windows of a specified window size. For example, if you specify a + * window size of 5 minutes, the following operation will get 5 minutes worth of elements in + * each invocation. + */ +object Tumble { + + /** +* Creates a tumbling window. In a tumbling window elements are assigned to fixed length, +* non-overlapping windows of a specified window size. For example, if you specify a window +* size of 5 minutes, the following operation will get 5 minutes worth of elements in --- End diff -- Done.
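The `Tumble` doc comment describes fixed-length, non-overlapping windows. The sketch below shows how a timestamp could map to such a window. It assumes windows of `size` milliseconds aligned to the epoch, with no offset. `TumblingSketch`, `windowStart` and `assign` are hypothetical helpers and are not Flink's window assigner.

```scala
// Hypothetical helpers illustrating tumbling-window assignment; not Flink's API.
object TumblingSketch {
  /** Start of the window [start, start + size) that contains `ts`.
    * Adding `size` before the second modulo keeps the remainder
    * non-negative, so negative timestamps are handled as well. */
  def windowStart(ts: Long, size: Long): Long =
    ts - ((ts % size) + size) % size

  /** Groups (timestamp, value) pairs by the single tumbling window they fall into. */
  def assign[A](events: Seq[(Long, A)], size: Long): Map[Long, Seq[A]] =
    events
      .groupBy { case (ts, _) => windowStart(ts, size) }
      .map { case (start, evs) => start -> evs.map(_._2) }
}
```

Each element lands in exactly one window. With a 5-minute size (300000 ms), every invocation therefore sees only the elements whose timestamps fall in that 5-minute slice.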
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82008309 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/windowDsl.scala --- @@ -0,0 +1,90 @@ +object Tumble { + + /** +* Creates a tumbling window. In a tumbling window elements are assigned to fixed length, +* non-overlapping windows of a specified window size. For example, if you specify a window --- End diff -- Done.
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82006797 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala --- @@ -50,6 +50,10 @@ object WordCountTable { .filter('frequency === 2) .toDataSet[WC] +val w = Slide.over(23).every(23).toLogicalWindow.toString --- End diff -- Sorry, I hadn't seen that. Done.
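The quoted line builds `Slide.over(23).every(23)`, a sliding window whose slide equals its size. The sketch below shows why such a window behaves like a tumbling one. When the size is a multiple of the slide, each timestamp falls into size / slide overlapping windows, and that count is exactly one when the two values are equal. `SlidingSketch.windowStarts` is a hypothetical helper and is not Flink's assigner. It assumes non-negative millisecond timestamps and epoch-aligned windows.

```scala
// Hypothetical helper illustrating sliding-window assignment; not Flink's API.
object SlidingSketch {
  /** Starts of all windows [start, start + size), with starts at multiples
    * of `slide`, that contain `ts`. Assumes ts >= 0 and size, slide > 0. */
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - ts % slide // latest window start that is <= ts
    // Walk backwards one slide at a time while the window still covers ts.
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }
}
```

For example, with size 10 and slide 5, timestamp 7 belongs to the windows starting at 5 and 0. With size 23 and slide 23, it belongs only to the window starting at 0, just as in a tumbling window of size 23.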
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82006252 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala --- @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +import org.apache.flink.api.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.api.table.plan.logical._ + +/** + * Group-window specification. Group-windows allow aggregates which are computed for a group of + * elements. A (time or row-count) window is required to bound the infinite input stream into a + * finite group. Group-windows are evaluated once per group. + */ +trait GroupWindow { + + /** +* Converts an API class to a logical window for planning. This is an internal method. +*/ + private[flink] def toLogicalWindow: LogicalWindow +} + +/** + * A group-window operating on event-time. + * + * @param timeField defines the time mode for streaming tables and acts as a time attribute for + * batch tables over which the query is evaluated. 
+ */ +abstract class EventTimeWindow(timeField: Expression) extends GroupWindow { + + protected var name: Option[Expression] = None + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: Expression): EventTimeWindow = { +this.name = Some(alias) +this + } + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias)) +} + +// +// Tumbling group-windows +// + +/** + * Tumbling group-window. By default, it works on processing-time. + * + * @param size size of the window either as number of rows or interval of milliseconds + */ +class TumblingWindow(size: Expression) extends GroupWindow { + + /** +* Tumbling group-window. By default, it works on processing-time. +* +* @param size size of the window either as number of rows or interval of milliseconds +*/ + def this(size: String) = this(ExpressionParser.parseExpression(size)) + + private var alias: Option[Expression] = None + + /** +* Defines the time mode for streaming tables and specifies a time attribute for +* batch tables over which the query is evaluated. +* +* @param timeField time mode for streaming tables and time attribute for batch tables +* @return a tumbling group-window on event-time +*/ + def on(timeField: Expression): TumblingEventTimeWindow = +new TumblingEventTimeWindow(alias, timeField, size) + + /** +* Defines the time mode for streaming tables and specifies a time attribute for +* batch tables over which the query is evaluated. 
+* +* @param timeField time mode for streaming tables and time attribute for batch tables +* @return a tumbling group-window on event-time +*/ + def on(timeField: String): TumblingEventTimeWindow = +on(ExpressionParser.parseExpression(timeField)) + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties suc
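The quoted `windows.scala` defines a small builder. A `TumblingWindow` defaults to processing time, `on(timeField)` turns it into an event-time window, and `as(alias)` names it for the following `select()`. The sketch below models that flow with hypothetical immutable case classes (`WindowBuilderSketch`, `Tumbling`, `TumblingOnTime`, `LogicalTumble`). It deliberately departs from the diff's style, which mutates a `var` and returns `this`. It is not Flink's API.

```scala
// Hypothetical, simplified model of the group-window builder; not Flink's API.
object WindowBuilderSketch {
  sealed trait TimeMode
  case object ProcessingTime extends TimeMode
  final case class EventTime(timeField: String) extends TimeMode

  /** Stand-in for the logical window handed to the planner. */
  final case class LogicalTumble(size: Long, mode: TimeMode, alias: Option[String])

  /** A tumbling window that works on processing time unless `on` is called. */
  final case class Tumbling(size: Long, alias: Option[String] = None) {
    def on(timeField: String): TumblingOnTime = TumblingOnTime(size, timeField, alias)
    def as(name: String): Tumbling = copy(alias = Some(name))
    def toLogical: LogicalTumble = LogicalTumble(size, ProcessingTime, alias)
  }

  /** A tumbling window bound to an event-time field. */
  final case class TumblingOnTime(size: Long, timeField: String, alias: Option[String]) {
    def as(name: String): TumblingOnTime = copy(alias = Some(name))
    def toLogical: LogicalTumble = LogicalTumble(size, EventTime(timeField), alias)
  }
}
```

Returning copies instead of mutating `this` keeps a partially configured window safe to reuse across queries. The alias is preserved regardless of whether `as` is called before or after `on`.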
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r82002196 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala --- @@ -0,0 +1,340 @@
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81998716 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala --- @@ -0,0 +1,340 @@
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81997213 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala --- @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +import org.apache.flink.api.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.api.table.plan.logical._ + +/** + * Group-window specification. Group-windows allow aggregates which are computed for a group of + * elements. A (time or row-count) window is required to bound the infinite input stream into a + * finite group. Group-windows are evaluated once per group. + */ +trait GroupWindow { + + /** +* Converts an API class to a logical window for planning. This is an internal method. +*/ + private[flink] def toLogicalWindow: LogicalWindow +} + +/** + * A group-window operating on event-time. + * + * @param timeField defines the time mode for streaming tables and acts as a time attribute for + * batch tables over which the query is evaluated. 
+ */ +abstract class EventTimeWindow(timeField: Expression) extends GroupWindow { + + protected var name: Option[Expression] = None + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: Expression): EventTimeWindow = { +this.name = Some(alias) +this + } + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias)) +} + +// +// Tumbling group-windows +// + +/** + * Tumbling group-window. By default, it works on processing-time. --- End diff -- Done. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
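The `TumblingWindow` doc comment quoted above describes a tumbling group-window: fixed-size, non-overlapping windows that, by default, are evaluated on processing time. To illustrate what "tumbling" means, the plain-Scala sketch below maps a timestamp to the single window that contains it. It is a simplified model, not Flink's implementation: it assumes millisecond timestamps and a zero offset, and `TumblingAssign` and `windowFor` are hypothetical names.

```scala
object TumblingAssign {
  // Returns the [start, end) bounds of the tumbling window that contains `ts`.
  // Math.floorMod keeps window starts aligned to multiples of `size`,
  // including for negative timestamps.
  def windowFor(ts: Long, size: Long): (Long, Long) = {
    require(size > 0, "window size must be positive")
    val start = ts - Math.floorMod(ts, size)
    (start, start + size)
  }

  def main(args: Array[String]): Unit = {
    // 50 ms windows, the same size as `Tumble over 50.milli` in the tests of this PR
    Seq(3L, 49L, 50L, 120L).foreach { ts =>
      val (s, e) = windowFor(ts, 50L)
      println(s"ts=$ts -> [$s, $e)")
    }
  }
}
```

Every element lands in exactly one window, which is what separates tumbling windows from sliding windows, where an element can belong to several.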
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81858699 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala --- @@ -0,0 +1,777 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark +import org.apache.flink.api.scala.stream.utils.StreamITCase +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.{Row, _} +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class GroupWindowITCase extends StreamingMultipleProgramsTestBase { + + @Test(expected = classOf[ValidationException]) --- End diff -- I think it is a good idea to add a brief comment explaining why a test is expected to fail.
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81851322 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala --- @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +import org.apache.flink.api.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.api.table.plan.logical._ + +/** + * Group-window specification. Group-windows allow aggregates which are computed for a group of + * elements. A (time or row-count) window is required to bound the infinite input stream into a + * finite group. Group-windows are evaluated once per group. + */ +trait GroupWindow { + + /** +* Converts an API class to a logical window for planning. This is an internal method. +*/ + private[flink] def toLogicalWindow: LogicalWindow +} + +/** + * A group-window operating on event-time. + * + * @param timeField defines the time mode for streaming tables and acts as a time attribute for + * batch tables over which the query is evaluated. 
+ */ +abstract class EventTimeWindow(timeField: Expression) extends GroupWindow { + + protected var name: Option[Expression] = None + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: Expression): EventTimeWindow = { +this.name = Some(alias) +this + } + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias)) +} + +// +// Tumbling group-windows +// + +/** + * Tumbling group-window. By default, it works on processing-time. + * + * @param size size of the window either as number of rows or interval of milliseconds + */ +class TumblingWindow(size: Expression) extends GroupWindow { + + /** +* Tumbling group-window. By default, it works on processing-time. +* +* @param size size of the window either as number of rows or interval of milliseconds +*/ + def this(size: String) = this(ExpressionParser.parseExpression(size)) + + private var alias: Option[Expression] = None + + /** +* Defines the time mode for streaming tables and specifies a time attribute for --- End diff -- Use event-time mode for streaming tables by calling `on('rowtime)`.
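The suggestion above is that event-time mode is selected by calling `on('rowtime)`, and that omitting `on(...)` leaves the window in processing-time mode. The sketch below models that builder shape in plain Scala so the two modes can be compared directly. Every name in it (`TumbleBuilderSketch`, `TimeMode`, `TumblingSpec`, `spec`) is hypothetical. It uses strings instead of Scala symbols, and it returns new immutable objects from `as(...)`, whereas the class in the diff mutates a `var` and returns `this`.

```scala
object TumbleBuilderSketch {
  // The notion of time a window is evaluated on.
  sealed trait TimeMode
  case object ProcessingTime extends TimeMode
  final case class EventTime(timeField: String) extends TimeMode

  // The resolved definition that the builder produces.
  final case class TumblingSpec(sizeMillis: Long, mode: TimeMode, alias: Option[String])

  // Entry point shaped like the `Tumble over <size>` DSL (simplified, hypothetical).
  object Tumble {
    def over(sizeMillis: Long): TumblingWindow = new TumblingWindow(sizeMillis, None)
  }

  // Processing-time tumbling window: the default until on(...) is called.
  final class TumblingWindow(sizeMillis: Long, alias: Option[String]) {
    def as(name: String): TumblingWindow = new TumblingWindow(sizeMillis, Some(name))

    // Switches to event time over the given time attribute, e.g. "rowtime".
    def on(timeField: String): TumblingEventTimeWindow =
      new TumblingEventTimeWindow(sizeMillis, timeField, alias)

    def spec: TumblingSpec = TumblingSpec(sizeMillis, ProcessingTime, alias)
  }

  // Event-time tumbling window over a named time attribute.
  final class TumblingEventTimeWindow(sizeMillis: Long, timeField: String, alias: Option[String]) {
    def as(name: String): TumblingEventTimeWindow =
      new TumblingEventTimeWindow(sizeMillis, timeField, Some(name))

    def spec: TumblingSpec = TumblingSpec(sizeMillis, EventTime(timeField), alias)
  }
}
```

For example, `Tumble.over(50L).spec` yields a processing-time spec, and `Tumble.over(50L).on("rowtime").as("w").spec` yields an event-time spec over `rowtime` with alias `w`. Because `on(...)` returns a different type, event-time-only operations can live on that type alone.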
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81862041 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala --- @@ -0,0 +1,777 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark +import org.apache.flink.api.scala.stream.utils.StreamITCase +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.{Row, _} +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class GroupWindowITCase extends StreamingMultipleProgramsTestBase { + + @Test(expected = classOf[ValidationException]) + def testInvalidBatchWindow(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Session withGap 10.rows as 'string) + } + + @Test(expected = classOf[TableException]) + def testInvalidRowtime1(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'rowtime, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime2(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = 
env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count as 'rowtime) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime3(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table.as('rowtime, 'myint, 'mystring) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime4(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli on 'string) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidTumblingSize(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81857880 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala --- @@ -325,33 +337,42 @@ trait ImplicitExpressionOperations { */ def day = toMilliInterval(expr, MILLIS_PER_DAY) -/** + /** * Creates an interval of the given number of hours. * * @return interval of milliseconds */ def hour = toMilliInterval(expr, MILLIS_PER_HOUR) -/** + /** * Creates an interval of the given number of minutes. * * @return interval of milliseconds */ def minute = toMilliInterval(expr, MILLIS_PER_MINUTE) -/** + /** * Creates an interval of the given number of seconds. * * @return interval of milliseconds */ def second = toMilliInterval(expr, MILLIS_PER_SECOND) -/** + /** * Creates an interval of the given number of milliseconds. * * @return interval of milliseconds */ def milli = toMilliInterval(expr, 1) + + // row type + + /** +* Creates a number defining an amount of rows. +* +* @return number of rows +*/ + def rows = expr --- End diff -- Doesn't this allow using `Int.rows` wherever `Int` is allowed? Should we return a `RowInterval` expression instead?
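The concern above is that `def rows = expr` returns the bare number, so `10.rows` cannot be told apart from `10`. Below is a minimal sketch of the suggested alternative, built on a toy expression hierarchy. `Expr`, `Literal`, `MilliInterval`, `RowInterval`, `IntOps`, and `windowSize` are hypothetical names, not the Table API's classes. Here `.rows` wraps the count in its own node, so a window-size check can accept row counts and time intervals and reject plain literals.

```scala
object RowIntervalSketch {
  // Toy expression tree standing in for the Table API's expressions.
  sealed trait Expr
  final case class Literal(value: Long) extends Expr
  final case class MilliInterval(millis: Long) extends Expr
  final case class RowInterval(count: Long) extends Expr

  // DSL-style conversions, in the spirit of ImplicitExpressionOperations.
  implicit final class IntOps(private val n: Int) extends AnyVal {
    def milli: Expr = MilliInterval(n.toLong)
    def second: Expr = MilliInterval(n.toLong * 1000L)
    // A distinct node instead of the bare number.
    def rows: Expr = RowInterval(n.toLong)
  }

  // A window size must be a positive row count or time interval.
  def windowSize(size: Expr): Either[String, String] = size match {
    case RowInterval(c) if c > 0   => Right(s"count window of $c rows")
    case MilliInterval(m) if m > 0 => Right(s"time window of $m ms")
    case other                     => Left(s"Invalid window size: $other")
  }
}
```

With `import RowIntervalSketch._`, `windowSize(10.rows)` and `windowSize(50.milli)` succeed, while `windowSize(Literal(10))` is rejected. Because row counts carry their own type, a validator can also reject `RowInterval` wherever only a plain number makes sense.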
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81971150 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala --- @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.logical + +import org.apache.flink.api.table.expressions.Expression + +trait Resolvable[T <: AnyRef] { --- End diff -- Add docs to explain the purpose of `Resolvable`
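The request above asks for docs explaining what `Resolvable` is for. The sketch below shows one possible wording for the trait and its `resolveExpressions` method, together with a toy implementer. The purpose stated in the doc comments is inferred from how `LogicalWindow` extends `Resolvable[LogicalWindow]` in this PR, so treat it as an assumption. The expression classes and `EventTimeTumble` are hypothetical stand-ins.

```scala
object ResolvableSketch {
  // Toy expressions (hypothetical): a name not yet looked up vs. a field bound to a type.
  sealed trait Expression
  final case class UnresolvedField(name: String) extends Expression
  final case class ResolvedField(name: String, fieldType: String) extends Expression

  /**
   * A plan element that holds expressions, such as a group-window's time field or
   * alias, which must be resolved against the input schema before validation.
   *
   * @tparam T the concrete type returned after resolution
   */
  trait Resolvable[T <: AnyRef] {

    /**
     * Returns a copy of this element in which every contained expression has been
     * passed through `resolver`. Elements without expressions may return `this`.
     *
     * @param resolver maps an unresolved expression to its resolved form
     * @return an instance of `T` holding only resolved expressions
     */
    def resolveExpressions(resolver: Expression => Expression): T
  }

  // Example implementer: an event-time window whose time field starts unresolved.
  final case class EventTimeTumble(timeField: Expression, sizeMillis: Long)
    extends Resolvable[EventTimeTumble] {
    override def resolveExpressions(resolver: Expression => Expression): EventTimeTumble =
      copy(timeField = resolver(timeField))
  }
}
```

The resolver is passed in as a function so that each node only needs to know which of its own fields are expressions. The lookup logic against the input schema stays in one place.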
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81852219 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala --- @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +import org.apache.flink.api.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.api.table.plan.logical._ + +/** + * Group-window specification. Group-windows allow aggregates which are computed for a group of + * elements. A (time or row-count) window is required to bound the infinite input stream into a + * finite group. Group-windows are evaluated once per group. + */ +trait GroupWindow { + + /** +* Converts an API class to a logical window for planning. This is an internal method. +*/ + private[flink] def toLogicalWindow: LogicalWindow +} + +/** + * A group-window operating on event-time. + * + * @param timeField defines the time mode for streaming tables and acts as a time attribute for + * batch tables over which the query is evaluated. 
+ */ +abstract class EventTimeWindow(timeField: Expression) extends GroupWindow { + + protected var name: Option[Expression] = None + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: Expression): EventTimeWindow = { +this.name = Some(alias) +this + } + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias)) +} + +// +// Tumbling group-windows +// + +/** + * Tumbling group-window. By default, it works on processing-time. + * + * @param size size of the window either as number of rows or interval of milliseconds + */ +class TumblingWindow(size: Expression) extends GroupWindow { + + /** +* Tumbling group-window. By default, it works on processing-time. +* +* @param size size of the window either as number of rows or interval of milliseconds +*/ + def this(size: String) = this(ExpressionParser.parseExpression(size)) + + private var alias: Option[Expression] = None + + /** +* Defines the time mode for streaming tables and specifies a time attribute for +* batch tables over which the query is evaluated. +* +* @param timeField time mode for streaming tables and time attribute for batch tables +* @return a tumbling group-window on event-time +*/ + def on(timeField: Expression): TumblingEventTimeWindow = +new TumblingEventTimeWindow(alias, timeField, size) + + /** +* Defines the time mode for streaming tables and specifies a time attribute for +* batch tables over which the query is evaluated. 
+* +* @param timeField time mode for streaming tables and time attribute for batch tables +* @return a tumbling group-window on event-time +*/ + def on(timeField: String): TumblingEventTimeWindow = +on(ExpressionParser.parseExpression(timeField)) + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties suc
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81860022 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala --- @@ -0,0 +1,777 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark +import org.apache.flink.api.scala.stream.utils.StreamITCase +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.{Row, _} +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class GroupWindowITCase extends StreamingMultipleProgramsTestBase { + + @Test(expected = classOf[ValidationException]) + def testInvalidBatchWindow(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Session withGap 10.rows as 'string) + } + + @Test(expected = classOf[TableException]) + def testInvalidRowtime1(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'rowtime, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime2(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = 
env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count as 'rowtime) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime3(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table.as('rowtime, 'myint, 'mystring) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime4(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli on 'string) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidTumblingSize(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81971046 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/Resolvable.scala --- @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.logical + +import org.apache.flink.api.table.expressions.Expression + +trait Resolvable[T <: AnyRef] { + + def resolveExpressions(resolver: (Expression) => Expression): T --- End diff -- Add docs explaining what this method is supposed to do.
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81945161 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -55,28 +56,27 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend override def validate(tableEnv: TableEnvironment): LogicalNode = { val resolvedProject = super.validate(tableEnv).asInstanceOf[Project] +val names: mutable.Set[String] = mutable.Set() -def checkUniqueNames(exprs: Seq[Expression]): Unit = { - val names: mutable.Set[String] = mutable.Set() - exprs.foreach { -case n: Alias => - // explicit name - if (names.contains(n.name)) { -throw ValidationException(s"Duplicate field name ${n.name}.") - } else { -names.add(n.name) - } -case r: ResolvedFieldReference => - // simple field forwarding - if (names.contains(r.name)) { -throw ValidationException(s"Duplicate field name ${r.name}.") - } else { -names.add(r.name) - } -case _ => // Do nothing +def checkName(name: String): Unit = { + if (names.contains(name)) { +throw ValidationException(s"Duplicate field name $name.") --- End diff -- use `failValidation()`
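The suggestion above is to replace the explicit `throw ValidationException(...)` in `checkName` with a call to `failValidation()`. The sketch below shows that refactoring in self-contained form. It assumes a helper shaped like `failValidation(msg: String): Nothing` and defines it locally. The projection-item classes are hypothetical stand-ins for `Alias` and `ResolvedFieldReference`.

```scala
import scala.collection.mutable

object UniqueNamesSketch {
  // Stand-in for the Table API's ValidationException.
  final case class ValidationException(msg: String) extends RuntimeException(msg)

  // Assumed shape of the helper the review refers to: it never returns normally.
  def failValidation(msg: String): Nothing = throw ValidationException(msg)

  // Toy projection items.
  sealed trait ProjItem
  final case class Alias(name: String) extends ProjItem
  final case class FieldRef(name: String) extends ProjItem
  final case class Other(desc: String) extends ProjItem

  def checkUniqueNames(items: Seq[ProjItem]): Unit = {
    val names = mutable.Set[String]()

    // Set.add returns false if the name was already present, i.e. a duplicate.
    def checkName(name: String): Unit =
      if (!names.add(name)) failValidation(s"Duplicate field name $name.")

    items.foreach {
      case Alias(n)    => checkName(n) // explicit name
      case FieldRef(n) => checkName(n) // simple field forwarding
      case _           => ()           // other expressions are not checked here
    }
  }
}
```

Because the helper's return type is `Nothing`, it type-checks in any position. Routing all failures through one function keeps the exception type and message handling consistent across validators.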
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81970900 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalWindow.scala --- @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.logical + +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.api.table.expressions.{Expression, WindowReference} +import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} + +abstract class LogicalWindow(val alias: Option[Expression]) extends Resolvable[LogicalWindow] { + def resolveExpressions(resolver: (Expression) => Expression): LogicalWindow = this --- End diff -- add newline
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81957188 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala --- @@ -497,3 +501,95 @@ case class LogicalRelNode( override def validate(tableEnv: TableEnvironment): LogicalNode = this } + +case class WindowAggregate( +groupingExpressions: Seq[Expression], +window: LogicalWindow, +aggregateExpressions: Seq[NamedExpression], +child: LogicalNode) + extends UnaryNode { + + override def output: Seq[Attribute] = { +(groupingExpressions ++ aggregateExpressions) map { + case ne: NamedExpression => ne.toAttribute + case e => Alias(e, e.toString).toAttribute +} + } + + override def resolveReference( --- End diff -- please add inline comments to make the code easier to follow
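The request above asks for inline comments in `WindowAggregate`. The sketch below shows what such comments could say for the `output` method quoted in the diff, re-created against a toy expression model so it compiles on its own. `Expression`, `NamedExpression`, `Attribute`, `Alias`, and `Call` here are hypothetical simplifications, not the Table API classes. The comments describe what this sketch does and are not authoritative documentation of Flink's code.

```scala
object WindowAggregateOutputSketch {
  // Toy expression model (hypothetical).
  sealed trait Expression
  sealed trait NamedExpression extends Expression {
    def name: String
    def toAttribute: Attribute = Attribute(name)
  }
  final case class Attribute(name: String) extends NamedExpression
  final case class Alias(child: Expression, name: String) extends NamedExpression
  final case class Call(fn: String, field: String) extends Expression {
    override def toString: String = s"$fn($field)"
  }

  final case class WindowAggregate(
      groupingExpressions: Seq[Expression],
      aggregateExpressions: Seq[NamedExpression]) {

    // Output schema: grouping keys first, then aggregates, in declaration order.
    def output: Seq[Attribute] =
      (groupingExpressions ++ aggregateExpressions).map {
        // Named expressions (field references, aliased aggregates) keep their name.
        case ne: NamedExpression => ne.toAttribute
        // Unnamed grouping expressions get a synthetic name from their string form.
        case e => Alias(e, e.toString).toAttribute
      }
  }
}
```

For example, grouping by `Attribute("string")` and `Call("hour", "ts")` while aggregating `Alias(Call("count", "int"), "cnt")` produces the attributes `string`, `hour(ts)`, and `cnt`.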
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81861775 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala --- @@ -0,0 +1,777 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark +import org.apache.flink.api.scala.stream.utils.StreamITCase +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.{Row, _} +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class GroupWindowITCase extends StreamingMultipleProgramsTestBase { + + @Test(expected = classOf[ValidationException]) + def testInvalidBatchWindow(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Session withGap 10.rows as 'string) + } + + @Test(expected = classOf[TableException]) + def testInvalidRowtime1(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'rowtime, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime2(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = 
env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count as 'rowtime) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime3(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table.as('rowtime, 'myint, 'mystring) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime4(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli on 'string) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidTumblingSize(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81850803 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala --- @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +import org.apache.flink.api.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.api.table.plan.logical._ + +/** + * Group-window specification. Group-windows allow aggregates which are computed for a group of + * elements. A (time or row-count) window is required to bound the infinite input stream into a + * finite group. Group-windows are evaluated once per group. + */ +trait GroupWindow { + + /** +* Converts an API class to a logical window for planning. This is an internal method. +*/ + private[flink] def toLogicalWindow: LogicalWindow +} + +/** + * A group-window operating on event-time. + * + * @param timeField defines the time mode for streaming tables and acts as a time attribute for + * batch tables over which the query is evaluated. 
+ */ +abstract class EventTimeWindow(timeField: Expression) extends GroupWindow { + + protected var name: Option[Expression] = None + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: Expression): EventTimeWindow = { +this.name = Some(alias) +this + } + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias)) +} + +// +// Tumbling group-windows +// + +/** + * Tumbling group-window. By default, it works on processing-time. --- End diff -- Add a hint on how to create an event-time window.
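One possible hint is to point readers to the `on(timeField)` method of `TumblingWindow` in this file, which turns the default processing-time window into an event-time window, e.g. `Tumble over 50.milli on 'rowtime` in the Scala DSL. The plain-Scala model below is only a sketch: `ProcTimeTumble`, `EventTimeTumble` and `TumbleSketch` are hypothetical stand-ins, not Flink classes, and they use an immutable `copy` instead of the mutable alias field in the diff.

```scala
// Hypothetical, simplified stand-ins for the Table API window specs (not Flink classes).

// Event-time tumbling window over an explicit time attribute.
final case class EventTimeTumble(timeField: String, size: Long, alias: Option[String] = None) {
  def as(name: String): EventTimeTumble = copy(alias = Some(name))
}

// Processing-time tumbling window: what you get when no time field is given.
final case class ProcTimeTumble(size: Long, alias: Option[String] = None) {
  // Plays the role of `on(timeField)`: keep size and alias, switch to event time.
  def on(timeField: String): EventTimeTumble = EventTimeTumble(timeField, size, alias)
  def as(name: String): ProcTimeTumble = copy(alias = Some(name))
}

object TumbleSketch {
  // Modelled on `Tumble over <size>`: without `on`, the window stays on processing time.
  def over(sizeMillis: Long): ProcTimeTumble = ProcTimeTumble(sizeMillis)
}
```

With this shape, `TumbleSketch.over(50L)` stays on processing time, while `TumbleSketch.over(50L).on("rowtime").as("w")` yields an event-time spec that keeps both the size and the alias.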
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81857075 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/windowDsl.scala --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.table + +import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.api.table.{SessionWindow, SlideWithSize, TumblingWindow} + +/** + * Helper object for creating a tumbling window. In a tumbling window elements are assigned to + * fixed length, non-overlapping windows of a specified window size. For example, if you specify a + * window size of 5 minutes, the following operation will get 5 minutes worth of elements in + * each invocation. + */ +object Tumble { + + /** +* Creates a tumbling window. In a tumbling window elements are assigned to fixed length, +* non-overlapping windows of a specified window size. For example, if you specify a window +* size of 5 minutes, the following operation will get 5 minutes worth of elements in --- End diff -- I think "the following operation" is not well defined.
How about "For example, if you specify a tumbling window of 5 minutes size, elements will be grouped in 5 minute intervals."
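The proposed wording can be pinned down with a little arithmetic: with a 5-minute tumbling window (300000 ms), each row goes to the window that starts at its timestamp rounded down to a multiple of 300000. A minimal plain-Scala sketch, assuming millisecond timestamps and no window offset; `TumblingAssignment` is a hypothetical helper, not the Flink runtime code:

```scala
object TumblingAssignment {
  // Start of the tumbling window [start, start + size) that contains `timestamp`.
  def windowStart(timestamp: Long, size: Long): Long = {
    require(size > 0, "window size must be positive")
    // floorMod (unlike %) keeps negative timestamps in the earlier window.
    timestamp - java.lang.Math.floorMod(timestamp, size)
  }

  // Exclusive end of that window.
  def windowEnd(timestamp: Long, size: Long): Long = windowStart(timestamp, size) + size

  // Groups timestamps into non-overlapping intervals keyed by window start.
  def group(timestamps: Seq[Long], size: Long): Map[Long, Seq[Long]] =
    timestamps.groupBy(windowStart(_, size))
}
```

For example, timestamps 1000 and 299999 both land in the window starting at 0, while 300000 opens the next interval. Using `floorMod` rather than `%` means a timestamp of -1 is placed in the window starting at -300000 instead of wrongly in the one starting at 0.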
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81857293 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/windowDsl.scala --- @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.table + +import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.api.table.{SessionWindow, SlideWithSize, TumblingWindow} + +/** + * Helper object for creating a tumbling window. In a tumbling window elements are assigned to + * fixed length, non-overlapping windows of a specified window size. For example, if you specify a + * window size of 5 minutes, the following operation will get 5 minutes worth of elements in + * each invocation. + */ +object Tumble { + + /** +* Creates a tumbling window. In a tumbling window elements are assigned to fixed length, +* non-overlapping windows of a specified window size. For example, if you specify a window --- End diff -- "fixed length ... of a specified window size" is redundant. How about "non-overlapping windows of a specified fixed length"?
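The "non-overlapping" part is what separates a tumbling window from a sliding one (the DSL also imports `SlideWithSize`). A minimal plain-Scala sketch, assuming millisecond timestamps and no offset, with a hypothetical `SlidingAssignment` helper: a row belongs to every window `[start, start + size)` that covers it, and with `slide == size` this collapses to exactly one window, i.e. the tumbling case.

```scala
object SlidingAssignment {
  // Starts of all sliding windows [start, start + size) that contain `timestamp`.
  def windowStarts(timestamp: Long, size: Long, slide: Long): List[Long] = {
    require(size > 0 && slide > 0, "size and slide must be positive")
    // Latest window start at or before `timestamp`.
    val lastStart = timestamp - java.lang.Math.floorMod(timestamp, slide)
    // Step back one slide at a time while the window still covers `timestamp`.
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > timestamp - size).toList
  }
}
```

With `size = 10` and `slide = 5`, a row at time 7 lies in both `[5, 15)` and `[0, 10)`; with `size = slide = 10` it lies only in `[0, 10)`.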
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81861356 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala --- @@ -0,0 +1,777 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark +import org.apache.flink.api.scala.stream.utils.StreamITCase +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.{Row, _} +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class GroupWindowITCase extends StreamingMultipleProgramsTestBase { + + @Test(expected = classOf[ValidationException]) + def testInvalidBatchWindow(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Session withGap 10.rows as 'string) + } + + @Test(expected = classOf[TableException]) + def testInvalidRowtime1(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'rowtime, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime2(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = 
env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count as 'rowtime) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime3(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table.as('rowtime, 'myint, 'mystring) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime4(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli on 'string) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidTumblingSize(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81852394 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala --- @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +import org.apache.flink.api.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.api.table.plan.logical._ + +/** + * Group-window specification. Group-windows allow aggregates which are computed for a group of + * elements. A (time or row-count) window is required to bound the infinite input stream into a + * finite group. Group-windows are evaluated once per group. + */ +trait GroupWindow { + + /** +* Converts an API class to a logical window for planning. This is an internal method. +*/ + private[flink] def toLogicalWindow: LogicalWindow +} + +/** + * A group-window operating on event-time. + * + * @param timeField defines the time mode for streaming tables and acts as a time attribute for + * batch tables over which the query is evaluated. 
+ */ +abstract class EventTimeWindow(timeField: Expression) extends GroupWindow { + + protected var name: Option[Expression] = None + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: Expression): EventTimeWindow = { +this.name = Some(alias) +this + } + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias)) +} + +// +// Tumbling group-windows +// + +/** + * Tumbling group-window. By default, it works on processing-time. + * + * @param size size of the window either as number of rows or interval of milliseconds + */ +class TumblingWindow(size: Expression) extends GroupWindow { + + /** +* Tumbling group-window. By default, it works on processing-time. +* +* @param size size of the window either as number of rows or interval of milliseconds +*/ + def this(size: String) = this(ExpressionParser.parseExpression(size)) + + private var alias: Option[Expression] = None + + /** +* Defines the time mode for streaming tables and specifies a time attribute for +* batch tables over which the query is evaluated. +* +* @param timeField time mode for streaming tables and time attribute for batch tables +* @return a tumbling group-window on event-time +*/ + def on(timeField: Expression): TumblingEventTimeWindow = +new TumblingEventTimeWindow(alias, timeField, size) + + /** +* Defines the time mode for streaming tables and specifies a time attribute for +* batch tables over which the query is evaluated. 
+* +* @param timeField time mode for streaming tables and time attribute for batch tables +* @return a tumbling group-window on event-time +*/ + def on(timeField: String): TumblingEventTimeWindow = +on(ExpressionParser.parseExpression(timeField)) + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties suc
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81862223 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala --- @@ -0,0 +1,777 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark +import org.apache.flink.api.scala.stream.utils.StreamITCase +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.{Row, _} +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class GroupWindowITCase extends StreamingMultipleProgramsTestBase { + + @Test(expected = classOf[ValidationException]) + def testInvalidBatchWindow(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Session withGap 10.rows as 'string) + } + + @Test(expected = classOf[TableException]) + def testInvalidRowtime1(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'rowtime, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime2(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = 
env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count as 'rowtime) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime3(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table.as('rowtime, 'myint, 'mystring) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime4(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli on 'string) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidTumblingSize(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81970778 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/LogicalWindow.scala --- @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.logical + +import org.apache.flink.api.table.TableEnvironment +import org.apache.flink.api.table.expressions.{Expression, WindowReference} +import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} + +abstract class LogicalWindow(val alias: Option[Expression]) extends Resolvable[LogicalWindow] { --- End diff -- Rename to `LogicalGroupWindow` and merge with `groupWindows.scala`? Or is this class supposed to be used for row windows as well?
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81860061 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala --- @@ -0,0 +1,777 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark +import org.apache.flink.api.scala.stream.utils.StreamITCase +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.{Row, _} +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class GroupWindowITCase extends StreamingMultipleProgramsTestBase { + + @Test(expected = classOf[ValidationException]) + def testInvalidBatchWindow(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Session withGap 10.rows as 'string) + } + + @Test(expected = classOf[TableException]) + def testInvalidRowtime1(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'rowtime, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime2(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = 
env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli) + .select('string, 'int.count as 'rowtime) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime3(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table.as('rowtime, 'myint, 'mystring) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidRowtime4(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) +val table = stream.toTable(tEnv, 'long, 'int, 'string) + +table + .groupBy('string) + .window(Tumble over 50.milli on 'string) + .select('string, 'int.count) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidTumblingSize(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81847736 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala --- @@ -675,11 +692,77 @@ class GroupedTable( * Example: * * {{{ -* tab.groupBy("key").select("key, value.avg + " The average" as average") +* tab.groupBy("key").select("key, value.avg + ' The average' as average") +* }}} +*/ + def select(fields: String): Table = { +val fieldExprs = ExpressionParser.parseExpressionList(fields) +select(fieldExprs: _*) + } + + /** +* Windows a table to divide a (potentially) infinite stream of records into finite slices +* based on the timestamps of elements or other criteria. This division is required when +* working with infinite data and performing transformations that aggregate elements. +* +* @param groupWindow group-window specification required to bound the infinite input stream +*into a finite group +* @return group-windowed table +*/ + def window(groupWindow: GroupWindow): GroupWindowedTable = { +if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) { + throw new ValidationException(s"Windows on batch tables are currently not supported.") +} +new GroupWindowedTable(table, groupKey, groupWindow) + } +} + +class GroupWindowedTable( +private[flink] val table: Table, +private[flink] val groupKey: Seq[Expression], +private[flink] val window: GroupWindow) { + + /** +* Performs a selection operation on a group-windowed table. Similar to an SQL SELECT statement. +* The field expressions can contain complex expressions and aggregations. 
+* +* Example: +* +* {{{ +* groupWindowTable.select('key, 'window.start, 'value.avg + " The average" as 'average) +* }}} +*/ + def select(fields: Expression*): Table = { +val projectionOnAggregates = fields.map(extractAggregations(_, table.tableEnv)) +val aggregations = projectionOnAggregates.flatMap(_._2) + +val groupWindow = window.toLogicalWindow + +val logical = if (aggregations.nonEmpty) { + Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)), +WindowAggregate(groupKey, groupWindow, aggregations, table.logicalPlan) + .validate(table.tableEnv)) +} else { + Project(projectionOnAggregates.map(e => UnresolvedAlias(e._1)), +WindowAggregate(groupKey, groupWindow, Nil, table.logicalPlan).validate(table.tableEnv)) +} + +new Table(table.tableEnv, logical.validate(table.tableEnv)) --- End diff -- Wasn't `logical` validated before?
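For intuition about what a group-windowed `select` produces, the grouping can be modeled without Flink: rows are bucketed by grouping key and by window, and each bucket is aggregated once. The sketch below assumes an event-time tumbling window on a `Long` millisecond rowtime; `GroupWindowCountSketch` is a hypothetical name, and it models only `count`, not the Table API itself.

```scala
object GroupWindowCountSketch {
  // Rows shaped like the (Long, Int, String) test data, with the Long used as rowtime in ms.
  type Row = (Long, Int, String)

  // Start of the tumbling window [start, start + size) that contains `ts`.
  private def windowStart(ts: Long, size: Long): Long = ts - java.lang.Math.floorMod(ts, size)

  // Models groupBy('string).window(Tumble over size on 'rowtime).select('string, 'int.count):
  // one group per (key, window start), each aggregated exactly once.
  def countPerKeyAndWindow(rows: Seq[Row], size: Long): Map[(String, Long), Int] =
    rows
      .groupBy { case (ts, _, key) => (key, windowStart(ts, size)) }
      .map { case (group, members) => group -> members.size }
}
```

With a window size of 50, the rows at rowtime 1 and 20 with key "a" fall into the same window starting at 0, while the "a" row at 60 starts a new window at 50.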
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81983925 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala --- @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.logical + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, TypeCoercion} +import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} + +abstract class EventTimeGroupWindow( +name: Option[Expression], +time: Expression) + extends LogicalWindow(name) { + + override def validate(tableEnv: TableEnvironment): ValidationResult = { +val valid = super.validate(tableEnv) +if (valid.isFailure) { +return valid +} + +if (tableEnv.isInstanceOf[StreamTableEnvironment]) { + val valid = time match { +case RowtimeAttribute() => + ValidationSuccess +case _ => + ValidationFailure("Event-time window expects a 'rowtime' time field.") + } + if (valid.isFailure) { +return valid + } +} +if (!TypeCoercion.canCast(time.resultType, BasicTypeInfo.LONG_TYPE_INFO)) { + ValidationFailure(s"Event-time window expects a time field that can be safely cast " + +s"to Long, but is ${time.resultType}") +} else { + ValidationSuccess +} + } +} + +abstract class ProcessingTimeGroupWindow(name: Option[Expression]) extends LogicalWindow(name) + +// +// Tumbling group windows +// + +object TumblingGroupWindow { + def validate(tableEnv: TableEnvironment, size: Expression): ValidationResult = size match { +case Literal(_, IntervalTypeInfo.INTERVAL_MILLIS) => + ValidationSuccess +case Literal(_, BasicTypeInfo.LONG_TYPE_INFO) | Literal(_, BasicTypeInfo.INT_TYPE_INFO) => + ValidationSuccess +case _ => + ValidationFailure( +"Tumbling window expects size literal of type Interval of Milliseconds or Long/Integer.") + } +} + +case class ProcessingTimeTumblingGroupWindow( +name: Option[Expression], +size: Expression) + extends ProcessingTimeGroupWindow(name) { + + override def resolveExpressions(resolve: (Expression) => Expression): 
LogicalWindow = +ProcessingTimeTumblingGroupWindow( + name.map(resolve), + resolve(size)) + + override def validate(tableEnv: TableEnvironment): ValidationResult = +super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, size)) + + override def toString: String = s"ProcessingTimeTumblingGroupWindow($name, $size)" +} + +case class EventTimeTumblingGroupWindow( +name: Option[Expression], +timeField: Expression, +size: Expression) + extends EventTimeGroupWindow( +name, +timeField) { + + override def resolveExpressions(resolve: (Expression) => Expression): LogicalWindow = +EventTimeTumblingGroupWindow( + name.map(resolve), + resolve(timeField), + resolve(size)) + + override def validate(tableEnv: TableEnvironment): ValidationResult = +super.validate(tableEnv).orElse(TumblingGroupWindow.validate(tableEnv, size)) + + override def toString: String = s"EventTimeTumblingGroupWindow($name, $timeField, $size)" +} + +// -
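The size check in `TumblingGroupWindow.validate` accepts either an interval-of-milliseconds literal or a Long/Integer literal. According to the `TumblingWindow` scaladoc in `windows.scala`, these correspond to a time-based window and a row-count window. The following is a minimal Scala sketch of that distinction. The literal and size types are hypothetical stand-ins, not Flink's `Literal` or `IntervalTypeInfo`:

```scala
object TumbleSizeSketch {
  // Hypothetical stand-ins for the literal types checked in the diff.
  sealed trait LiteralType
  case object IntervalMillis extends LiteralType
  case object LongType extends LiteralType
  case object IntType extends LiteralType
  case object StringType extends LiteralType

  final case class Literal(value: Any, tpe: LiteralType)

  // Hypothetical result: either a time interval or a number of rows.
  sealed trait WindowSize
  final case class TimeSize(millis: Long) extends WindowSize
  final case class RowCountSize(rows: Long) extends WindowSize

  // Interval literals describe time-based windows; Long/Int literals describe row counts.
  // Anything else is rejected with the same message the diff uses.
  def classify(size: Literal): Either[String, WindowSize] = size match {
    case Literal(v: Long, IntervalMillis) => Right(TimeSize(v))
    case Literal(v: Long, LongType)       => Right(RowCountSize(v))
    case Literal(v: Int, IntType)         => Right(RowCountSize(v.toLong))
    case _ =>
      Left("Tumbling window expects size literal of type Interval of Milliseconds or Long/Integer.")
  }
}
```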
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81976580 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/groupWindows.scala --- @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.plan.logical + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.table.{StreamTableEnvironment, TableEnvironment} +import org.apache.flink.api.table.expressions._ +import org.apache.flink.api.table.typeutils.{IntervalTypeInfo, TypeCoercion} +import org.apache.flink.api.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} + +abstract class EventTimeGroupWindow( +name: Option[Expression], +time: Expression) + extends LogicalWindow(name) { + + override def validate(tableEnv: TableEnvironment): ValidationResult = { +val valid = super.validate(tableEnv) +if (valid.isFailure) { +return valid +} + +if (tableEnv.isInstanceOf[StreamTableEnvironment]) { + val valid = time match { +case RowtimeAttribute() => + ValidationSuccess +case _ => + ValidationFailure("Event-time window expects a 'rowtime' time field.") + } + if (valid.isFailure) { +return valid + } +} +if (!TypeCoercion.canCast(time.resultType, BasicTypeInfo.LONG_TYPE_INFO)) { --- End diff -- Can you put this in an `else` branch (for `BatchTableEnvironment`)? `RowtimeAttribute`'s result type is `Long`. So it will always pass the check. The `valid.isFailure` check could also be removed.
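The restructuring suggested in the comment above can be illustrated with a minimal, self-contained Scala sketch. It does not use Flink's classes. `ValidationResult`, `TableEnv`, `TimeExpr`, `Rowtime`, `Field`, and the set of type names treated as castable to Long are all hypothetical stand-ins. In the sketch, the cast check moves into the batch branch, and the parent's result is chained with `orElse` instead of explicit `isFailure` checks. The sketch assumes that `orElse` keeps a failure and otherwise evaluates the next check, which mirrors how the diff chains `super.validate(tableEnv).orElse(...)`:

```scala
object EventTimeSketch {
  // Hypothetical validation result with short-circuiting `orElse`.
  sealed trait ValidationResult {
    def isFailure: Boolean = this match {
      case ValidationFailure(_) => true
      case _                    => false
    }
    // Keep a failure; otherwise evaluate the next (by-name) check.
    def orElse(other: => ValidationResult): ValidationResult = if (isFailure) this else other
  }
  case object ValidationSuccess extends ValidationResult
  final case class ValidationFailure(message: String) extends ValidationResult

  // Hypothetical environment and time-expression types.
  sealed trait TableEnv
  case object StreamEnv extends TableEnv
  case object BatchEnv extends TableEnv

  sealed trait TimeExpr { def resultType: String }
  case object Rowtime extends TimeExpr { val resultType: String = "Long" }
  final case class Field(name: String, resultType: String) extends TimeExpr

  // Assumption: integral type names stand in for "safely castable to Long".
  private val castableToLong = Set("Byte", "Short", "Int", "Long")

  // `parent` plays the role of super.validate(tableEnv).
  def validate(env: TableEnv, time: TimeExpr,
               parent: ValidationResult = ValidationSuccess): ValidationResult =
    parent.orElse {
      env match {
        // Streaming: only the rowtime attribute is allowed (its type is always Long).
        case StreamEnv =>
          time match {
            case Rowtime => ValidationSuccess
            case _       => ValidationFailure("Event-time window expects a 'rowtime' time field.")
          }
        // Batch: any field whose type can be cast to Long.
        case BatchEnv =>
          if (castableToLong.contains(time.resultType)) ValidationSuccess
          else ValidationFailure(
            s"Event-time window expects a time field that can be safely cast to Long, " +
              s"but is ${time.resultType}")
      }
    }
}
```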
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81852306 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/windows.scala --- @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +import org.apache.flink.api.table.expressions.{Expression, ExpressionParser} +import org.apache.flink.api.table.plan.logical._ + +/** + * Group-window specification. Group-windows allow aggregates which are computed for a group of + * elements. A (time or row-count) window is required to bound the infinite input stream into a + * finite group. Group-windows are evaluated once per group. + */ +trait GroupWindow { + + /** +* Converts an API class to a logical window for planning. This is an internal method. +*/ + private[flink] def toLogicalWindow: LogicalWindow +} + +/** + * A group-window operating on event-time. + * + * @param timeField defines the time mode for streaming tables and acts as a time attribute for + * batch tables over which the query is evaluated. 
+ */ +abstract class EventTimeWindow(timeField: Expression) extends GroupWindow { + + protected var name: Option[Expression] = None + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: Expression): EventTimeWindow = { +this.name = Some(alias) +this + } + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties such as window start or end time. +* +* @param alias alias for this window +* @return this window +*/ + def as(alias: String): EventTimeWindow = as(ExpressionParser.parseExpression(alias)) +} + +// +// Tumbling group-windows +// + +/** + * Tumbling group-window. By default, it works on processing-time. + * + * @param size size of the window either as number of rows or interval of milliseconds + */ +class TumblingWindow(size: Expression) extends GroupWindow { + + /** +* Tumbling group-window. By default, it works on processing-time. +* +* @param size size of the window either as number of rows or interval of milliseconds +*/ + def this(size: String) = this(ExpressionParser.parseExpression(size)) + + private var alias: Option[Expression] = None + + /** +* Defines the time mode for streaming tables and specifies a time attribute for +* batch tables over which the query is evaluated. +* +* @param timeField time mode for streaming tables and time attribute for batch tables +* @return a tumbling group-window on event-time +*/ + def on(timeField: Expression): TumblingEventTimeWindow = +new TumblingEventTimeWindow(alias, timeField, size) + + /** +* Defines the time mode for streaming tables and specifies a time attribute for +* batch tables over which the query is evaluated. 
+* +* @param timeField time mode for streaming tables and time attribute for batch tables +* @return a tumbling group-window on event-time +*/ + def on(timeField: String): TumblingEventTimeWindow = +on(ExpressionParser.parseExpression(timeField)) + + /** +* Assigns an alias for this window that the following `select()` clause can refer to in order +* to access window properties suc
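The builder flow in `windows.scala` works as follows. A tumbling window is processing-time by default. Calling `on(timeField)` turns it into an event-time window and carries over any alias set so far. Calling `as(alias)` names the window so that a later `select()` can refer to it. A simplified Scala sketch of that flow follows. The classes are hypothetical, plain strings replace `Expression`, and the size strings are placeholders rather than Table API syntax:

```scala
object TumbleBuilderSketch {
  // Hypothetical logical-window representations (not Flink's LogicalWindow classes).
  sealed trait LogicalWindowSketch
  final case class ProcTimeTumble(name: Option[String], size: String) extends LogicalWindowSketch
  final case class EventTimeTumble(name: Option[String], timeField: String, size: String)
    extends LogicalWindowSketch

  // Processing-time tumbling window by default.
  final class TumblingWindow(size: String) {
    private var alias: Option[String] = None

    def as(name: String): TumblingWindow = { alias = Some(name); this }

    // Switches to event-time; an alias assigned before `on` is carried over.
    def on(timeField: String): TumblingEventTimeWindow =
      new TumblingEventTimeWindow(alias, timeField, size)

    def toLogicalWindow: LogicalWindowSketch = ProcTimeTumble(alias, size)
  }

  final class TumblingEventTimeWindow(initialAlias: Option[String], timeField: String, size: String) {
    private var alias: Option[String] = initialAlias

    def as(name: String): TumblingEventTimeWindow = { alias = Some(name); this }

    def toLogicalWindow: LogicalWindowSketch = EventTimeTumble(alias, timeField, size)
  }
}
```

With this design, the alias can be set either before or after `on(...)`, and both orders yield the same logical window.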
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81854254 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala --- @@ -50,6 +50,10 @@ object WordCountTable { .filter('frequency === 2) .toDataSet[WC] +val w = Slide.over(23).every(23).toLogicalWindow.toString --- End diff -- please remove changes
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2562#discussion_r81859596 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/GroupWindowITCase.scala --- @@ -0,0 +1,777 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark +import org.apache.flink.api.scala.stream.utils.StreamITCase +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.table.{Row, _} +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class GroupWindowITCase extends StreamingMultipleProgramsTestBase { + + @Test(expected = classOf[ValidationException]) + def testInvalidBatchWindow(): Unit = { +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val data = new mutable.MutableList[(Long, Int, String)] +val stream = env.fromCollection(data) --- End diff -- not a stream
[GitHub] flink pull request #2562: [FLINK-4691] [table] Add group-windows for streami...
GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/2562 [FLINK-4691] [table] Add group-windows for streaming tables This PR implements Tumble, Slide, and Session group-windows for streaming tables as described in FLIP-11. It adds the API, validation, logical representation, and runtime components. Some additional comments: I have not implemented the 'systemtime' keyword yet, as this would cause more problems than it solves. Integrating it into the validation layer would be especially tricky: the resolution of those special fields happens within a WindowAggregate, but the logical type of a window should already be known at that point. We are mixing logical operators and expressions, which is not very nice. Furthermore, what happens in a batch environment if 'systemtime' is used? It could be an existing column, but does not have to be one. That is not specified in the FLIP yet. The aggregations are not very efficient yet. Currently this PR uses window functions that wrap the GroupReduce functions; we have to rework the aggregations first. Maybe we could use `WindowedStream#apply(R, FoldFunction, WindowFunction, TypeInformation)`, which means that `R` has to be created in the translation phase. So far the tests are mainly ITCases; we might want to change them to unit tests once we have the means (like new test bases) to do that. The website documentation is still missing.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-4691 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2562.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2562 commit edbfe00cb0fd7ea8362c90988eb0860eb9ce6078 Author: twalthr Date: 2016-08-25T07:19:53Z [FLINK-4691] [table] Add group-windows for streaming tables
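The following Scala sketch shows how a single timestamp maps to the three window types named in the PR description (Tumble, Slide, Session). It is a simplified model with several assumptions:

- It covers time-based windows only, not row-count windows.
- Windows are half-open `[start, end)` intervals aligned at 0.
- There are no watermarks and no late data.
- Session windows are formed by merging overlapping `[ts, ts + gap)` intervals.

None of the names are Flink APIs.

```scala
object WindowAssignSketch {
  // Half-open interval [start, end).
  final case class TimeWindow(start: Long, end: Long)

  // Tumbling: every timestamp belongs to exactly one window of length `size`.
  def tumble(ts: Long, size: Long): TimeWindow = {
    val start = ts - Math.floorMod(ts, size)
    TimeWindow(start, start + size)
  }

  // Sliding: windows of length `size` start every `step`; a timestamp can belong to several.
  def slide(ts: Long, size: Long, step: Long): List[TimeWindow] = {
    val lastStart = ts - Math.floorMod(ts, step)
    Iterator
      .iterate(lastStart)(_ - step)
      .takeWhile(start => start > ts - size) // window must still contain ts
      .map(start => TimeWindow(start, start + size))
      .toList
  }

  // Session: each element opens [ts, ts + gap); overlapping intervals are merged.
  // The fold builds the list newest-first, so it is reversed at the end.
  def sessions(timestamps: Seq[Long], gap: Long): List[TimeWindow] =
    timestamps.sorted
      .foldLeft(List.empty[TimeWindow]) {
        case (current :: done, ts) if ts < current.end => TimeWindow(current.start, ts + gap) :: done
        case (acc, ts)                                 => TimeWindow(ts, ts + gap) :: acc
      }
      .reverse
}
```

For example, with a size of 10 and a slide of 5, timestamp 7 belongs to the windows `[5, 15)` and `[0, 10)`.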