Repository: flink Updated Branches: refs/heads/master e79cedf23 -> 14bc62740
[FLINK-7426] [table] Support null values in keys This closes #4732. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/14bc6274 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/14bc6274 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/14bc6274 Branch: refs/heads/master Commit: 14bc62740e90ecefd34f9202f4a37c883c3122e5 Parents: e79cedf Author: twalthr <twal...@apache.org> Authored: Wed Sep 27 13:05:44 2017 +0200 Committer: twalthr <twal...@apache.org> Committed: Tue Oct 17 10:35:53 2017 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/flink/types/Row.java | 22 +++++++++-- .../java/org/apache/flink/types/RowTest.java | 33 ++++++++++++++++ .../flink/table/calcite/FlinkTypeFactory.scala | 14 +++---- .../datastream/DataStreamGroupAggregate.scala | 3 +- .../DataStreamGroupWindowAggregate.scala | 18 +++++---- .../datastream/DataStreamOverAggregate.scala | 5 ++- .../nodes/datastream/DataStreamWindowJoin.scala | 9 ++++- .../flink/table/plan/schema/RowSchema.scala | 8 ++++ .../flink/table/runtime/CRowKeySelector.scala | 41 ++++++++++++++++++++ .../table/runtime/aggregate/AggregateUtil.scala | 7 ++-- ...IncrementalAggregateTimeWindowFunction.scala | 2 +- .../IncrementalAggregateWindowFunction.scala | 7 ++-- .../table/runtime/stream/sql/JoinITCase.scala | 5 ++- .../runtime/stream/table/AggregateITCase.scala | 8 ++-- .../stream/table/GroupWindowITCase.scala | 27 ++++++++----- .../runtime/stream/table/OverWindowITCase.scala | 6 ++- 16 files changed, 166 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-core/src/main/java/org/apache/flink/types/Row.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/Row.java b/flink-core/src/main/java/org/apache/flink/types/Row.java index 0b2120f..4783314 100644 --- a/flink-core/src/main/java/org/apache/flink/types/Row.java +++ b/flink-core/src/main/java/org/apache/flink/types/Row.java @@ -142,15 +142,29 @@ public class Row implements Serializable{ /** * Creates a new Row which copied from another row. + * This method does not perform a deep copy. * * @param row The row being copied. * @return The cloned new Row */ public static Row copy(Row row) { - Row ret = new Row(row.getArity()); - for (int i = 0; i < row.getArity(); ++i) { - ret.setField(i, row.getField(i)); + final Row newRow = new Row(row.fields.length); + System.arraycopy(row.fields, 0, newRow.fields, 0, row.fields.length); + return newRow; + } + + /** + * Creates a new Row with projected fields from another row. + * This method does not perform a deep copy. + * + * @param fields fields to be projected + * @return the new projected Row + */ + public static Row project(Row row, int[] fields) { + final Row newRow = new Row(fields.length); + for (int i = 0; i < fields.length; i++) { + newRow.fields[i] = row.fields[fields[i]]; } - return ret; + return newRow; } } http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-core/src/test/java/org/apache/flink/types/RowTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/RowTest.java b/flink-core/src/test/java/org/apache/flink/types/RowTest.java index 13a4d6a..067992a 100644 --- a/flink-core/src/test/java/org/apache/flink/types/RowTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/RowTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class RowTest { @Test @@ -46,4 +47,36 @@ public class RowTest { row2.setField(4, true); assertEquals(row1, row2); } + + @Test + public void testRowCopy() { + Row row = new Row(5); + row.setField(0, 1); + row.setField(1, "hello"); + row.setField(2, null); + row.setField(3, new Tuple2<>(2, "hi")); + row.setField(4, "hello world"); + + Row copy = Row.copy(row); + assertEquals(row, copy); + assertTrue(row != copy); + } + + @Test + public void testRowProject() { + Row row = new Row(5); + row.setField(0, 1); + row.setField(1, "hello"); + row.setField(2, null); + row.setField(3, new Tuple2<>(2, "hi")); + row.setField(4, "hello world"); + + Row projected = Row.project(row, new int[]{0, 2, 4}); + + Row expected = new Row(3); + expected.setField(0, 1); + expected.setField(1, null); + expected.setField(2, "hello world"); + assertEquals(expected, projected); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 768d700..2874e61 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -149,13 +149,6 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp createTypeFromTypeInfo(oa.getComponentInfo, isNullable = true), isNullable) - case mp: MapTypeInfo[_, _] => - new MapRelDataType( - mp, - createTypeFromTypeInfo(mp.getKeyTypeInfo, isNullable = true), - createTypeFromTypeInfo(mp.getValueTypeInfo, isNullable = true), - isNullable) - case mts: MultisetTypeInfo[_] => new MultisetRelDataType( mts, @@ -163,6 +156,13 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp isNullable ) + case mp: MapTypeInfo[_, _] => + new MapRelDataType( + mp, + createTypeFromTypeInfo(mp.getKeyTypeInfo, isNullable = true), + createTypeFromTypeInfo(mp.getValueTypeInfo, isNullable = true), + isNullable) + case ti: TypeInformation[_] => new GenericRelDataType( ti, http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala index 742a7e4..71de57c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupAggregate.scala @@ -27,6 +27,7 @@ import org.apache.flink.table.codegen.AggregationCodeGenerator import org.apache.flink.table.plan.nodes.CommonAggregate import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.CRowKeySelector import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} @@ -144,7 +145,7 @@ class DataStreamGroupAggregate( // grouped / keyed aggregation if (groupings.nonEmpty) { inputDS - .keyBy(groupings: _*) + .keyBy(new CRowKeySelector(groupings, inputSchema.projectedTypeInfo(groupings))) .process(processFunction) .returns(outRowType) .name(keyedAggOpName) http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala index db15839..267bc3b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala @@ -22,7 +22,6 @@ import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.AggregateCall import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} -import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.datastream.{AllWindowedStream, DataStream, KeyedStream, WindowedStream} import org.apache.flink.streaming.api.windowing.assigners._ import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger @@ -34,16 +33,17 @@ import org.apache.flink.table.expressions.ExpressionUtils._ import org.apache.flink.table.expressions.ResolvedFieldReference import org.apache.flink.table.plan.logical._ import org.apache.flink.table.plan.nodes.CommonAggregate -import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.plan.nodes.datastream.DataStreamGroupWindowAggregate._ import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules -import org.apache.flink.table.runtime.RowtimeProcessFunction +import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.runtime.aggregate.AggregateUtil._ import org.apache.flink.table.runtime.aggregate._ +import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.table.runtime.{CRowKeySelector, RowtimeProcessFunction} import org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval -import org.apache.flink.table.runtime.triggers.StateCleaningCountTrigger import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row class DataStreamGroupWindowAggregate( window: LogicalWindow, @@ -189,10 +189,12 @@ class DataStreamGroupWindowAggregate( schema.arity, namedProperties) - val keyedStream = timestampedInput.keyBy(grouping: _*) + val keySelector = new CRowKeySelector(grouping, inputSchema.projectedTypeInfo(grouping)) + + val keyedStream = timestampedInput.keyBy(keySelector) val windowedStream = createKeyedWindowedStream(queryConfig, window, keyedStream) - .asInstanceOf[WindowedStream[CRow, Tuple, DataStreamWindow]] + .asInstanceOf[WindowedStream[CRow, Row, DataStreamWindow]] val (aggFunction, accumulatorRowType, aggResultRowType) = AggregateUtil.createDataStreamAggregateFunction( @@ -241,8 +243,8 @@ object DataStreamGroupWindowAggregate { private def createKeyedWindowedStream( queryConfig: StreamQueryConfig, groupWindow: LogicalWindow, - stream: KeyedStream[CRow, Tuple]): - WindowedStream[CRow, Tuple, _ <: DataStreamWindow] = groupWindow match { + stream: KeyedStream[CRow, Row]): + WindowedStream[CRow, Row, _ <: DataStreamWindow] = groupWindow match { case TumblingGroupWindow(_, timeField, size) if isProctimeAttribute(timeField) && isTimeIntervalLiteral(size)=> http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala index b9b3e3e..c41c1a9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala @@ -34,6 +34,7 @@ import org.apache.flink.table.codegen.AggregationCodeGenerator import org.apache.flink.table.plan.nodes.OverAggregate import org.apache.flink.table.plan.rules.datastream.DataStreamRetractionRules import org.apache.flink.table.plan.schema.RowSchema +import org.apache.flink.table.runtime.CRowKeySelector import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair import org.apache.flink.table.runtime.aggregate._ import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} @@ -214,7 +215,7 @@ class DataStreamOverAggregate( // partitioned aggregation if (partitionKeys.nonEmpty) { inputDS - .keyBy(partitionKeys: _*) + .keyBy(new CRowKeySelector(partitionKeys, inputSchema.projectedTypeInfo(partitionKeys))) .process(processFunction) .returns(returnTypeInfo) .name(aggOpName) @@ -264,7 +265,7 @@ class DataStreamOverAggregate( // partitioned aggregation if (partitionKeys.nonEmpty) { inputDS - .keyBy(partitionKeys: _*) + .keyBy(new CRowKeySelector(partitionKeys, inputSchema.projectedTypeInfo(partitionKeys))) .process(processFunction) .returns(returnTypeInfo) .name(aggOpName) http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala index 3e23006..27f2c74 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamWindowJoin.scala @@ -32,6 +32,7 @@ import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, Ta import org.apache.flink.table.plan.nodes.CommonJoin import org.apache.flink.table.plan.schema.RowSchema import org.apache.flink.table.plan.util.UpdatingPlanChecker +import org.apache.flink.table.runtime.CRowKeySelector import org.apache.flink.table.runtime.join.{ProcTimeBoundedStreamInnerJoin, RowTimeBoundedStreamInnerJoin, WindowJoinUtil} import org.apache.flink.table.runtime.operators.KeyedCoProcessOperatorWithWatermarkDelay import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} @@ -225,7 +226,9 @@ class DataStreamWindowJoin( if (!leftKeys.isEmpty) { leftDataStream.connect(rightDataStream) - .keyBy(leftKeys, rightKeys) + .keyBy( + new CRowKeySelector(leftKeys, leftSchema.projectedTypeInfo(leftKeys)), + new CRowKeySelector(rightKeys, rightSchema.projectedTypeInfo(rightKeys))) .process(procInnerJoinFunc) .name(operatorName) .returns(returnTypeInfo) @@ -264,7 +267,9 @@ class DataStreamWindowJoin( if (!leftKeys.isEmpty) { leftDataStream .connect(rightDataStream) - .keyBy(leftKeys, rightKeys) + .keyBy( + new CRowKeySelector(leftKeys, leftSchema.projectedTypeInfo(leftKeys)), + new CRowKeySelector(rightKeys, rightSchema.projectedTypeInfo(rightKeys))) .transform( operatorName, returnTypeInfo, http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala index ad0f552..cfe6683 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/RowSchema.scala @@ -62,4 +62,12 @@ class RowSchema(private val logicalRowType: RelDataType) { */ def fieldNames: Seq[String] = logicalRowType.getFieldNames + /** + * Returns a projected [[TypeInformation]] of the schema. + */ + def projectedTypeInfo(fields: Array[Int]): TypeInformation[Row] = { + val projectedTypes = fields.map(fieldTypeInfos(_)) + val projectedNames = fields.map(fieldNames(_)) + new RowTypeInfo(projectedTypes, projectedNames) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala new file mode 100644 index 0000000..216a7f9 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * Null-aware key selector. + */ +class CRowKeySelector( + val keyFields: Array[Int], + @transient var returnType: TypeInformation[Row]) + extends KeySelector[CRow, Row] + with ResultTypeQueryable[Row] { + + override def getKey(value: CRow): Row = { + Row.project(value.row, keyFields) + } + + override def getProducedType: TypeInformation[Row] = returnType +} http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index c84b254..2efd13d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -27,8 +27,7 @@ import org.apache.calcite.sql.fun._ import org.apache.calcite.sql.{SqlAggFunction, SqlKind} import org.apache.flink.api.common.functions.{MapFunction, RichGroupReduceFunction, AggregateFunction => DataStreamAggFunction, _} import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo} +import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction} import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow} @@ -981,7 +980,7 @@ object AggregateUtil { numAggregates: Int, finalRowArity: Int, properties: Seq[NamedWindowProperty]): - WindowFunction[Row, CRow, Tuple, DataStreamWindow] = { + WindowFunction[Row, CRow, Row, DataStreamWindow] = { if (isTimeWindow(window)) { val (startPos, endPos, timePos) = computeWindowPropertyPos(properties) @@ -992,7 +991,7 @@ object AggregateUtil { endPos, timePos, finalRowArity) - .asInstanceOf[WindowFunction[Row, CRow, Tuple, DataStreamWindow]] + .asInstanceOf[WindowFunction[Row, CRow, Row, DataStreamWindow]] } else { new IncrementalAggregateWindowFunction( numGroupingKeys, http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala index 1950230..69e4f7b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala @@ -59,7 +59,7 @@ class IncrementalAggregateTimeWindowFunction( } override def apply( - key: Tuple, + key: Row, window: TimeWindow, records: Iterable[Row], out: Collector[CRow]): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala index 7e9d738..c9fa0c9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala @@ -19,12 +19,11 @@ package org.apache.flink.table.runtime.aggregate import java.lang.Iterable -import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row import org.apache.flink.util.Collector /** @@ -38,7 +37,7 @@ class IncrementalAggregateWindowFunction[W <: Window]( private val numGroupingKey: Int, private val numAggregates: Int, private val finalRowArity: Int) - extends RichWindowFunction[Row, CRow, Tuple, W] { + extends RichWindowFunction[Row, CRow, Row, W] { private var output: CRow = _ @@ -51,7 +50,7 @@ class IncrementalAggregateWindowFunction[W <: Window]( * Row based on the mapping relation between intermediate aggregate data and output data. */ override def apply( - key: Tuple, + key: Row, window: W, records: Iterable[Row], out: Collector[CRow]): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala index 119f92f..1d7bab6 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/JoinITCase.scala @@ -25,8 +25,9 @@ import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.watermark.Watermark -import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.{TableEnvironment, Types} +import org.apache.flink.table.expressions.Null import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase} import org.apache.flink.types.Row import org.junit._ @@ -66,7 +67,9 @@ class JoinITCase extends StreamingWithStateTestBase { data2.+=((2, 2L, "HeHe")) val t1 = env.fromCollection(data1).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + .select(('a === 1)?(Null(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values val t2 = env.fromCollection(data2).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime) + .select(('a === 1)?(Null(Types.INT), 'a) as 'a, 'b, 'c, 'proctime) // test null values tEnv.registerTable("T1", t1) tEnv.registerTable("T2", t2) http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala index e67c784..67558d9 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala @@ -23,7 +23,8 @@ import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.scala._ import org.apache.flink.table.runtime.utils.StreamITCase.RetractingSink -import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment} +import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment, Types} +import org.apache.flink.table.expressions.Null import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.{CountDistinct, DataViewTestAgg} import org.apache.flink.table.runtime.utils.{JavaUserDefinedAggFunctions, StreamITCase, StreamTestData, StreamingWithStateTestBase} import org.apache.flink.types.Row @@ -39,7 +40,6 @@ class AggregateITCase extends StreamingWithStateTestBase { private val queryConfig = new StreamQueryConfig() queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2)) - @Test def testDistinct(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment @@ -48,13 +48,13 @@ class AggregateITCase extends StreamingWithStateTestBase { StreamITCase.clear val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c) - .select('b).distinct() + .select('b, Null(Types.LONG)).distinct() val results = t.toRetractStream[Row](queryConfig) results.addSink(new StreamITCase.RetractingSink).setParallelism(1) env.execute() - val expected = mutable.MutableList("1", "2", "3", "4", "5", "6") + val expected = mutable.MutableList("1,null", "2,null", "3,null", "4,null", "5,null", "6,null") assertEquals(expected.sorted, StreamITCase.retractedResults.sorted) } http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala index f6e739e..a9d4e44 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/GroupWindowITCase.scala @@ -61,7 +61,8 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase { (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"), (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"), (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"), - (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world")) + (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"), + (32L, 4, 4d, 4f, new BigDecimal("4"), null.asInstanceOf[String])) @Test def testProcessingTimeSlidingGroupWindowOverCount(): Unit = { @@ -232,8 +233,6 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase { assertEquals(expected.sorted, StreamITCase.testResults.sorted) } - - // ---------------------------------------------------------------------------------------------- // Sliding windows // ---------------------------------------------------------------------------------------------- @@ -270,7 +269,10 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase { "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011", "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007", "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009", - "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005") + "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005", + "1,1970-01-01 00:00:00.028,1970-01-01 00:00:00.033", + "1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.035", + "1,1970-01-01 00:00:00.032,1970-01-01 00:00:00.037") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } @@ -308,7 +310,9 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase { "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005", "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01", "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005", - "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01") + "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01", + "null,1,1970-01-01 00:00:00.025,1970-01-01 00:00:00.035", + "null,1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.04") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } @@ -343,7 +347,9 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase { "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021", "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005", "Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009", - "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005") + "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005", + "null,1,1970-01-01 00:00:00.028,1970-01-01 00:00:00.033", + "null,1,1970-01-01 00:00:00.032,1970-01-01 00:00:00.037") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } @@ -373,7 +379,8 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase { val expected = Seq( "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005", "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005", - "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005") + "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005", + "null,1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.035") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } @@ -402,7 +409,8 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase { val expected = Seq( "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003", - "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003") + "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003", + "null,1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.033") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } @@ -430,7 +438,8 @@ class GroupWindowITCase extends StreamingMultipleProgramsTestBase { env.execute() val expected = Seq( "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003", - "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003") + "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003", + "null,1,1970-01-01 00:00:00.03,1970-01-01 00:00:00.033") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } } http://git-wip-us.apache.org/repos/asf/flink/blob/14bc6274/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala index 3e6b0c6..7563dab 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala @@ -52,7 +52,8 @@ class OverWindowITCase extends StreamingWithStateTestBase { (7L, 7, "Hello World"), (8L, 8, "Hello World"), (8L, 8, "Hello World"), - (20L, 20, "Hello World")) + (20L, 20, "Hello World"), + (20L, 20, null.asInstanceOf[String])) val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) @@ -80,7 +81,8 @@ class OverWindowITCase extends StreamingWithStateTestBase { val expected = Seq( "Hello World,1,7,1", "Hello World,2,7,2", "Hello World,3,7,2", "Hello World,4,13,3", - "Hello,1,1,1", "Hello,2,1,2", "Hello,3,2,3", "Hello,4,3,4", "Hello,5,3,5", "Hello,6,4,6") + "Hello,1,1,1", "Hello,2,1,2", "Hello,3,2,3", "Hello,4,3,4", "Hello,5,3,5", "Hello,6,4,6", + "null,1,20,1") assertEquals(expected.sorted, StreamITCase.testResults.sorted) }