Repository: flink Updated Branches: refs/heads/master 760f0bc23 -> 9a2ba6e05
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala index 9e84607..a9e9632 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSourceITCase.scala @@ -18,26 +18,60 @@ package org.apache.flink.table.runtime.stream.table +import java.lang.{Boolean => JBool, Integer => JInt, Long => JLong} + import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTimestamp} import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo} import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JExecEnv} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase -import org.apache.flink.table.api.{TableEnvironment, Types} +import org.apache.flink.table.api.{TableEnvironment, TableException, TableSchema, Types} import org.apache.flink.table.api.scala._ import org.apache.flink.table.runtime.utils.{CommonTestData, StreamITCase} -import org.apache.flink.table.utils.{TestFilterableTableSource, TestTableSourceWithTime} +import org.apache.flink.table.sources.StreamTableSource +import 
org.apache.flink.table.utils._ import org.apache.flink.types.Row import org.junit.Assert._ import org.junit.Test -import java.lang.{Integer => JInt, Long => JLong} +import scala.collection.JavaConverters._ import scala.collection.mutable class TableSourceITCase extends StreamingMultipleProgramsTestBase { + @Test(expected = classOf[TableException]) + def testInvalidDatastreamType(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val tableSource = new StreamTableSource[Row]() { + private val fieldNames: Array[String] = Array("name", "id", "value") + private val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG, Types.INT) + .asInstanceOf[Array[TypeInformation[_]]] + + override def getDataStream(execEnv: JExecEnv): DataStream[Row] = { + val data = List(Row.of("Mary", new JLong(1L), new JInt(1))).asJava + // return DataStream[Row] with GenericTypeInfo + execEnv.fromCollection(data, new GenericTypeInfo[Row](classOf[Row])) + } + override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames) + override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes) + } + tEnv.registerTableSource("T", tableSource) + + tEnv.scan("T") + .select('value, 'name) + .addSink(new StreamITCase.StringSink[Row]) + env.execute() + + // test should fail because the type info of the returned DataStream does not match the return type + // info. 
+ } + @Test def testCsvTableSource(): Unit = { @@ -56,7 +90,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { env.execute() - val expected = mutable.MutableList( + val expected = Seq( "Williams,69.0", "Miller,13.56", "Smith,180.2", @@ -78,13 +112,12 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { env.execute() - val expected = mutable.MutableList( - "5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8") + val expected = Seq("5,Record_5", "6,Record_6", "7,Record_7", "8,Record_8") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } @Test - def testRowtimeTableSource(): Unit = { + def testRowtimeRowTableSource(): Unit = { StreamITCase.testResults = mutable.MutableList() val tableName = "MyTable" val env = StreamExecutionEnvironment.getExecutionEnvironment @@ -96,11 +129,15 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { Row.of("Bob", new JLong(2L), new JInt(20)), Row.of("Mary", new JLong(2L), new JInt(30)), Row.of("Liz", new JLong(2001L), new JInt(40))) + + val fieldNames = Array("name", "rtime", "amount") + val schema = new TableSchema(fieldNames, Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)) val rowType = new RowTypeInfo( Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]], - Array("name", "rtime", "amount")) + fieldNames) - tEnv.registerTableSource(tableName, new TestTableSourceWithTime(data, rowType, "rtime", null)) + val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", null) + tEnv.registerTableSource(tableName, tableSource) tEnv.scan(tableName) .window(Tumble over 1.second on 'rtime as 'w) @@ -109,7 +146,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { .addSink(new StreamITCase.StringSink[Row]) env.execute() - val expected = mutable.MutableList( + val expected = Seq( "Mary,1970-01-01 00:00:00.0,40", "Bob,1970-01-01 00:00:00.0,20", "Liz,1970-01-01 00:00:02.0,40") @@ -117,7 +154,7 @@ class 
TableSourceITCase extends StreamingMultipleProgramsTestBase { } @Test - def testProctimeTableSource(): Unit = { + def testProctimeRowTableSource(): Unit = { StreamITCase.testResults = mutable.MutableList() val tableName = "MyTable" val env = StreamExecutionEnvironment.getExecutionEnvironment @@ -129,11 +166,17 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { Row.of("Bob", new JLong(2L), new JInt(20)), Row.of("Mary", new JLong(2L), new JInt(30)), Row.of("Liz", new JLong(2001L), new JInt(40))) + + val fieldNames = Array("name", "rtime", "amount") + val schema = new TableSchema( + fieldNames :+ "ptime", + Array(Types.STRING, Types.LONG, Types.INT, Types.SQL_TIMESTAMP)) val rowType = new RowTypeInfo( Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]], - Array("name", "rtime", "amount")) + fieldNames) - tEnv.registerTableSource(tableName, new TestTableSourceWithTime(data, rowType, null, "ptime")) + val tableSource = new TestTableSourceWithTime(schema, rowType, data, null, "ptime") + tEnv.registerTableSource(tableName, tableSource) tEnv.scan(tableName) .where('ptime.cast(Types.LONG) > 0L) @@ -141,7 +184,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { .addSink(new StreamITCase.StringSink[Row]) env.execute() - val expected = mutable.MutableList( + val expected = Seq( "Mary,10", "Bob,20", "Mary,30", @@ -150,7 +193,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { } @Test - def testRowtimeProctimeTableSource(): Unit = { + def testRowtimeProctimeRowTableSource(): Unit = { StreamITCase.testResults = mutable.MutableList() val tableName = "MyTable" val env = StreamExecutionEnvironment.getExecutionEnvironment @@ -162,13 +205,17 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { Row.of("Bob", new JLong(2L), new JInt(20)), Row.of("Mary", new JLong(2L), new JInt(30)), Row.of("Liz", new JLong(2001L), new JInt(40))) + + val fieldNames = Array("name", "rtime", 
"amount") + val schema = new TableSchema( + fieldNames :+ "ptime", + Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT, Types.SQL_TIMESTAMP)) val rowType = new RowTypeInfo( Array(Types.STRING, Types.LONG, Types.INT).asInstanceOf[Array[TypeInformation[_]]], - Array("name", "rtime", "amount")) + fieldNames) - tEnv.registerTableSource( - tableName, - new TestTableSourceWithTime(data, rowType, "rtime", "ptime")) + val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", "ptime") + tEnv.registerTableSource(tableName, tableSource) tEnv.scan(tableName) .window(Tumble over 1.second on 'rtime as 'w) @@ -177,7 +224,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { .addSink(new StreamITCase.StringSink[Row]) env.execute() - val expected = mutable.MutableList( + val expected = Seq( "Mary,1970-01-01 00:00:00.0,40", "Bob,1970-01-01 00:00:00.0,20", "Liz,1970-01-01 00:00:02.0,40") @@ -185,7 +232,7 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { } @Test - def testRowtimeAsTimestampTableSource(): Unit = { + def testRowtimeAsTimestampRowTableSource(): Unit = { StreamITCase.testResults = mutable.MutableList() val tableName = "MyTable" val env = StreamExecutionEnvironment.getExecutionEnvironment @@ -197,11 +244,141 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { Row.of("Bob", toTimestamp(2L), new JInt(20)), Row.of("Mary", toTimestamp(2L), new JInt(30)), Row.of("Liz", toTimestamp(2001L), new JInt(40))) + + val fieldNames = Array("name", "rtime", "amount") + val schema = new TableSchema(fieldNames, Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT)) val rowType = new RowTypeInfo( Array(Types.STRING, Types.SQL_TIMESTAMP, Types.INT).asInstanceOf[Array[TypeInformation[_]]], - Array("name", "rtime", "amount")) + fieldNames) + + val tableSource = new TestTableSourceWithTime(schema, rowType, data, "rtime", null) + tEnv.registerTableSource(tableName, tableSource) + + tEnv.scan(tableName) + .window(Tumble 
over 1.second on 'rtime as 'w) + .groupBy('name, 'w) + .select('name, 'w.start, 'amount.sum) + .addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "Mary,1970-01-01 00:00:00.0,40", + "Bob,1970-01-01 00:00:00.0,20", + "Liz,1970-01-01 00:00:02.0,40") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testRowtimeLongTableSource(): Unit = { + StreamITCase.testResults = mutable.MutableList() + val tableName = "MyTable" + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq(new JLong(1L), new JLong(2L), new JLong(2L), new JLong(2001L), new JLong(4001L)) + + val schema = new TableSchema(Array("rtime"), Array(Types.SQL_TIMESTAMP)) + val returnType = Types.LONG + + val tableSource = new TestTableSourceWithTime(schema, returnType, data, "rtime", null) + tEnv.registerTableSource(tableName, tableSource) + + tEnv.scan(tableName) + .window(Tumble over 1.second on 'rtime as 'w) + .groupBy('w) + .select('w.start, 1.count) + .addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:00.0,3", + "1970-01-01 00:00:02.0,1", + "1970-01-01 00:00:04.0,1") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testProctimeStringTableSource(): Unit = { + StreamITCase.testResults = mutable.MutableList() + val tableName = "MyTable" + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq("Mary", "Peter", "Bob", "Liz") + + val schema = new TableSchema(Array("name", "ptime"), Array(Types.STRING, Types.SQL_TIMESTAMP)) + val returnType = Types.STRING + + val tableSource = new TestTableSourceWithTime(schema, returnType, data, null, "ptime") + 
tEnv.registerTableSource(tableName, tableSource) + + tEnv.scan(tableName) + .where('ptime.cast(Types.LONG) > 1) + .select('name) + .addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq("Mary", "Peter", "Bob", "Liz") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testRowtimeProctimeLongTableSource(): Unit = { + StreamITCase.testResults = mutable.MutableList() + val tableName = "MyTable" + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq(new JLong(1L), new JLong(2L), new JLong(2L), new JLong(2001L), new JLong(4001L)) - tEnv.registerTableSource(tableName, new TestTableSourceWithTime(data, rowType, "rtime", null)) + val schema = new TableSchema( + Array("rtime", "ptime"), + Array(Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP)) + val returnType = Types.LONG + + val tableSource = new TestTableSourceWithTime(schema, returnType, data, "rtime", "ptime") + tEnv.registerTableSource(tableName, tableSource) + + tEnv.scan(tableName) + .where('ptime.cast(Types.LONG) > 1) + .window(Tumble over 1.second on 'rtime as 'w) + .groupBy('w) + .select('w.start, 1.count) + .addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:00.0,3", + "1970-01-01 00:00:02.0,1", + "1970-01-01 00:00:04.0,1") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testFieldMappingTableSource(): Unit = { + StreamITCase.testResults = mutable.MutableList() + val tableName = "MyTable" + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of("Mary", new JLong(1L), new JInt(10)), + Row.of("Bob", new JLong(2L), new JInt(20)), + Row.of("Mary", new JLong(2L), new JInt(30)), + 
Row.of("Liz", new JLong(2001L), new JInt(40))) + + val schema = new TableSchema( + Array("ptime", "amount", "name", "rtime"), + Array(Types.SQL_TIMESTAMP, Types.INT, Types.STRING, Types.SQL_TIMESTAMP)) + val returnType = new RowTypeInfo(Types.STRING, Types.LONG, Types.INT) + val mapping = Map("amount" -> "f2", "name" -> "f0", "rtime" -> "f1") + + val source = new TestTableSourceWithTime(schema, returnType, data, "rtime", "ptime", mapping) + tEnv.registerTableSource(tableName, source) tEnv.scan(tableName) .window(Tumble over 1.second on 'rtime as 'w) @@ -210,11 +387,307 @@ class TableSourceITCase extends StreamingMultipleProgramsTestBase { .addSink(new StreamITCase.StringSink[Row]) env.execute() - val expected = mutable.MutableList( + val expected = Seq( "Mary,1970-01-01 00:00:00.0,40", "Bob,1970-01-01 00:00:00.0,20", "Liz,1970-01-01 00:00:02.0,40") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + @Test + def testProjectWithoutRowtimeProctime(): Unit = { + StreamITCase.testResults = mutable.MutableList() + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of(new JInt(1), "Mary", new JLong(10L), new JLong(1)), + Row.of(new JInt(2), "Bob", new JLong(20L), new JLong(2)), + Row.of(new JInt(3), "Mike", new JLong(30L), new JLong(2)), + Row.of(new JInt(4), "Liz", new JLong(40L), new JLong(2001))) + + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.STRING, Types.LONG, Types.LONG) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "name", "val", "rtime")) + + tEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime")) + + tEnv.scan("T") + .select('name, 'val, 
'id) + .addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "Mary,10,1", + "Bob,20,2", + "Mike,30,3", + "Liz,40,4") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testProjectWithoutProctime(): Unit = { + StreamITCase.testResults = mutable.MutableList() + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of(new JInt(1), "Mary", new JLong(10L), new JLong(1)), + Row.of(new JInt(2), "Bob", new JLong(20L), new JLong(2)), + Row.of(new JInt(3), "Mike", new JLong(30L), new JLong(2)), + Row.of(new JInt(4), "Liz", new JLong(40L), new JLong(2001))) + + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.STRING, Types.LONG, Types.LONG) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "name", "val", "rtime")) + + tEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime")) + + tEnv.scan("T") + .select('rtime, 'name, 'id) + .addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:00.001,Mary,1", + "1970-01-01 00:00:00.002,Bob,2", + "1970-01-01 00:00:00.002,Mike,3", + "1970-01-01 00:00:02.001,Liz,4") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testProjectWithoutRowtime(): Unit = { + StreamITCase.testResults = mutable.MutableList() + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of(new JInt(1), "Mary", new JLong(10L), new JLong(1)), + Row.of(new JInt(2), "Bob", new JLong(20L), 
new JLong(2)), + Row.of(new JInt(3), "Mike", new JLong(30L), new JLong(2)), + Row.of(new JInt(4), "Liz", new JLong(40L), new JLong(2001))) + + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.STRING, Types.LONG, Types.LONG) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "name", "val", "rtime")) + + tEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime")) + + tEnv.scan("T") + .filter('ptime.cast(Types.LONG) > 0) + .select('name, 'id) + .addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "Mary,1", + "Bob,2", + "Mike,3", + "Liz,4") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + def testProjectOnlyProctime(): Unit = { + StreamITCase.testResults = mutable.MutableList() + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of(new JInt(1), new JLong(1), new JLong(10L), "Mary"), + Row.of(new JInt(2), new JLong(2L), new JLong(20L), "Bob"), + Row.of(new JInt(3), new JLong(2L), new JLong(30L), "Mike"), + Row.of(new JInt(4), new JLong(2001L), new JLong(30L), "Liz")) + + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.LONG, Types.LONG, Types.STRING) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "rtime", "val", "name")) + + tEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime")) + + tEnv.scan("T") + .select('ptime > 0) + .select(1.count) + .addSink(new 
StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq("4") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + def testProjectOnlyRowtime(): Unit = { + StreamITCase.testResults = mutable.MutableList() + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of(new JInt(1), new JLong(1), new JLong(10L), "Mary"), + Row.of(new JInt(2), new JLong(2L), new JLong(20L), "Bob"), + Row.of(new JInt(3), new JLong(2L), new JLong(30L), "Mike"), + Row.of(new JInt(4), new JLong(2001L), new JLong(30L), "Liz")) + + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.INT, Types.LONG, Types.LONG, Types.STRING) + .asInstanceOf[Array[TypeInformation[_]]], + Array("id", "rtime", "val", "name")) + + tEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime")) + + tEnv.scan("T") + .select('rtime) + .addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:00.001", + "1970-01-01 00:00:00.002", + "1970-01-01 00:00:00.002", + "1970-01-01 00:00:02.001") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testProjectWithMapping(): Unit = { + StreamITCase.testResults = mutable.MutableList() + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of(new JLong(1), new JInt(1), "Mary", new JLong(10)), + Row.of(new JLong(2), new JInt(2), "Bob", new JLong(20)), + Row.of(new JLong(2), new JInt(3), "Mike", new JLong(30)), + Row.of(new JLong(2001), new JInt(4), "Liz", new 
JLong(40))) + + val tableSchema = new TableSchema( + Array("id", "rtime", "val", "ptime", "name"), + Array(Types.INT, Types.SQL_TIMESTAMP, Types.LONG, Types.SQL_TIMESTAMP, Types.STRING)) + val returnType = new RowTypeInfo( + Array(Types.LONG, Types.INT, Types.STRING, Types.LONG) + .asInstanceOf[Array[TypeInformation[_]]], + Array("p-rtime", "p-id", "p-name", "p-val")) + val mapping = Map("rtime" -> "p-rtime", "id" -> "p-id", "val" -> "p-val", "name" -> "p-name") + + tEnv.registerTableSource( + "T", + new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime", mapping)) + + tEnv.scan("T") + .select('name, 'rtime, 'val) + .addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "Mary,1970-01-01 00:00:00.001,10", + "Bob,1970-01-01 00:00:00.002,20", + "Mike,1970-01-01 00:00:00.002,30", + "Liz,1970-01-01 00:00:02.001,40") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testNestedProject(): Unit = { + + StreamITCase.testResults = mutable.MutableList() + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + + val data = Seq( + Row.of(new JLong(1), + Row.of( + Row.of("Sarah", new JInt(100)), + Row.of(new JInt(1000), new JBool(true)) + ), + Row.of("Peter", new JInt(10000)), + "Mary"), + Row.of(new JLong(2), + Row.of( + Row.of("Rob", new JInt(200)), + Row.of(new JInt(2000), new JBool(false)) + ), + Row.of("Lucy", new JInt(20000)), + "Bob"), + Row.of(new JLong(3), + Row.of( + Row.of("Mike", new JInt(300)), + Row.of(new JInt(3000), new JBool(true)) + ), + Row.of("Betty", new JInt(30000)), + "Liz")) + + val nested1 = new RowTypeInfo( + Array(Types.STRING, Types.INT).asInstanceOf[Array[TypeInformation[_]]], + Array("name", "value") + ) + val nested2 = new RowTypeInfo( + Array(Types.INT, Types.BOOLEAN).asInstanceOf[Array[TypeInformation[_]]], + Array("num", "flag") + ) + 
val deepNested = new RowTypeInfo( + Array(nested1, nested2).asInstanceOf[Array[TypeInformation[_]]], + Array("nested1", "nested2") + ) + val tableSchema = new TableSchema( + Array("id", "deepNested", "nested", "name"), + Array(Types.LONG, deepNested, nested1, Types.STRING)) + + val returnType = new RowTypeInfo( + Array(Types.LONG, deepNested, nested1, Types.STRING).asInstanceOf[Array[TypeInformation[_]]], + Array("id", "deepNested", "nested", "name")) + + tEnv.registerTableSource( + "T", + new TestNestedProjectableTableSource(tableSchema, returnType, data)) + + tEnv + .scan("T") + .select('id, + 'deepNested.get("nested1").get("name") as 'nestedName, + 'nested.get("value") as 'nestedValue, + 'deepNested.get("nested2").get("flag") as 'nestedFlag, + 'deepNested.get("nested2").get("num") as 'nestedNum) + .addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = Seq( + "1,Sarah,10000,true,1000", + "2,Rob,20000,false,2000", + "3,Mike,30000,true,3000") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala index d716d5d..e8568ca 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala @@ -160,6 +160,8 @@ object CommonTestData { override def getReturnType: TypeInformation[Person] = { TypeExtractor.getForClass(classOf[Person]) } + + override def getTableSchema: TableSchema = TableSchema.fromTypeInfo(getReturnType) } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala deleted file mode 100644 index fb99864..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.utils - -import java.util.{List => JList} - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.table.api.Types._ -import org.apache.flink.table.expressions._ -import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, StreamTableSource, TableSource} -import org.apache.flink.types.Row -import org.apache.flink.util.Preconditions - -import scala.collection.JavaConverters._ -import scala.collection.mutable - -/** - * This source can only handle simple comparision with field "amount". - * Supports ">, <, >=, <=, =, <>" with an integer. - */ -class TestFilterableTableSource( - val recordNum: Int = 33) - extends BatchTableSource[Row] - with StreamTableSource[Row] - with FilterableTableSource[Row] { - - var filterPushedDown: Boolean = false - - val fieldNames: Array[String] = Array("name", "id", "amount", "price") - - val fieldTypes: Array[TypeInformation[_]] = Array(STRING, LONG, INT, DOUBLE) - - // all predicates with field "amount" - private var filterPredicates = new mutable.ArrayBuffer[Expression] - - // all comparing values for field "amount" - private val filterValues = new mutable.ArrayBuffer[Int] - - override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { - execEnv.fromCollection[Row](generateDynamicCollection().asJava, getReturnType) - } - - override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { - execEnv.fromCollection[Row](generateDynamicCollection().asJava, getReturnType) - } - - override def explainSource(): String = { - if (filterPredicates.nonEmpty) { - s"filter=[${filterPredicates.reduce((l, r) => And(l, r)).toString}]" - } else { - "" - } - } - - override def 
getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames) - - override def applyPredicate(predicates: JList[Expression]): TableSource[Row] = { - val newSource = new TestFilterableTableSource(recordNum) - newSource.filterPushedDown = true - - val iterator = predicates.iterator() - while (iterator.hasNext) { - iterator.next() match { - case expr: BinaryComparison => - (expr.left, expr.right) match { - case (f: ResolvedFieldReference, v: Literal) if f.name.equals("amount") => - newSource.filterPredicates += expr - newSource.filterValues += v.value.asInstanceOf[Number].intValue() - iterator.remove() - case (_, _) => - } - case _ => - } - } - - newSource - } - - override def isFilterPushedDown: Boolean = filterPushedDown - - private def generateDynamicCollection(): Seq[Row] = { - Preconditions.checkArgument(filterPredicates.length == filterValues.length) - - for { - cnt <- 0 until recordNum - if shouldCreateRow(cnt) - } yield { - Row.of( - s"Record_$cnt", - cnt.toLong.asInstanceOf[Object], - cnt.toInt.asInstanceOf[Object], - cnt.toDouble.asInstanceOf[Object]) - } - } - - private def shouldCreateRow(value: Int): Boolean = { - filterPredicates.zip(filterValues).forall { - case (_: GreaterThan, v) => - value > v - case (_: LessThan, v) => - value < v - case (_: GreaterThanOrEqual, v) => - value >= v - case (_: LessThanOrEqual, v) => - value <= v - case (_: EqualTo, v) => - value == v - case (_: NotEqualTo, v) => - value != v - case (expr, _) => - throw new RuntimeException(expr + " not supported!") - } - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestTableSourceWithTime.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestTableSourceWithTime.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestTableSourceWithTime.scala 
deleted file mode 100644 index 7d6919e..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestTableSourceWithTime.scala +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.utils - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.table.sources._ -import org.apache.flink.types.Row - -import scala.collection.JavaConverters._ - -class TestTableSourceWithTime( - rows: Seq[Row], - rowType: RowTypeInfo, - rowtime: String, - proctime: String) - extends StreamTableSource[Row] - with DefinedRowtimeAttribute - with DefinedProctimeAttribute { - - override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { - - // The source deliberately does not assign timestamps and watermarks. - // If a rowtime field is configured, the field carries the timestamp. - // The FromElementsFunction sends out a Long.MaxValue watermark when all rows are emitted. 
- execEnv.fromCollection(rows.asJava, rowType) - } - - override def getRowtimeAttribute: String = rowtime - - override def getProctimeAttribute: String = proctime - - override def getReturnType: TypeInformation[Row] = rowType -} http://git-wip-us.apache.org/repos/asf/flink/blob/9a2ba6e0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala new file mode 100644 index 0000000..c52bff8 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.utils + +import java.util +import java.util.{Collections, List => JList} + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.table.api.Types.{DOUBLE, INT, LONG, STRING} +import org.apache.flink.table.api.TableSchema +import org.apache.flink.table.expressions._ +import org.apache.flink.table.sources._ +import org.apache.flink.types.Row +import org.apache.flink.util.Preconditions + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +class TestTableSourceWithTime[T]( + tableSchema: TableSchema, + returnType: TypeInformation[T], + values: Seq[T], + rowtime: String = null, + proctime: String = null, + mapping: Map[String, String] = null) + extends StreamTableSource[T] + with BatchTableSource[T] + with DefinedRowtimeAttributes + with DefinedProctimeAttribute + with DefinedFieldMapping { + + override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[T] = { + execEnv.fromCollection(values.asJava, returnType) + } + + override def getDataSet(execEnv: ExecutionEnvironment): DataSet[T] = { + execEnv.fromCollection(values.asJava, returnType) + } + + override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = { + // return a RowtimeAttributeDescriptor if rowtime attribute is defined + if (rowtime != null) { + Collections.singletonList(new RowtimeAttributeDescriptor( + rowtime, + new ExistingField(rowtime), + new AscendingWatermarks)) + } else { + Collections.EMPTY_LIST.asInstanceOf[util.List[RowtimeAttributeDescriptor]] + } + } + + override def getProctimeAttribute: String = proctime + + override def getReturnType: TypeInformation[T] = returnType + + override def getTableSchema: 
TableSchema = tableSchema + + override def getFieldMapping: util.Map[String, String] = { + if (mapping != null) { + mapping.asJava + } else { + null + } + } +} + +class TestProjectableTableSource( + tableSchema: TableSchema, + returnType: TypeInformation[Row], + values: Seq[Row], + rowtime: String = null, + proctime: String = null, + fieldMapping: Map[String, String] = null) + extends TestTableSourceWithTime[Row]( + tableSchema, + returnType, + values, + rowtime, + proctime, + fieldMapping) + with ProjectableTableSource[Row] { + + override def projectFields(fields: Array[Int]): TableSource[Row] = { + + val rowType = returnType.asInstanceOf[RowTypeInfo] + + val (projectedNames: Array[String], projectedMapping) = if (fieldMapping == null) { + val projectedNames = fields.map(rowType.getFieldNames.apply(_)) + (projectedNames, null) + } else { + val invertedMapping = fieldMapping.map(_.swap) + val projectedNames = fields.map(rowType.getFieldNames.apply(_)) + + val projectedMapping: Map[String, String] = projectedNames.map{ f => + val logField = invertedMapping(f) + logField -> s"remapped-$f" + }.toMap + val renamedNames = projectedNames.map(f => s"remapped-$f") + (renamedNames, projectedMapping) + } + + val projectedTypes = fields.map(rowType.getFieldTypes.apply(_)) + val projectedReturnType = new RowTypeInfo( + projectedTypes.asInstanceOf[Array[TypeInformation[_]]], + projectedNames) + + val projectedValues = values.map { fromRow => + val pRow = new Row(fields.length) + fields.zipWithIndex.foreach{ case (from, to) => pRow.setField(to, fromRow.getField(from)) } + pRow + } + + new TestProjectableTableSource( + tableSchema, + projectedReturnType, + projectedValues, + rowtime, + proctime, + projectedMapping) + } + + override def explainSource(): String = { + s"TestSource(" + + s"physical fields: ${getReturnType.asInstanceOf[RowTypeInfo].getFieldNames.mkString(", ")})" + } +} + +/** + * This source can only handle simple comparision with field "amount". 
+ * Supports ">, <, >=, <=, =, <>" with an integer. + */ +class TestFilterableTableSource( + val recordNum: Int = 33) + extends BatchTableSource[Row] + with StreamTableSource[Row] + with FilterableTableSource[Row] { + + var filterPushedDown: Boolean = false + + val fieldNames: Array[String] = Array("name", "id", "amount", "price") + + val fieldTypes: Array[TypeInformation[_]] = Array(STRING, LONG, INT, DOUBLE) + + // all predicates with field "amount" + private var filterPredicates = new mutable.ArrayBuffer[Expression] + + // all comparing values for field "amount" + private val filterValues = new mutable.ArrayBuffer[Int] + + override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { + execEnv.fromCollection[Row](generateDynamicCollection().asJava, getReturnType) + } + + override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { + execEnv.fromCollection[Row](generateDynamicCollection().asJava, getReturnType) + } + + override def explainSource(): String = { + if (filterPredicates.nonEmpty) { + s"filter=[${filterPredicates.reduce((l, r) => And(l, r)).toString}]" + } else { + "" + } + } + + override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes, fieldNames) + + override def applyPredicate(predicates: JList[Expression]): TableSource[Row] = { + val newSource = new TestFilterableTableSource(recordNum) + newSource.filterPushedDown = true + + val iterator = predicates.iterator() + while (iterator.hasNext) { + iterator.next() match { + case expr: BinaryComparison => + (expr.left, expr.right) match { + case (f: ResolvedFieldReference, v: Literal) if f.name.equals("amount") => + newSource.filterPredicates += expr + newSource.filterValues += v.value.asInstanceOf[Number].intValue() + iterator.remove() + case (_, _) => + } + case _ => + } + } + + newSource + } + + override def isFilterPushedDown: Boolean = filterPushedDown + + private def generateDynamicCollection(): Seq[Row] = { + 
Preconditions.checkArgument(filterPredicates.length == filterValues.length) + + for { + cnt <- 0 until recordNum + if shouldCreateRow(cnt) + } yield { + Row.of( + s"Record_$cnt", + cnt.toLong.asInstanceOf[Object], + cnt.toInt.asInstanceOf[Object], + cnt.toDouble.asInstanceOf[Object]) + } + } + + private def shouldCreateRow(value: Int): Boolean = { + filterPredicates.zip(filterValues).forall { + case (_: GreaterThan, v) => + value > v + case (_: LessThan, v) => + value < v + case (_: GreaterThanOrEqual, v) => + value >= v + case (_: LessThanOrEqual, v) => + value <= v + case (_: EqualTo, v) => + value == v + case (_: NotEqualTo, v) => + value != v + case (expr, _) => + throw new RuntimeException(expr + " not supported!") + } + } + + override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes) +} + +class TestNestedProjectableTableSource( + tableSchema: TableSchema, + returnType: TypeInformation[Row], + values: Seq[Row], + rowtime: String = null, + proctime: String = null) + extends TestTableSourceWithTime[Row]( + tableSchema, + returnType, + values, + rowtime, + proctime, + null) + with NestedFieldsProjectableTableSource[Row] { + + var readNestedFields: Seq[String] = tableSchema.getColumnNames.map(f => s"$f.*") + + override def projectNestedFields( + fields: Array[Int], + nestedFields: Array[Array[String]]): TableSource[Row] = { + + val rowType = returnType.asInstanceOf[RowTypeInfo] + + val projectedNames = fields.map(rowType.getFieldNames.apply(_)) + val projectedTypes = fields.map(rowType.getFieldTypes.apply(_)) + + val projectedReturnType = new RowTypeInfo( + projectedTypes.asInstanceOf[Array[TypeInformation[_]]], + projectedNames) + + // update read nested fields + val newReadNestedFields = projectedNames.zip(nestedFields) + .flatMap(f => f._2.map(n => s"${f._1}.$n")) + + val projectedValues = values.map { fromRow => + val pRow = new Row(fields.length) + fields.zipWithIndex.foreach{ case (from, to) => pRow.setField(to, 
fromRow.getField(from)) } + pRow + } + + val copy = new TestNestedProjectableTableSource( + tableSchema, + projectedReturnType, + projectedValues, + rowtime, + proctime) + copy.readNestedFields = newReadNestedFields + + copy + } + + override def explainSource(): String = { + s"TestSource(" + + s"read nested fields: ${readNestedFields.mkString(", ")})" + } +}
