Repository: flink Updated Branches: refs/heads/release-1.2 c0fb70f1c -> fdb3f65f2
[FLINK-6059] [table] Reject GenericType<Row> when converting DataSet or DataStream to Table. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdb3f65f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdb3f65f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdb3f65f Branch: refs/heads/release-1.2 Commit: fdb3f65f2d6595b88edae849ae6c848e5bbfaa2d Parents: c0fb70f Author: Fabian Hueske <fhue...@apache.org> Authored: Wed Mar 15 13:24:42 2017 +0100 Committer: Fabian Hueske <fhue...@apache.org> Committed: Sat Apr 29 01:40:07 2017 +0200 ---------------------------------------------------------------------- .../flink/table/api/TableEnvironment.scala | 16 +++++++-- .../api/java/batch/TableEnvironmentITCase.java | 38 ++++++++++++++++++++ .../flink/table/TableEnvironmentTest.scala | 23 +++++++++--- .../scala/batch/TableEnvironmentITCase.scala | 34 ++++++++++++++++++ 4 files changed, 104 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/fdb3f65f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index 5dc04d1..5cafe4f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -32,7 +32,7 @@ import org.apache.calcite.sql.util.ChainedSqlOperatorTable import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets} import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import 
org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, TupleTypeInfo} import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv} @@ -50,6 +50,7 @@ import org.apache.flink.table.plan.schema.RelTable import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.sources.{DefinedFieldNames, TableSource} import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.types.Row import _root_.scala.collection.JavaConverters._ @@ -328,7 +329,14 @@ abstract class TableEnvironment(val config: TableConfig) { */ protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { - (TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType)) + + if (inputType.isInstanceOf[GenericTypeInfo[A]] && inputType.getTypeClass == classOf[Row]) { + throw new TableException( + "An input of GenericTypeInfo<Row> cannot be converted to Table. " + + "Please specify the type of the input with a RowTypeInfo.") + } else { + (TableEnvironment.getFieldNames(inputType), TableEnvironment.getFieldIndices(inputType)) + } } /** @@ -347,6 +355,10 @@ abstract class TableEnvironment(val config: TableConfig) { TableEnvironment.validateType(inputType) val indexedNames: Array[(Int, String)] = inputType match { + case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] => + throw new TableException( + "An input of GenericTypeInfo<Row> cannot be converted to Table. 
" + + "Please specify the type of the input with a RowTypeInfo.") case a: AtomicType[A] => if (exprs.length != 1) { throw new TableException("Table of atomic type can only have a single field.") http://git-wip-us.apache.org/repos/asf/flink/blob/fdb3f65f/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java index 67eb2d1..1cc3071 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.calcite.tools.RuleSets; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.table.api.java.BatchTableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; @@ -45,6 +46,8 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import static org.junit.Assert.assertTrue; + @RunWith(Parameterized.class) public class TableEnvironmentITCase extends TableProgramsTestBase { @@ -375,6 +378,41 @@ public class TableEnvironmentITCase extends TableProgramsTestBase { } @Test(expected = TableException.class) + public void testGenericRow() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); + + // use null value the enforce 
GenericType + Row row = new Row(4); + row.setField(0, 1); + row.setField(1, 2L); + row.setField(2, "Hello"); + row.setField(3, null); + DataSet<Row> dataSet = env.fromElements(row); + assertTrue(dataSet.getType() instanceof GenericTypeInfo); + assertTrue(dataSet.getType().getTypeClass().equals(Row.class)); + + // Must fail. Cannot import DataSet<Row> with GenericTypeInfo. + tableEnv.fromDataSet(dataSet); + } + + @Test(expected = TableException.class) + public void testGenericRowWithAlias() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); + + Row row = new Row(1); + row.setField(0, null); + // use null value to enforce GenericType + DataSet<Row> dataSet = env.fromElements(row); + assertTrue(dataSet.getType() instanceof GenericTypeInfo); + assertTrue(dataSet.getType().getTypeClass().equals(Row.class)); + + // Must fail. 
+ tableEnv.fromDataSet(dataSet, "nullField"); + } + + @Test(expected = TableException.class) public void testAsWithToFewFields() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); http://git-wip-us.apache.org/repos/asf/flink/blob/fdb3f65f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala index f91aee9..50f5b08 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala @@ -22,10 +22,12 @@ import org.apache.calcite.tools.RuleSet import org.apache.flink.api.scala._ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor} -import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment, TableException} -import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference} +import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment} import org.apache.flink.table.sinks.TableSink +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo, TypeExtractor} +import org.apache.flink.table.api.TableException +import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference} +import org.apache.flink.types.Row import org.junit.Test import org.junit.Assert.assertEquals @@ -44,6 +46,8 @@ class TableEnvironmentTest { val atomicType = INT_TYPE_INFO + val genericRowType = new GenericTypeInfo[Row](classOf[Row]) + @Test 
def testGetFieldInfoTuple(): Unit = { val fieldInfo = tEnv.getFieldInfo(tupleType) @@ -76,6 +80,11 @@ class TableEnvironmentTest { fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1)) } + @Test(expected = classOf[TableException]) + def testGetFieldInfoGenericRow(): Unit = { + tEnv.getFieldInfo(genericRowType) + } + @Test def testGetFieldInfoTupleNames(): Unit = { val fieldInfo = tEnv.getFieldInfo( @@ -84,7 +93,7 @@ class TableEnvironmentTest { new UnresolvedFieldReference("name1"), new UnresolvedFieldReference("name2"), new UnresolvedFieldReference("name3") - )) + )) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) @@ -98,7 +107,7 @@ class TableEnvironmentTest { new UnresolvedFieldReference("name1"), new UnresolvedFieldReference("name2"), new UnresolvedFieldReference("name3") - )) + )) fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => assertEquals(x._2, x._1)) fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1)) @@ -276,6 +285,10 @@ class TableEnvironmentTest { )) } + @Test(expected = classOf[TableException]) + def testGetFieldInfoGenericRowAlias(): Unit = { + tEnv.getFieldInfo(genericRowType, Array(UnresolvedFieldReference("first"))) + } } class MockTableEnvironment extends TableEnvironment(new TableConfig) { http://git-wip-us.apache.org/repos/asf/flink/blob/fdb3f65f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala index 961e575..d294abe 100644 --- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.api.scala.batch import java.util +import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode @@ -32,6 +33,7 @@ import org.apache.flink.test.util.TestBaseUtils import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized +import org.junit.Assert.assertTrue import scala.collection.JavaConverters._ @@ -256,6 +258,38 @@ class TableEnvironmentITCase( CollectionDataSets.get3TupleDataSet(env) .toTable(tEnv, 'a as 'foo, 'b, 'c) } + + @Test(expected = classOf[TableException]) + def testGenericRow() { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env, config) + + // use null value to enforce GenericType + val row = new Row(1) + row.setField(0, null) + val dataSet = env.fromElements(row) + assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]]) + assertTrue(dataSet.getType().getTypeClass == classOf[Row]) + + // Must fail. Cannot import DataSet<Row> with GenericTypeInfo. + tableEnv.fromDataSet(dataSet) + } + + @Test(expected = classOf[TableException]) + def testGenericRowWithAlias() { + val env = ExecutionEnvironment.getExecutionEnvironment + val tableEnv = TableEnvironment.getTableEnvironment(env, config) + + // use null value to enforce GenericType + val row = new Row(1) + row.setField(0, null) + val dataSet = env.fromElements(row) + assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]]) + assertTrue(dataSet.getType().getTypeClass == classOf[Row]) + + // Must fail. 
Cannot import DataSet<Row> with GenericTypeInfo. + tableEnv.fromDataSet(dataSet, "nullField") + } } object TableEnvironmentITCase {