Repository: flink Updated Branches: refs/heads/master e0ab5f524 -> 6cd98a9b6
[FLINK-6476] [table] Add support to convert DataSet[Row] and DataStream[Row] to Table. This closes #3849. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6cd98a9b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6cd98a9b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6cd98a9b Branch: refs/heads/master Commit: 6cd98a9b6f223bc665831e4ff3626c98aed9b272 Parents: b5ddbe5 Author: rtudoran <tudoranr...@ymail.com> Authored: Mon May 8 20:30:57 2017 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue May 9 18:50:20 2017 +0200 ---------------------------------------------------------------------- .../flink/table/api/TableEnvironment.scala | 18 +++++++++ .../table/api/java/stream/sql/SqlITCase.java | 42 ++++++++++++++++++++ .../flink/table/TableEnvironmentTest.scala | 22 ++++++++++ .../table/api/scala/stream/sql/SqlITCase.scala | 36 +++++++++++++++++ 4 files changed, 118 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6cd98a9b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index bb0de3e..bf4a8e0 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -63,6 +63,7 @@ import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.sources.{DefinedFieldNames, TableSource} import org.apache.flink.table.validate.FunctionCatalog import org.apache.flink.types.Row +import org.apache.flink.api.java.typeutils.RowTypeInfo import _root_.scala.collection.JavaConverters._ import _root_.scala.collection.mutable.HashMap @@ -677,6 +678,23 @@ abstract class TableEnvironment(val config: TableConfig) { case _ => throw new TableException( "Field reference expression or alias on field expression expected.") } + case r: RowTypeInfo => { + exprs.zipWithIndex flatMap { + case (UnresolvedFieldReference(name), idx) => + Some((idx, name)) + case (Alias(UnresolvedFieldReference(origName), name, _), _) => + val idx = r.getFieldIndex(origName) + if (idx < 0) { + throw new TableException(s"$origName is not a field of type $r") + } + Some((idx, name)) + case (_: TimeAttribute, _) => + None + case _ => throw new TableException( + "Field reference expression or alias on field expression expected.") + } + + } case tpe => throw new TableException( s"Source of type $tpe cannot be converted into Table.") } http://git-wip-us.apache.org/repos/asf/flink/blob/6cd98a9b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java index 7c01d2b..0c0b37e 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java @@ -30,11 +30,53 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.table.api.java.stream.utils.StreamTestData; import org.junit.Test; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import java.util.ArrayList; import java.util.List; public class SqlITCase extends StreamingMultipleProgramsTestBase { + + @Test + public void testRowRegisterRowWithNames() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); + StreamITCase.clear(); + + List<Row> data = new ArrayList<>(); + data.add(Row.of(1, 1L, "Hi")); + data.add(Row.of(2, 2L, "Hello")); + data.add(Row.of(3, 2L, "Hello world")); + + TypeInformation<?>[] types = { + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO}; + String names[] = {"a","b","c"}; + + RowTypeInfo typeInfo = new RowTypeInfo(types, names); + + DataStream<Row> ds = env.fromCollection(data).returns(typeInfo); + + Table in = tableEnv.fromDataStream(ds, "a,b,c"); + tableEnv.registerTable("MyTableRow", in); + + String sqlQuery = "SELECT a,c FROM MyTableRow"; + Table result = tableEnv.sql(sqlQuery); + + DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class); + resultSet.addSink(new StreamITCase.StringSink()); + env.execute(); + + List<String> expected = new ArrayList<>(); + expected.add("1,Hi"); + expected.add("2,Hello"); + expected.add("3,Hello world"); + + StreamITCase.compareWithList(expected); + } @Test public void testSelect() throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/6cd98a9b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala index ba3b591..98a8edb 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala @@ -54,6 +54,28 @@ class TableEnvironmentTest extends TableTestBase { val genericRowType = new GenericTypeInfo[Row](classOf[Row]) @Test + def testGetFieldInfoRow(): Unit = { + val fieldInfo = tEnv.getFieldInfo(rowType) + + fieldInfo._1.zip(Array("f0", "f1", "f2")).foreach(x => assertEquals(x._2, x._1)) + fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) + } + + @Test + def testGetFieldInfoRowNames(): Unit = { + val fieldInfo = tEnv.getFieldInfo( + rowType, + Array( + UnresolvedFieldReference("name1"), + UnresolvedFieldReference("name2"), + UnresolvedFieldReference("name3") + )) + + fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) + fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) + } + + @Test def testGetFieldInfoTuple(): Unit = { val fieldInfo = tEnv.getFieldInfo(tupleType) http://git-wip-us.apache.org/repos/asf/flink/blob/6cd98a9b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala index 4147358..ba8c185 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala @@ -23,12 +23,48 @@ import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase} +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.types.Row import org.junit.Assert._ import org.junit._ class SqlITCase extends StreamingWithStateTestBase { + /** test row stream registered table **/ + @Test + def testRowRegister(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.clear + + val sqlQuery = "SELECT * FROM MyTableRow WHERE c < 3" + + val data = List( + Row.of("Hello", "Worlds", Int.box(1)), + Row.of("Hello", "Hiden", Int.box(5)), + Row.of("Hello again", "Worlds", Int.box(2))) + + implicit val tpe: TypeInformation[Row] = new RowTypeInfo( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO) // tpe is automatically + + val ds = env.fromCollection(data) + + val t = ds.toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("MyTableRow", t) + + val result = tEnv.sql(sqlQuery).toDataStream[Row] + result.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = List("Hello,Worlds,1","Hello again,Worlds,2") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + /** test unbounded groupby (without window) **/ @Test def testUnboundedGroupby(): Unit = {