Repository: flink Updated Branches: refs/heads/master 7eb45c133 -> f150f9877
[FLINK-4252] [table] Validate input and output classes of Table API This closes #2507. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f150f987 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f150f987 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f150f987 Branch: refs/heads/master Commit: f150f987772c8d96f41a5acd1d20cba6622cb5c9 Parents: 7eb45c1 Author: twalthr <twal...@apache.org> Authored: Fri Sep 16 14:27:47 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Mon Sep 26 18:58:44 2016 +0200 ---------------------------------------------------------------------- .../api/java/table/BatchTableEnvironment.scala | 5 ++--- .../flink/api/table/BatchTableEnvironment.scala | 4 +++- .../flink/api/table/FlinkRelBuilder.scala | 4 ++-- .../api/table/StreamTableEnvironment.scala | 2 ++ .../flink/api/table/TableEnvironment.scala | 15 +++++++++++++ .../api/java/batch/table/FromDataSetITCase.java | 23 ++++++++++++++++++++ 6 files changed, 47 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala index 9ba5b20..a4f40d5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala @@ -17,12 +17,11 @@ */ package org.apache.flink.api.java.table -import org.apache.flink.api.common.io.InputFormat import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.{ExecutionEnvironment, DataSet} import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.api.table.expressions.ExpressionParser -import org.apache.flink.api.table.{Row, TableConfig, Table} +import org.apache.flink.api.table.{Table, TableConfig} /** * The [[org.apache.flink.api.table.TableEnvironment]] for a Java batch [[DataSet]] http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala index eb4c819..ad3ff7a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala @@ -190,7 +190,7 @@ abstract class BatchTableEnvironment( * * @param table The table for which the AST and execution plan will be returned. */ - def explain(table: Table): String = explain(table: Table, false) + def explain(table: Table): String = explain(table: Table, extended = false) /** * Registers a [[DataSet]] as a table under a given name in the [[TableEnvironment]]'s catalog. @@ -240,6 +240,8 @@ abstract class BatchTableEnvironment( */ protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = { + validateType(tpe) + val relNode = table.getRelNode // decorrelate http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala index e3bb97e..3827f05 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala @@ -19,7 +19,7 @@ package org.apache.flink.api.table import org.apache.calcite.jdbc.CalciteSchema -import org.apache.calcite.plan.{Context, RelOptCluster, RelOptSchema} +import org.apache.calcite.plan.{Context, RelOptCluster, RelOptPlanner, RelOptSchema} import org.apache.calcite.prepare.CalciteCatalogReader import org.apache.calcite.rex.RexBuilder import org.apache.calcite.schema.SchemaPlus @@ -38,7 +38,7 @@ class FlinkRelBuilder( cluster, relOptSchema) { - def getPlanner = cluster.getPlanner + def getPlanner: RelOptPlanner = cluster.getPlanner def getCluster = cluster http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index f73cd3f..e3e5751 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -240,6 +240,8 @@ abstract class StreamTableEnvironment( */ protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = { + validateType(tpe) + val relNode = table.getRelNode // decorrelate http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala index b95198c..f56df0c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.table +import java.lang.reflect.Modifier import java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.config.Lex @@ -242,6 +243,16 @@ abstract class TableEnvironment(val config: TableConfig) { frameworkConfig } + protected def validateType(typeInfo: TypeInformation[_]): Unit = { + val clazz = typeInfo.getTypeClass + if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) || + !Modifier.isPublic(clazz.getModifiers) || + clazz.getCanonicalName == null) { + throw TableException(s"Class '$clazz' described in type information '$typeInfo' must be " + + s"static and globally accessible.") + } + } + /** * Returns field names and field positions for a given [[TypeInformation]]. * @@ -257,6 +268,8 @@ abstract class TableEnvironment(val config: TableConfig) { protected[flink] def getFieldInfo[A](inputType: TypeInformation[A]): (Array[String], Array[Int]) = { + validateType(inputType) + val fieldNames: Array[String] = inputType match { case t: TupleTypeInfo[A] => t.getFieldNames case c: CaseClassTypeInfo[A] => c.getFieldNames @@ -286,6 +299,8 @@ abstract class TableEnvironment(val config: TableConfig) { inputType: TypeInformation[A], exprs: Array[Expression]): (Array[String], Array[Int]) = { + validateType(inputType) + val indexedNames: Array[(Int, String)] = inputType match { case a: AtomicType[A] => if (exprs.length != 1) { http://git-wip-us.apache.org/repos/asf/flink/blob/f150f987/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java index af96a04..e6b9226 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java @@ -299,8 +299,31 @@ public class FromDataSetITCase extends TableProgramsTestBase { tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b, c"); } + @Test(expected = TableException.class) + public void testNonStaticClassInput() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); + + // Must fail since class is not static + tableEnv.fromDataSet(env.fromElements(new MyNonStatic()), "name"); + } + + @Test(expected = TableException.class) + public void testNonStaticClassOutput() throws Exception { + ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); + + // Must fail since class is not static + Table t = tableEnv.fromDataSet(env.fromElements(1, 2, 3), "number"); + tableEnv.toDataSet(t, MyNonStatic.class); + } + // -------------------------------------------------------------------------------------------- + public class MyNonStatic { + public int number; + } + @SuppressWarnings("unused") public static class SmallPojo {