Repository: flink
Updated Branches:
  refs/heads/release-1.2 c0fb70f1c -> fdb3f65f2


[FLINK-6059] [table] Reject GenericType<Row> when converting DataSet or 
DataStream to Table.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fdb3f65f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fdb3f65f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fdb3f65f

Branch: refs/heads/release-1.2
Commit: fdb3f65f2d6595b88edae849ae6c848e5bbfaa2d
Parents: c0fb70f
Author: Fabian Hueske <fhue...@apache.org>
Authored: Wed Mar 15 13:24:42 2017 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Sat Apr 29 01:40:07 2017 +0200

----------------------------------------------------------------------
 .../flink/table/api/TableEnvironment.scala      | 16 +++++++--
 .../api/java/batch/TableEnvironmentITCase.java  | 38 ++++++++++++++++++++
 .../flink/table/TableEnvironmentTest.scala      | 23 +++++++++---
 .../scala/batch/TableEnvironmentITCase.scala    | 34 ++++++++++++++++++
 4 files changed, 104 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fdb3f65f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 5dc04d1..5cafe4f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -32,7 +32,7 @@ import org.apache.calcite.sql.util.ChainedSqlOperatorTable
 import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, 
RuleSets}
 import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, 
TupleTypeInfo}
 import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv}
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv}
@@ -50,6 +50,7 @@ import org.apache.flink.table.plan.schema.RelTable
 import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.{DefinedFieldNames, TableSource}
 import org.apache.flink.table.validate.FunctionCatalog
+import org.apache.flink.types.Row
 
 import _root_.scala.collection.JavaConverters._
 
@@ -328,7 +329,14 @@ abstract class TableEnvironment(val config: TableConfig) {
     */
   protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]):
   (Array[String], Array[Int]) = {
-    (TableEnvironment.getFieldNames(inputType), 
TableEnvironment.getFieldIndices(inputType))
+
+    if (inputType.isInstanceOf[GenericTypeInfo[A]] && inputType.getTypeClass 
== classOf[Row]) {
+      throw new TableException(
+        "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
+          "Please specify the type of the input with a RowTypeInfo.")
+    } else {
+      (TableEnvironment.getFieldNames(inputType), 
TableEnvironment.getFieldIndices(inputType))
+    }
   }
 
   /**
@@ -347,6 +355,10 @@ abstract class TableEnvironment(val config: TableConfig) {
     TableEnvironment.validateType(inputType)
 
     val indexedNames: Array[(Int, String)] = inputType match {
+      case g: GenericTypeInfo[A] if g.getTypeClass == classOf[Row] =>
+        throw new TableException(
+          "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
+            "Please specify the type of the input with a RowTypeInfo.")
       case a: AtomicType[A] =>
         if (exprs.length != 1) {
           throw new TableException("Table of atomic type can only have a 
single field.")

http://git-wip-us.apache.org/repos/asf/flink/blob/fdb3f65f/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index 67eb2d1..1cc3071 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.calcite.tools.RuleSets;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
@@ -45,6 +46,8 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import static org.junit.Assert.assertTrue;
+
 @RunWith(Parameterized.class)
 public class TableEnvironmentITCase extends TableProgramsTestBase {
 
@@ -375,6 +378,41 @@ public class TableEnvironmentITCase extends 
TableProgramsTestBase {
        }
 
        @Test(expected = TableException.class)
+       public void testGenericRow() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
+
+               // use a null value to enforce GenericType
+               Row row = new Row(4);
+               row.setField(0, 1);
+               row.setField(1, 2L);
+               row.setField(2, "Hello");
+               row.setField(3, null);
+               DataSet<Row> dataSet = env.fromElements(row);
+               assertTrue(dataSet.getType() instanceof GenericTypeInfo);
+               assertTrue(dataSet.getType().getTypeClass().equals(Row.class));
+
+               // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+               tableEnv.fromDataSet(dataSet);
+       }
+
+       @Test(expected = TableException.class)
+       public void testGenericRowWithAlias() throws Exception {
+               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+               BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
+
+               Row row = new Row(1);
+               row.setField(0, null);
+               // the null value above enforces GenericType
+               DataSet<Row> dataSet = env.fromElements(row);
+               assertTrue(dataSet.getType() instanceof GenericTypeInfo);
+               assertTrue(dataSet.getType().getTypeClass().equals(Row.class));
+
+               // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+               tableEnv.fromDataSet(dataSet, "nullField");
+       }
+
+       @Test(expected = TableException.class)
        public void testAsWithToFewFields() throws Exception {
                ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
                BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());

http://git-wip-us.apache.org/repos/asf/flink/blob/fdb3f65f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
index f91aee9..50f5b08 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/TableEnvironmentTest.scala
@@ -22,10 +22,12 @@ import org.apache.calcite.tools.RuleSet
 import org.apache.flink.api.scala._
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
-import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment, 
TableException}
-import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
+import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
 import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, TupleTypeInfo, 
TypeExtractor}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.{Alias, UnresolvedFieldReference}
+import org.apache.flink.types.Row
 import org.junit.Test
 import org.junit.Assert.assertEquals
 
@@ -44,6 +46,8 @@ class TableEnvironmentTest {
 
   val atomicType = INT_TYPE_INFO
 
+  val genericRowType = new GenericTypeInfo[Row](classOf[Row])
+
   @Test
   def testGetFieldInfoTuple(): Unit = {
     val fieldInfo = tEnv.getFieldInfo(tupleType)
@@ -76,6 +80,11 @@ class TableEnvironmentTest {
     fieldInfo._2.zip(Array(0)).foreach(x => assertEquals(x._2, x._1))
   }
 
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoGenericRow(): Unit = {
+    tEnv.getFieldInfo(genericRowType) // GenericTypeInfo[Row] input: TableException expected
+  }
+
   @Test
   def testGetFieldInfoTupleNames(): Unit = {
     val fieldInfo = tEnv.getFieldInfo(
@@ -84,7 +93,7 @@ class TableEnvironmentTest {
         new UnresolvedFieldReference("name1"),
         new UnresolvedFieldReference("name2"),
         new UnresolvedFieldReference("name3")
-    ))
+      ))
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => 
assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -98,7 +107,7 @@ class TableEnvironmentTest {
         new UnresolvedFieldReference("name1"),
         new UnresolvedFieldReference("name2"),
         new UnresolvedFieldReference("name3")
-    ))
+      ))
 
     fieldInfo._1.zip(Array("name1", "name2", "name3")).foreach(x => 
assertEquals(x._2, x._1))
     fieldInfo._2.zip(Array(0, 1, 2)).foreach(x => assertEquals(x._2, x._1))
@@ -276,6 +285,10 @@ class TableEnvironmentTest {
       ))
   }
 
+  @Test(expected = classOf[TableException])
+  def testGetFieldInfoGenericRowAlias(): Unit = { // GenericTypeInfo[Row] plus alias: must throw
+    tEnv.getFieldInfo(genericRowType, Array(UnresolvedFieldReference("first")))
+  }
 }
 
 class MockTableEnvironment extends TableEnvironment(new TableConfig) {

http://git-wip-us.apache.org/repos/asf/flink/blob/fdb3f65f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
index 961e575..d294abe 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.scala.batch
 
 import java.util
 
+import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
@@ -32,6 +33,7 @@ import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
+import org.junit.Assert.assertTrue
 
 import scala.collection.JavaConverters._
 
@@ -256,6 +258,38 @@ class TableEnvironmentITCase(
     CollectionDataSets.get3TupleDataSet(env)
       .toTable(tEnv, 'a as 'foo, 'b, 'c)
   }
+
+  @Test(expected = classOf[TableException])
+  def testGenericRow() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    // use a null value to enforce GenericType
+    val row = new Row(1)
+    row.setField(0, null)
+    val dataSet = env.fromElements(row)
+    assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]])
+    assertTrue(dataSet.getType().getTypeClass == classOf[Row])
+
+    // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+    tableEnv.fromDataSet(dataSet)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testGenericRowWithAlias() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    // use a null value to enforce GenericType
+    val row = new Row(1)
+    row.setField(0, null)
+    val dataSet = env.fromElements(row)
+    assertTrue(dataSet.getType().isInstanceOf[GenericTypeInfo[_]])
+    assertTrue(dataSet.getType().getTypeClass == classOf[Row])
+
+    // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+    tableEnv.fromDataSet(dataSet, "nullField")
+  }
 }
 
 object TableEnvironmentITCase {

Reply via email to