http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
index e61e190..c7c553b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -28,6 +28,7 @@ import org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTest
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.types.Row
 import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
@@ -164,6 +165,25 @@ class TableEnvironmentITCase(
   }
 
   @Test
+  def testToDataSetWithTypeOfCRow(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .select('a, 'b, 'c)
+
+    val expected = "+1,1,Hi\n" + "+2,2,Hello\n" + "+3,2,Hello world\n" +
+      "+4,3,Hello world, how are you?\n" + "+5,3,I am fine.\n" + "+6,3,Luke Skywalker\n" +
+      "+7,4,Comment#1\n" + "+8,4,Comment#2\n" + "+9,4,Comment#3\n" + "+10,4,Comment#4\n" +
+      "+11,5,Comment#5\n" + "+12,5,Comment#6\n" + "+13,5,Comment#7\n" + "+14,5,Comment#8\n" +
+      "+15,5,Comment#9\n" + "+16,6,Comment#10\n" + "+17,6,Comment#11\n" + "+18,6,Comment#12\n" +
+      "+19,6,Comment#13\n" + "+20,6,Comment#14\n" + "+21,6,Comment#15\n"
+    val results = t.toDataSet[CRow].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
   def testToTableFromCaseClass(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
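The "+" prefixes in the expected strings above come from the change flag that a CRow carries alongside each Row: accumulate messages render with "+", retract messages with "-". As a rough sketch of the concept (the real class lives in org.apache.flink.table.runtime.types; the constructor shape and toString format here are only inferred from the expectations in this patch):

    import org.apache.flink.types.Row

    // Sketch only: a Row plus an accumulate/retract flag, rendered with a sign prefix.
    class SketchCRow(var row: Row, var change: Boolean) {
      override def toString: String = s"${if (change) "+" else "-"}$row"
    }

    // new SketchCRow(Row.of(Int.box(1), Int.box(1), "Hi"), true).toString  == "+1,1,Hi"
    // new SketchCRow(Row.of(Int.box(1), Int.box(1), "Hi"), false).toString == "-1,1,Hi"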
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
new file mode 100644
index 0000000..62fcfcd
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.utils.TableFunc0
+
+import scala.collection.mutable
+
+/**
+  * Tests for retraction.
+  */
+class RetractionITCase extends StreamingWithStateTestBase {
+  // input data
+  val data = List(
+    ("Hello", 1),
+    ("word", 1),
+    ("Hello", 1),
+    ("bark", 1),
+    ("bark", 1),
+    ("bark", 1),
+    ("bark", 1),
+    ("bark", 1),
+    ("bark", 1),
+    ("flink", 1)
+  )
+
+  // keyed groupby + keyed groupby
+  @Test
+  def testWordCount(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val resultTable = table
+      .groupBy('word)
+      .select('word as 'word, 'num.sum as 'count)
+      .groupBy('count)
+      .select('count, 'word.count as 'frequency)
+
+    // to DataStream with CRow
+    val results = resultTable.toDataStream[CRow]
+    results.addSink(new StreamITCase.StringSinkWithCRow)
+    env.execute()
+
+    val expected = Seq("+1,1", "+1,2", "+1,1", "+2,1", "+1,2", "+1,1", "+2,2", "+2,1", "+3,1",
+      "+3,0", "+4,1", "+4,0", "+5,1", "+5,0", "+6,1", "+1,2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + non-keyed groupby
+  @Test
+  def testGroupByAndNonKeyedGroupBy(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val resultTable = table
+      .groupBy('word)
+      .select('word as 'word, 'num.sum as 'count)
+      .select('count.sum)
+
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("1", "2", "1", "3", "4", "3", "5", "3", "6", "3", "7", "3", "8", "3", "9",
+      "10")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // non-keyed groupby + keyed groupby
+  @Test
+  def testNonKeyedGroupByAndGroupBy(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val resultTable = table
+      .select('num.sum as 'count)
+      .groupBy('count)
+      .select('count, 'count.count)
+
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("1,1", "1,0", "2,1", "2,0", "3,1", "3,0", "4,1", "4,0", "5,1", "5,0", "6," +
+      "1", "6,0", "7,1", "7,0", "8,1", "8,0", "9,1", "9,0", "10,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // Tests the unique process: if the current output message of an unbounded groupby equals
+  // the previous message, the unbounded groupby ignores the current one.
+  @Test
+  def testUniqueProcess(): Unit = {
+    // data input
+    val data = List(
+      (1, 1L),
+      (2, 2L),
+      (3, 3L),
+      (3, 3L),
+      (4, 1L),
+      (4, 0L),
+      (4, 0L),
+      (4, 0L),
+      (5, 1L),
+      (6, 6L),
+      (6, 6L),
+      (6, 6L),
+      (7, 8L)
+    )
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'pk, 'value)
+    val resultTable = table
+      .groupBy('pk)
+      .select('pk as 'pk, 'value.sum as 'sum)
+      .groupBy('sum)
+      .select('sum, 'pk.count as 'count)
+
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq("1,1", "2,1", "3,1", "3,0", "6,1", "1,2", "1,3", "6,2", "6,1", "12,1", "12," +
+      "0", "18,1", "8,1")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // correlate should handle retraction messages correctly
+  @Test
+  def testCorrelate(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val func0 = new TableFunc0
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val resultTable = table
+      .groupBy('word)
+      .select('word as 'word, 'num.sum as 'count)
+      .leftOuterJoin(func0('word))
+      .groupBy('count)
+      .select('count, 'word.count as 'frequency)
+
+    val results = resultTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "1,1", "1,2", "1,1", "2,1", "1,2", "1,1", "2,2", "2,1", "3,1", "3,0", "4,1", "4,0", "5,1",
+      "5,0", "6,1", "1,2")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  // keyed groupby + over agg(unbounded, procTime, keyed)
+  @Test(expected = classOf[TableException])
+  def testGroupByAndUnboundPartitionedProcessingWindowWithRow(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT word, cnt, count(word) " +
+      "OVER (PARTITION BY cnt ORDER BY ProcTime() " +
+      "ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+      "FROM " +
+      "(SELECT word, count(number) as cnt from T1 group by word) "
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+  }
+
+  // keyed groupby + over agg(unbounded, procTime, non-keyed)
+  @Test(expected = classOf[TableException])
+  def testGroupByAndUnboundNonPartitionedProcessingWindowWithRow(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT word, cnt, count(word) " +
+      "OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+      "FROM (SELECT word , count(number) as cnt from T1 group by word) "
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+  }
+
+
+
+  // groupby + window agg
+  @Test(expected = classOf[TableException])
+  def testGroupByAndProcessingTimeSlidingGroupWindow(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+    env.setParallelism(1)
+    env.setStateBackend(getStateBackend)
+
+    val stream = env.fromCollection(data)
+    val table = stream.toTable(tEnv, 'word, 'num)
+    val windowedTable = table
+      .groupBy('word)
+      .select('word as 'word, 'num.sum as 'count)
+      .window(Tumble over 2.rows as 'w)
+      .groupBy('w, 'count)
+      .select('count, 'word.count)
+
+    val results = windowedTable.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+  }
+
+  // groupby + over agg(rowTime)
+  @Test(expected = classOf[TableException])
+  def testEventTimeOverWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT word, cnt, count(word) " +
+      "OVER (ORDER BY rowtime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
+      "FROM (SELECT word, count(number) as cnt from T1 group by word) "
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+  }
+
+  // groupby + over agg(bounded)
+  @Test(expected = classOf[TableException])
+  def testBoundedOverWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    env.setStateBackend(getStateBackend)
+    env.setParallelism(1)
+    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
+
+    StreamITCase.testResults = mutable.MutableList()
+
+    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
+
+    tEnv.registerTable("T1", t1)
+
+    val sqlQuery = "SELECT word, cnt, count(word) " +
+      "OVER (ORDER BY ProcTime() ROWS BETWEEN 2 preceding AND CURRENT ROW)" +
+      "FROM (SELECT word, count(number) as cnt from T1 group by word) "
+
+    val result = tEnv.sql(sqlQuery).toDataStream[Row]
+    result.addSink(new StreamITCase.StringSink)
+    env.execute()
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
index cdc4329..c446d64 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
@@ -59,5 +59,5 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase {
 
     TestBaseUtils.compareResultsByLinesInMemory(expected, path)
   }
-
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
index f826bba..6c75d53 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
@@ -25,6 +25,7 @@ import org.junit.Assert._
 
 import scala.collection.mutable
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.runtime.types.CRow
 
 import scala.collection.JavaConverters._
 
@@ -44,7 +45,15 @@ object StreamITCase {
   final class StringSink extends RichSinkFunction[Row]() {
     def invoke(value: Row) {
       testResults.synchronized {
-        testResults += value.toString 
+        testResults += value.toString
       }
     }
   }
+
+  final class StringSinkWithCRow extends RichSinkFunction[CRow]() {
+    def invoke(value: CRow) {
+      testResults.synchronized {
+        testResults += value.toString
+      }
+    }
+  }
 }
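What the retraction stream in testWordCount encodes, in plain terms: when a word's count moves from n to n+1, the downstream groupBy('count) has to retract the word from its old count bucket and accumulate it into the new one. A small Flink-free sketch of that bookkeeping (illustration only, all names made up; it reproduces the first few expected values above, modulo the CRow sign prefix):

    import scala.collection.mutable

    object RetractionSketch {
      def main(args: Array[String]): Unit = {
        val words = Seq("Hello", "word", "Hello")
        val wordCounts = mutable.Map[String, Int]()   // first groupBy: word -> count
        val frequencies = mutable.Map[Int, Int]()     // second groupBy: count -> frequency

        for (w <- words) {
          val oldCount = wordCounts.getOrElse(w, 0)
          val newCount = oldCount + 1
          wordCounts(w) = newCount
          if (oldCount > 0) {
            // retraction: the word leaves its old count bucket
            frequencies(oldCount) -= 1
            println(s"$oldCount,${frequencies(oldCount)}")
          }
          // accumulation: the word enters its new count bucket
          frequencies(newCount) = frequencies.getOrElse(newCount, 0) + 1
          println(s"$newCount,${frequencies(newCount)}")
        }
        // prints 1,1  1,2  1,1  2,1 -- the first four expected values of testWordCount
      }
    }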
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
index 5e3e995..eadcfc8 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRangeProcessFunctionTest.scala
@@ -33,6 +33,7 @@ import org.apache.flink.table.codegen.GeneratedAggregationsFunction
 import org.apache.flink.table.functions.AggregateFunction
 import org.apache.flink.table.functions.aggfunctions.{LongMaxWithRetractAggFunction, LongMinWithRetractAggFunction}
 import org.apache.flink.table.runtime.aggregate.BoundedProcessingOverRangeProcessFunctionTest._
+import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
 import org.apache.flink.types.Row
 import org.junit.Test
@@ -41,13 +42,13 @@ class BoundedProcessingOverRangeProcessFunctionTest {
 
   @Test
   def testProcTimePartitionedOverRange(): Unit = {
 
-    val rT = new RowTypeInfo(Array[TypeInformation[_]](
+    val rT = new CRowTypeInfo(new RowTypeInfo(Array[TypeInformation[_]](
       INT_TYPE_INFO,
       LONG_TYPE_INFO,
       INT_TYPE_INFO,
       STRING_TYPE_INFO,
       LONG_TYPE_INFO),
-      Array("a", "b", "c", "d", "e"))
+      Array("a", "b", "c", "d", "e")))
 
     val aggregates =
       Array(new LongMinWithRetractAggFunction,
@@ -183,14 +184,14 @@ class BoundedProcessingOverRangeProcessFunctionTest {
     val funcName = "BoundedOverAggregateHelper$33"
 
     val genAggFunction = GeneratedAggregationsFunction(funcName, funcCode)
 
-    val processFunction = new KeyedProcessOperator[String, Row, Row](
+    val processFunction = new KeyedProcessOperator[String, CRow, CRow](
       new ProcTimeBoundedRangeOver(
         genAggFunction,
         1000,
         aggregationStateType,
         rT))
 
-    val testHarness = new KeyedOneInputStreamOperatorTestHarness[JInt, Row, Row](
+    val testHarness = new KeyedOneInputStreamOperatorTestHarness[JInt, CRow, CRow](
       processFunction,
       new TupleRowSelector(0),
       BasicTypeInfo.INT_TYPE_INFO)
@@ -201,26 +202,26 @@ class BoundedProcessingOverRangeProcessFunctionTest {
     testHarness.setProcessingTime(3)
 
     // key = 1
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong), true), 0))
     // key = 2
     testHarness.processElement(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), 0))
+      new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong), true), 0))
 
     // Time = 4
     testHarness.setProcessingTime(4)
     // key = 1
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong), true), 0))
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong), true), 0))
     // key = 2
     testHarness.processElement(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), 0))
+      new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong), true), 0))
 
     // Time = 5
     testHarness.setProcessingTime(5)
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong), true), 0))
 
     // Time = 6
     testHarness.setProcessingTime(6)
@@ -229,33 +230,33 @@ class BoundedProcessingOverRangeProcessFunctionTest {
     testHarness.setProcessingTime(1002)
     // key = 1
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong), true), 0))
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong), true), 0))
     // key = 2
     testHarness.processElement(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), 0))
+      new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong), true), 0))
 
     // Time = 1003
     testHarness.setProcessingTime(1003)
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong), true), 0))
 
     // Time = 1004
     testHarness.setProcessingTime(1004)
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong), true), 0))
 
     // Time = 1005
     testHarness.setProcessingTime(1005)
     // key = 1
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong), true), 0))
     testHarness.processElement(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), 0))
+      new CRow(Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong), true), 0))
     // key = 2
     testHarness.processElement(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), 0))
+      new CRow(Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong), true), 0))
 
     testHarness.setProcessingTime(1006)
 
@@ -264,34 +265,34 @@ class BoundedProcessingOverRangeProcessFunctionTest {
     val expectedOutput = new ConcurrentLinkedQueue[Object]()
 
     // all elements at the same proc timestamp have the same value
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), 4))
-    expectedOutput.add(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), 4))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), 5))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), 5))
-    expectedOutput.add(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), 5))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), 6))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), 1003))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), 1003))
-    expectedOutput.add(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), 1003))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), 1004))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), 1005))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), 1006))
-    expectedOutput.add(new StreamRecord(
-      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), 1006))
-    expectedOutput.add(new StreamRecord(
-      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), 1006))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true), 4))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true), 4))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true), 5))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true), 5))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true), 5))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true), 6))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true), 1003))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true), 1004))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true), 1005))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 9L: JLong, 4L: JLong, 10L: JLong), true), 1006))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 10L: JLong, 4L: JLong, 10L: JLong), true), 1006))
+    expectedOutput.add(new StreamRecord(new CRow(
+      Row.of(2: JInt, 0L: JLong, 0: JInt, "bbb", 40L: JLong, 30L: JLong, 40L: JLong), true), 1006))
 
     TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, result,
       new RowResultSortComparator(6))
@@ -304,7 +305,7 @@
 object BoundedProcessingOverRangeProcessFunctionTest {
 
 /**
-  * Return 0 for equal Rows and non zero for different rows
+  * Return 0 for equal CRows and non-zero for different CRows
   */
 class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with Serializable {
@@ -314,8 +315,8 @@ class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with
       // watermark is not expected
       -1
     } else {
-      val row1 = o1.asInstanceOf[StreamRecord[Row]].getValue
-      val row2 = o2.asInstanceOf[StreamRecord[Row]].getValue
+      val row1 = o1.asInstanceOf[StreamRecord[CRow]].getValue
+      val row2 = o2.asInstanceOf[StreamRecord[CRow]].getValue
       row1.toString.compareTo(row2.toString)
     }
   }
@@ -325,10 +326,10 @@ class RowResultSortComparator(indexCounter: Int) extends Comparator[Object] with
  * Simple test class that returns a specified field as the selector function
  */
 class TupleRowSelector(
-  private val selectorField:Int) extends KeySelector[Row, Integer] {
+  private val selectorField:Int) extends KeySelector[CRow, Integer] {
 
-  override def getKey(value: Row): Integer = {
-    value.getField(selectorField).asInstanceOf[Integer]
+  override def getKey(value: CRow): Integer = {
+    value.row.getField(selectorField).asInstanceOf[Integer]
   }
 }
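The harness changes above are mechanical: every input Row is wrapped into an accumulating CRow inside its StreamRecord, and every expected Row likewise. A hypothetical helper that captures the pattern could look as follows (not part of this patch; the names are invented, and it assumes the CRow(Row, Boolean) constructor used throughout the diff):

    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord
    import org.apache.flink.table.runtime.types.CRow
    import org.apache.flink.types.Row

    object CRowRecords {
      // wrap fields into an accumulate (change = true) CRow record
      def accumulate(ts: Long, fields: AnyRef*): StreamRecord[CRow] =
        new StreamRecord(new CRow(Row.of(fields: _*), true), ts)

      // wrap fields into a retract (change = false) CRow record
      def retract(ts: Long, fields: AnyRef*): StreamRecord[CRow] =
        new StreamRecord(new CRow(Row.of(fields: _*), false), ts)
    }

    // e.g. the first processElement call above could then read:
    // testHarness.processElement(
    //   CRowRecords.accumulate(0, Int.box(1), Long.box(11L), Int.box(1), "aaa", Long.box(1L)))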
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowComparatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowComparatorTest.scala
new file mode 100644
index 0000000..e574084
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/types/CRowComparatorTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.types
+
+import java.lang.{Integer => JInt, Long => JLong}
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+
+class CRowComparatorTest extends ComparatorTestBase[CRow] {
+
+  val rowType = new RowTypeInfo(
+    BasicTypeInfo.INT_TYPE_INFO,
+    BasicTypeInfo.STRING_TYPE_INFO,
+    BasicTypeInfo.LONG_TYPE_INFO
+  )
+
+  val cRowType = new CRowTypeInfo(rowType)
+
+  override protected def createComparator(asc: Boolean): TypeComparator[CRow] = {
+    cRowType.createComparator(
+      Array[Int](0, 2),
+      Array[Boolean](asc, asc),
+      0,
+      new ExecutionConfig
+    )
+  }
+
+  override protected def createSerializer(): TypeSerializer[CRow] =
+    cRowType.createSerializer(new ExecutionConfig)
+
+  override protected def getSortedTestData: Array[CRow] = Array[CRow](
+    new CRow(Row.of(new JInt(1), "Hello", new JLong(1L)), true),
+    new CRow(Row.of(new JInt(1), "Hello", new JLong(2L)), true),
+    new CRow(Row.of(new JInt(2), "Hello", new JLong(2L)), false),
+    new CRow(Row.of(new JInt(2), "Hello", new JLong(3L)), true),
+    new CRow(Row.of(new JInt(3), "World", new JLong(0L)), false),
+    new CRow(Row.of(new JInt(4), "Hello", new JLong(0L)), true),
+    new CRow(Row.of(new JInt(5), "Hello", new JLong(1L)), true),
+    new CRow(Row.of(new JInt(5), "Hello", new JLong(4L)), false)
+  )
+}
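One property of the sorted test data above is worth spelling out: rows with change = false interleave freely with change = true, so the comparator evidently orders CRows by the selected row fields only (0 and 2), and the change flag plays no role in the comparison. A quick standalone check of that reading (a sketch under the same setup as the test, not an authoritative statement about the comparator's contract):

    import java.lang.{Integer => JInt, Long => JLong}

    import org.apache.flink.api.common.ExecutionConfig
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    import org.apache.flink.types.Row

    object CRowComparatorSketch {
      def main(args: Array[String]): Unit = {
        val rowType = new RowTypeInfo(
          BasicTypeInfo.INT_TYPE_INFO,
          BasicTypeInfo.STRING_TYPE_INFO,
          BasicTypeInfo.LONG_TYPE_INFO)
        // same key fields and orders as createComparator in the test above
        val comparator = new CRowTypeInfo(rowType)
          .createComparator(Array(0, 2), Array(true, true), 0, new ExecutionConfig)

        // same row fields, opposite change flags: expected to compare as equal
        val a = new CRow(Row.of(new JInt(2), "Hello", new JLong(2L)), false)
        val b = new CRow(Row.of(new JInt(2), "Hello", new JLong(2L)), true)
        println(comparator.compare(a, b) == 0)
      }
    }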
http://git-wip-us.apache.org/repos/asf/flink/blob/856485be/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index c5e13a1..79e957a 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -18,7 +18,9 @@
 
 package org.apache.flink.table.utils
 
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.tools.RuleSet
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
@@ -36,4 +38,10 @@ class MockTableEnvironment extends TableEnvironment(new TableConfig) {
   override protected def getBuiltInNormRuleSet: RuleSet = ???
 
   override protected def getBuiltInPhysicalOptRuleSet: RuleSet = ???
+
+  override protected def getConversionMapper[IN, OUT](
+      physicalTypeInfo: TypeInformation[IN],
+      logicalRowType: RelDataType,
+      requestedTypeInfo: TypeInformation[OUT],
+      functionName: String) = ???
 }