Repository: flink Updated Branches: refs/heads/tableOnCalcite d720b002a -> 3f8cea74a
[FLINK-3604][tableAPI] Enable and fix ignored tests. This closes #1782. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3f8cea74 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3f8cea74 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3f8cea74 Branch: refs/heads/tableOnCalcite Commit: 3f8cea74a79d31ee1d0cb74a767635f70a29e3c1 Parents: d720b00 Author: Fabian Hueske <fhue...@apache.org> Authored: Thu Mar 10 14:59:18 2016 +0100 Committer: vasia <va...@apache.org> Committed: Fri Mar 11 15:44:20 2016 +0100 ---------------------------------------------------------------------- .../api/java/table/JavaBatchTranslator.scala | 6 +- .../api/scala/table/DataSetConversions.scala | 14 +- .../apache/flink/api/scala/table/package.scala | 2 +- .../flink/api/table/plan/TypeConverter.scala | 23 +- .../plan/nodes/dataset/DataSetAggregate.scala | 2 +- .../org/apache/flink/api/table/table.scala | 361 ++++++++++--------- .../flink/api/table/typeinfo/RowTypeInfo.scala | 15 +- .../api/java/table/test/AggregationsITCase.java | 8 +- .../flink/api/java/table/test/AsITCase.java | 84 ++--- .../table/test/GroupedAggregationsITCase.java | 1 - .../api/java/table/test/PojoGroupingITCase.java | 3 +- .../table/test/StringExpressionsITCase.java | 16 +- .../scala/table/test/PageRankTableITCase.java | 4 +- .../scala/table/test/AggregationsITCase.scala | 16 +- .../flink/api/scala/table/test/AsITCase.scala | 49 ++- .../api/scala/table/test/CastingITCase.scala | 3 +- .../api/scala/table/test/FilterITCase.scala | 35 +- .../table/test/StringExpressionsITCase.scala | 13 +- 18 files changed, 316 insertions(+), 339 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala index 14ee78e..dbbe7e8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala @@ -88,11 +88,13 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator { s"Cannot generate a valid execution plan for the given query: \n\n" + s"${RelOptUtil.toString(lPlan)}\n" + "Please consider filing a bug report.", e) + case a: AssertionError => + throw a.getCause } - println("---------------") + println("-------------") println("DataSet Plan:") - println("---------------") + println("-------------") println(RelOptUtil.toString(dataSetPlan)) dataSetPlan match { http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala index 2508a3d..6e630ea 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/DataSetConversions.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.table +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.table._ import org.apache.flink.api.table.expressions.{UnresolvedFieldReference, Expression} import org.apache.flink.api.common.typeutils.CompositeType @@ -27,7 +28,7 @@ import org.apache.flink.api.scala._ * Methods for converting a [[DataSet]] to a [[Table]]. A [[DataSet]] is * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.table]]. */ -class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) { +class DataSetConversions[T](set: DataSet[T], inputType: TypeInformation[T]) { /** * Converts the [[DataSet]] to a [[Table]]. The field names can be specified like this: @@ -59,8 +60,15 @@ class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) { * of type `Int`. */ def toTable: Table = { - val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference) - as(resultFields: _*) + + inputType match { + case c: CompositeType[T] => + val resultFields = c.getFieldNames.map(UnresolvedFieldReference) + as(resultFields: _*) + case _ => + throw new IllegalArgumentException("" + + "Please specify a field name with 'as' to convert an atomic type dataset to a table ") + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala index 86bb7c0..5c3ba44 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/package.scala @@ -72,7 +72,7 @@ package object table extends ImplicitExpressionConversions { } implicit def dataSet2DataSetConversions[T](set: DataSet[T]): DataSetConversions[T] = { - new DataSetConversions[T](set, set.getType.asInstanceOf[CompositeType[T]]) + new DataSetConversions[T](set, set.getType) } implicit def table2RowDataSet( http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala index 62c87a1..030d577 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala @@ -37,7 +37,7 @@ import scala.collection.JavaConversions._ object TypeConverter { - val DEFAULT_ROW_TYPE = new RowTypeInfo(Seq()).asInstanceOf[TypeInformation[Any]] + val DEFAULT_ROW_TYPE = new RowTypeInfo(Seq(), Seq()).asInstanceOf[TypeInformation[Any]] def typeInfoToSqlType(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match { case BOOLEAN_TYPE_INFO => BOOLEAN @@ -132,17 +132,16 @@ object TypeConverter { // POJO type expected case pt: PojoTypeInfo[_] => - logicalFieldTypes.zipWithIndex foreach { - case (fieldTypeInfo, i) => - val fieldName = logicalFieldNames(i) - val index = pt.getFieldIndex(fieldName) - if (index < 0) { - throw new TableException(s"POJO does not define field name: $fieldName") + logicalFieldNames.zip(logicalFieldTypes) foreach { + case (fName, fType) => + val pojoIdx = pt.getFieldIndex(fName) + if (pojoIdx < 0) { + throw new TableException(s"POJO does not define field name: $fName") } - val expectedTypeInfo = pt.getTypeAt(i) - if (fieldTypeInfo != expectedTypeInfo) { + val expectedTypeInfo = pt.getTypeAt(pojoIdx) + if (fType != expectedTypeInfo) { throw new TableException(s"Result field does not match expected type. " + - s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo") + s"Expected: $expectedTypeInfo; Actual: $fType") } } @@ -172,7 +171,7 @@ object TypeConverter { // Row is expected, create the arity for it case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] => - new RowTypeInfo(logicalFieldTypes) + new RowTypeInfo(logicalFieldTypes, logicalFieldNames) // no physical type // determine type based on logical fields and configuration parameters @@ -180,7 +179,7 @@ object TypeConverter { // no need for efficient types -> use Row // we cannot use efficient types if row arity > tuple arity or nullable if (!useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY || nullable) { - new RowTypeInfo(logicalFieldTypes) + new RowTypeInfo(logicalFieldTypes, logicalFieldNames) } // use efficient type tuple or atomic type else { http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala index 9a9bf99..d3416ee 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetAggregate.scala @@ -91,7 +91,7 @@ class DataSetAggregate( .map(n => TypeConverter.sqlTypeToTypeInfo(n)) .toArray - val rowTypeInfo = new RowTypeInfo(fieldTypes) + val rowTypeInfo = new RowTypeInfo(fieldTypes, rowType.getFieldNames.asScala) val mappedInput = inputDS.map(aggregateResult._1) val groupReduceFunction = aggregateResult._2 http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala index 0763f34..43c097e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala @@ -28,39 +28,39 @@ import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey} import org.apache.calcite.util.NlsString import org.apache.flink.api.table.plan.{PlanGenException, RexNodeTranslator} import RexNodeTranslator.{toRexNode, extractAggCalls} -import org.apache.flink.api.table.expressions.Expression +import org.apache.flink.api.table.expressions.{Naming, UnresolvedFieldReference, Expression} import org.apache.flink.api.table.parser.ExpressionParser import scala.collection.JavaConverters._ case class BaseTable( - private[flink] val relNode: RelNode, - private[flink] val relBuilder: RelBuilder) + private[flink] val relNode: RelNode, + private[flink] val relBuilder: RelBuilder) /** - * The abstraction for writing Table API programs. Similar to how the batch and streaming APIs - * have [[org.apache.flink.api.scala.DataSet]] and - * [[org.apache.flink.streaming.api.scala.DataStream]]. - * - * Use the methods of [[Table]] to transform data. Use - * [[org.apache.flink.api.java.table.TableEnvironment]] to convert a [[Table]] back to a DataSet - * or DataStream. - * - * When using Scala a [[Table]] can also be converted using implicit conversions. - * - * Example: - * - * {{{ - * val table = set.toTable('a, 'b) - * ... - * val table2 = ... - * val set = table2.toDataSet[MyType] - * }}} - * - * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either take arguments - * in a Scala DSL or as an expression String. Please refer to the documentation for the expression - * syntax. - */ + * The abstraction for writing Table API programs. Similar to how the batch and streaming APIs + * have [[org.apache.flink.api.scala.DataSet]] and + * [[org.apache.flink.streaming.api.scala.DataStream]]. + * + * Use the methods of [[Table]] to transform data. Use + * [[org.apache.flink.api.java.table.TableEnvironment]] to convert a [[Table]] back to a DataSet + * or DataStream. + * + * When using Scala a [[Table]] can also be converted using implicit conversions. + * + * Example: + * + * {{{ + * val table = set.toTable('a, 'b) + * ... + * val table2 = ... + * val set = table2.toDataSet[MyType] + * }}} + * + * Operations such as [[join]], [[select]], [[where]] and [[groupBy]] either take arguments + * in a Scala DSL or as an expression String. Please refer to the documentation for the expression + * syntax. + */ class Table( private[flink] override val relNode: RelNode, private[flink] override val relBuilder: RelBuilder) @@ -68,15 +68,15 @@ class Table( { /** - * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions - * can contain complex expressions and aggregations. - * - * Example: - * - * {{{ - * in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10)) - * }}} - */ + * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions + * can contain complex expressions and aggregations. + * + * Example: + * + * {{{ + * in.select('key, 'value.avg + " The average" as 'average, 'other.substring(0, 10)) + * }}} + */ def select(fields: Expression*): Table = { relBuilder.push(relNode) @@ -101,90 +101,95 @@ class Table( .map(toRexNode(_, relBuilder)) relBuilder.project(exprs.toIterable.asJava) - var projected = relBuilder.build() + val projected = relBuilder.build() if(relNode == projected) { // Calcite's RelBuilder does not translate identity projects even if they rename fields. // Add a projection ourselves (will be automatically removed by translation rules). - val names = exprs.map{ e => - e.getKind match { - case SqlKind.AS => - e.asInstanceOf[RexCall].getOperands.get(1) - .asInstanceOf[RexLiteral].getValue - .asInstanceOf[NlsString].getValue - case SqlKind.INPUT_REF => - relNode.getRowType.getFieldNames.get(e.asInstanceOf[RexInputRef].getIndex) - case _ => - throw new PlanGenException("Unexpected expression type encountered.") - } - - } - - projected = LogicalProject.create(relNode, exprs.toList.asJava, names.toList.asJava) + new Table(createRenamingProject(exprs), relBuilder) + } else { + new Table(projected, relBuilder) } - new Table(projected, relBuilder) } /** - * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions - * can contain complex expressions and aggregations. - * - * Example: - * - * {{{ - * in.select("key, value.avg + " The average" as average, other.substring(0, 10)") - * }}} - */ + * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions + * can contain complex expressions and aggregations. + * + * Example: + * + * {{{ + * in.select("key, value.avg + " The average" as average, other.substring(0, 10)") + * }}} + */ def select(fields: String): Table = { val fieldExprs = ExpressionParser.parseExpressionList(fields) select(fieldExprs: _*) } /** - * Renames the fields of the expression result. Use this to disambiguate fields before - * joining to operations. - * - * Example: - * - * {{{ - * in.as('a, 'b) - * }}} - */ + * Renames the fields of the expression result. Use this to disambiguate fields before + * joining to operations. + * + * Example: + * + * {{{ + * in.as('a, 'b) + * }}} + */ def as(fields: Expression*): Table = { + val curNames = relNode.getRowType.getFieldNames.asScala + + // validate that AS has only field references + if (! fields.forall( _.isInstanceOf[UnresolvedFieldReference] )) { + throw new IllegalArgumentException("All expressions must be field references.") + } + // validate that we have not more field references than fields + if ( fields.length > curNames.size) { + throw new IllegalArgumentException("More field references than fields.") + } + + val curFields = curNames.map(new UnresolvedFieldReference(_)) + + val renamings = fields.zip(curFields).map { + case (newName, oldName) => new Naming(oldName, newName.name) + } + val remaining = curFields.drop(fields.size) + relBuilder.push(relNode) - val expressions = fields.map(toRexNode(_, relBuilder)).toIterable.asJava - val names = fields.map(_.name).toIterable.asJava - relBuilder.project(expressions, names) - new Table(relBuilder.build(), relBuilder) + + val exprs = (renamings ++ remaining).map(toRexNode(_, relBuilder)) + + new Table(createRenamingProject(exprs), relBuilder) } /** - * Renames the fields of the expression result. Use this to disambiguate fields before - * joining to operations. - * - * Example: - * - * {{{ - * in.as("a, b") - * }}} - */ + * Renames the fields of the expression result. Use this to disambiguate fields before + * joining to operations. + * + * Example: + * + * {{{ + * in.as("a, b") + * }}} + */ def as(fields: String): Table = { val fieldExprs = ExpressionParser.parseExpressionList(fields) as(fieldExprs: _*) } /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * in.filter('name === "Fred") - * }}} - */ + * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE + * clause. + * + * Example: + * + * {{{ + * in.filter('name === "Fred") + * }}} + */ def filter(predicate: Expression): Table = { relBuilder.push(relNode) @@ -194,58 +199,58 @@ class Table( } /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * in.filter("name = 'Fred'") - * }}} - */ + * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE + * clause. + * + * Example: + * + * {{{ + * in.filter("name = 'Fred'") + * }}} + */ def filter(predicate: String): Table = { val predicateExpr = ExpressionParser.parseExpression(predicate) filter(predicateExpr) } /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * in.where('name === "Fred") - * }}} - */ + * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE + * clause. + * + * Example: + * + * {{{ + * in.where('name === "Fred") + * }}} + */ def where(predicate: Expression): Table = { filter(predicate) } /** - * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE - * clause. - * - * Example: - * - * {{{ - * in.where("name = 'Fred'") - * }}} - */ + * Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE + * clause. + * + * Example: + * + * {{{ + * in.where("name = 'Fred'") + * }}} + */ def where(predicate: String): Table = { filter(predicate) } /** - * Groups the elements on some grouping keys. Use this before a selection with aggregations - * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. - * - * Example: - * - * {{{ - * in.groupBy('key).select('key, 'value.avg) - * }}} - */ + * Groups the elements on some grouping keys. Use this before a selection with aggregations + * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + * + * Example: + * + * {{{ + * in.groupBy('key).select('key, 'value.avg) + * }}} + */ def groupBy(fields: Expression*): GroupedTable = { relBuilder.push(relNode) @@ -256,29 +261,29 @@ class Table( } /** - * Groups the elements on some grouping keys. Use this before a selection with aggregations - * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. - * - * Example: - * - * {{{ - * in.groupBy("key").select("key, value.avg") - * }}} - */ + * Groups the elements on some grouping keys. Use this before a selection with aggregations + * to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement. + * + * Example: + * + * {{{ + * in.groupBy("key").select("key, value.avg") + * }}} + */ def groupBy(fields: String): GroupedTable = { val fieldsExpr = ExpressionParser.parseExpressionList(fields) groupBy(fieldsExpr: _*) } /** - * Removes duplicate values and returns only distinct (different) values. - * - * Example: - * - * {{{ - * in.select("key, value").distinct() - * }}} - */ + * Removes duplicate values and returns only distinct (different) values. + * + * Example: + * + * {{{ + * in.select("key, value").distinct() + * }}} + */ def distinct(): Table = { relBuilder.push(relNode) relBuilder.distinct() @@ -286,16 +291,16 @@ class Table( } /** - * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined - * operations must not overlap, use [[as]] to rename fields if necessary. You can use - * where and select clauses after a join to further specify the behaviour of the join. - * - * Example: - * - * {{{ - * left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd) - * }}} - */ + * Joins two [[Table]]s. Similar to an SQL join. The fields of the two joined + * operations must not overlap, use [[as]] to rename fields if necessary. You can use + * where and select clauses after a join to further specify the behaviour of the join. + * + * Example: + * + * {{{ + * left.join(right).where('a === 'b && 'c > 3).select('a, 'b, 'd) + * }}} + */ def join(right: Table): Table = { // check that join inputs do not have overlapping field names @@ -314,15 +319,15 @@ class Table( } /** - * Union two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations - * must fully overlap. - * - * Example: - * - * {{{ - * left.unionAll(right) - * }}} - */ + * Union two [[Table]]s. Similar to an SQL UNION ALL. The fields of the two union operations + * must fully overlap. + * + * Example: + * + * {{{ + * left.unionAll(right) + * }}} + */ def unionAll(right: Table): Table = { val leftRowType: List[RelDataTypeField] = relNode.getRowType.getFieldList.asScala.toList @@ -370,12 +375,32 @@ class Table( } def explain(): String = explain(false) + + private def createRenamingProject(exprs: Seq[RexNode]): LogicalProject = { + + val names = exprs.map{ e => + e.getKind match { + case SqlKind.AS => + e.asInstanceOf[RexCall].getOperands.get(1) + .asInstanceOf[RexLiteral].getValue + .asInstanceOf[NlsString].getValue + case SqlKind.INPUT_REF => + relNode.getRowType.getFieldNames.get(e.asInstanceOf[RexInputRef].getIndex) + case _ => + throw new PlanGenException("Unexpected expression type encountered.") + } + + } + + LogicalProject.create(relNode, exprs.toList.asJava, names.toList.asJava) + } + } class GroupedTable( - private[flink] override val relNode: RelNode, - private[flink] override val relBuilder: RelBuilder, - private[flink] val groupKey: GroupKey) extends BaseTable(relNode, relBuilder) { + private[flink] override val relNode: RelNode, + private[flink] override val relBuilder: RelBuilder, + private[flink] val groupKey: GroupKey) extends BaseTable(relNode, relBuilder) { /** * Performs a selection operation. Similar to an SQL SELECT statement. The field expressions @@ -404,7 +429,7 @@ class GroupedTable( // get selection expressions val exprs: List[RexNode] = try { - extractedAggCalls + extractedAggCalls .map(_._1) .map(toRexNode(_, relBuilder)) } http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala index 522c7f3..81c3836 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala @@ -30,14 +30,25 @@ import org.apache.flink.api.table.Row /** * TypeInformation for [[Row]]. */ -class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]]) +class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]], fieldNames: Seq[String]) extends CaseClassTypeInfo[Row]( classOf[Row], Array(), fieldTypes, - for (i <- fieldTypes.indices) yield "f" + i) + fieldNames) { + if (fieldTypes.length != fieldNames.length) { + throw new IllegalArgumentException("Number of field types and names is different.") + } + if (fieldNames.length != fieldNames.toSet.size) { + throw new IllegalArgumentException("Field names are not unique.") + } + + def this(fieldTypes: Seq[TypeInformation[_]]) = { + this(fieldTypes, for (i <- fieldTypes.indices) yield "f" + i) + } + /** * Temporary variable for directly passing orders to comparators. */ http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java index 30598c4..e26bc32 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java @@ -44,15 +44,13 @@ import org.apache.flink.api.java.table.TableEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.table.plan.PlanGenException; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import scala.NotImplementedError; - import java.util.List; @RunWith(Parameterized.class) @@ -160,9 +158,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - // Calcite does not eagerly check type compatibility - @Ignore - @Test(expected = ExpressionException.class) + @Test(expected = PlanGenException.class) public void testNonWorkingDataTypes() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java index 628cbef..097339e 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java @@ -49,8 +49,9 @@ public class AsITCase extends TableProgramsTestBase { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = getJavaTableEnvironment(); - Table table = - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c"); + Table table = tableEnv + .fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c") + .select("a, b, c"); DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); List<Row> results = ds.collect(); @@ -69,8 +70,9 @@ public class AsITCase extends TableProgramsTestBase { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = getJavaTableEnvironment(); - Table table = - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c"); + Table table = tableEnv + .fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c") + .select("a, b, c"); TypeInformation<?> ti = new TupleTypeInfo<Tuple3<Integer, Long, String>>( BasicTypeInfo.INT_TYPE_INFO, @@ -99,8 +101,9 @@ public class AsITCase extends TableProgramsTestBase { data.add(new Tuple4<>("lol", 2, 1.0, "Hi")); data.add(new Tuple4<>("Test me", 4, 3.33, "Hello world")); - Table table = - tableEnv.fromDataSet(env.fromCollection(data), "a, b, c, d"); + Table table = tableEnv + .fromDataSet(env.fromCollection(data), "a, b, c, d") + .select("a, b, c, d"); DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class); List<SmallPojo2> results = ds.collect(); @@ -118,12 +121,13 @@ public class AsITCase extends TableProgramsTestBase { data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering")); data.add(new SmallPojo("Lucy", 42, 6000.00, "HR")); - Table table = - tableEnv.fromDataSet(env.fromCollection(data), + Table table = tableEnv + .fromDataSet(env.fromCollection(data), "department AS a, " + "age AS b, " + "salary AS c, " + - "name AS d"); + "name AS d") + .select("a, b, c, d"); DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); List<Row> results = ds.collect(); @@ -144,12 +148,13 @@ public class AsITCase extends TableProgramsTestBase { data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering")); data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR")); - Table table = - tableEnv.fromDataSet(env.fromCollection(data), + Table table = tableEnv + .fromDataSet(env.fromCollection(data), "department AS a, " + "age AS b, " + "salary AS c, " + - "name AS d"); + "name AS d") + .select("a, b, c, d"); DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); List<Row> results = ds.collect(); @@ -170,12 +175,13 @@ public class AsITCase extends TableProgramsTestBase { data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering")); data.add(new SmallPojo("Lucy", 42, 6000.00, "HR")); - Table table = - tableEnv.fromDataSet(env.fromCollection(data), + Table table = tableEnv + .fromDataSet(env.fromCollection(data), "department AS a, " + "age AS b, " + "salary AS c, " + - "name AS d"); + "name AS d") + .select("a, b, c, d"); DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class); List<SmallPojo2> results = ds.collect(); @@ -196,12 +202,13 @@ public class AsITCase extends TableProgramsTestBase { data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering")); data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR")); - Table table = - tableEnv.fromDataSet(env.fromCollection(data), + Table table = tableEnv + .fromDataSet(env.fromCollection(data), "department AS a, " + "age AS b, " + "salary AS c, " + - "name AS d"); + "name AS d") + .select("a, b, c, d"); DataSet<PrivateSmallPojo2> ds = tableEnv.toDataSet(table, PrivateSmallPojo2.class); List<PrivateSmallPojo2> results = ds.collect(); @@ -217,13 +224,7 @@ public class AsITCase extends TableProgramsTestBase { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = getJavaTableEnvironment(); - Table table = - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b"); - - DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); + tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b"); } @Test(expected = IllegalArgumentException.class) @@ -231,13 +232,7 @@ public class AsITCase extends TableProgramsTestBase { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = getJavaTableEnvironment(); - Table table = - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d"); - - DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); + tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d"); } @Test(expected = IllegalArgumentException.class) @@ -245,13 +240,7 @@ public class AsITCase extends TableProgramsTestBase { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = getJavaTableEnvironment(); - Table table = - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b"); - - DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); + tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b"); } @Test(expected = IllegalArgumentException.class) @@ -259,13 +248,7 @@ public class AsITCase extends TableProgramsTestBase { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = getJavaTableEnvironment(); - Table table = - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c"); - - DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); + tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c"); } @Test(expected = IllegalArgumentException.class) @@ -273,14 +256,7 @@ public class AsITCase extends TableProgramsTestBase { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = getJavaTableEnvironment(); - Table table = - tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b," + - " c"); - - DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); - List<Row> results = ds.collect(); - String expected = ""; - compareResultAsText(results, expected); + tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b, c"); } // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java index b8a16b3..91d1976 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java @@ -29,7 +29,6 @@ import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import scala.NotImplementedError; import java.util.List; http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java index 993638d..b8b84f6 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java @@ -26,7 +26,6 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.TableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.table.Table; -import org.apache.flink.api.table.TableException; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; @@ -39,7 +38,7 @@ public class PojoGroupingITCase extends MultipleProgramsTestBase { super(mode); } - @Test(expected = TableException.class) + @Test public void testPojoGrouping() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java index 6b8f984..707ee66 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java @@ -80,9 +80,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - // Calcite does eagerly check expression types - @Ignore - @Test(expected = IllegalArgumentException.class) + @Test(expected = CodeGenException.class) public void testNonWorkingSubstring1() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -97,14 +95,10 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase { .select("a.substring(0, b)"); DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = ""; - compareResultAsText(results, expected); + resultSet.collect(); } - // Calcite does eagerly check expression types - @Ignore - @Test(expected = IllegalArgumentException.class) + @Test(expected = CodeGenException.class) public void testNonWorkingSubstring2() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -119,8 +113,6 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase { .select("a.substring(b, 15)"); DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class); - List<Row> results = resultSet.collect(); - String expected = ""; - compareResultAsText(results, expected); + resultSet.collect(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java index 55f1bde..a893d4d 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/scala/table/test/PageRankTableITCase.java @@ -74,7 +74,9 @@ public class PageRankTableITCase extends JavaProgramTestBase { tConfigs.add(config); } - // TODO: Disabling test until Table API is operational again + // TODO: Enable test again once: + // 1) complex types (long[]) can be shipped through Table API + // 2) abs function is available // return toParameterList(tConfigs); return new LinkedList<>(); } http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala index 00261c0..ad9a66d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AggregationsITCase.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.scala.table.test +import org.apache.flink.api.table.plan.PlanGenException import org.apache.flink.api.table.{ExpressionException, Row} import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ @@ -50,10 +51,6 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa val env = ExecutionEnvironment.getExecutionEnvironment val t = CollectionDataSets.get3TupleDataSet(env).toTable .select('foo.avg) - - val expected = "" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) } @Test @@ -124,17 +121,14 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa TestBaseUtils.compareResultAsText(result.asJava, expected) } - @Ignore // Calcite does not eagerly check types - @Test(expected = classOf[ExpressionException]) + @Test(expected = classOf[PlanGenException]) def testNonWorkingAggregationDataTypes(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val t = env.fromElements(("Hello", 1)).toTable .select('_1.sum) - val expected = "" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) + t.collect() } @Test(expected = classOf[IllegalArgumentException]) @@ -143,10 +137,6 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa val env = ExecutionEnvironment.getExecutionEnvironment val t = env.fromElements(("Hello", 1)).toTable .select('_2.sum.sum) - - val expected = "" - val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala index f2120ef..9f9a3b4 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/AsITCase.scala @@ -43,7 +43,9 @@ class AsITCase( def testAs(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + val t = CollectionDataSets.get3TupleDataSet(env) + .as('a, 'b, 'c) + .select('a, 'b, 'c) val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + @@ -64,7 +66,9 @@ class AsITCase( SomeCaseClass("Anna", 56, 10000.00, "Engineering"), SomeCaseClass("Lucy", 42, 6000.00, "HR")) - val t = env.fromCollection(data).as('a, 'b, 'c, 'd) + val t = env.fromCollection(data) + .as('a, 'b, 'c, 'd) + .select('a, 'b, 'c, 'd) val expected: String = "Peter,28,4000.0,Sales\n" + @@ -83,7 +87,9 @@ class AsITCase( SomeCaseClass("Anna", 56, 10000.00, "Engineering"), SomeCaseClass("Lucy", 42, 6000.00, "HR")) - val t = env.fromCollection(data).as('a, 'b, 'c, 'd) + val t = env.fromCollection(data) + .as('a, 'b, 'c, 'd) + .select('a, 'b, 'c, 'd) val expected: String = "SomeCaseClass(Peter,28,4000.0,Sales)\n" + @@ -97,33 +103,24 @@ class AsITCase( def testAsWithToFewFields(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b) - - val expected = "no" - val results = t.toDataSet[Row](getConfig).collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) + val t = CollectionDataSets.get3TupleDataSet(env) + .as('a, 'b) } @Test(expected = classOf[IllegalArgumentException]) def testAsWithToManyFields(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd) - - val expected = "no" - val results = t.toDataSet[Row](getConfig).collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) + val t = CollectionDataSets.get3TupleDataSet(env) + .as('a, 'b, 'c, 'd) } @Test(expected = classOf[IllegalArgumentException]) def testAsWithAmbiguousFields(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment - val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b) - - val expected = "no" - val results = t.toDataSet[Row](getConfig).collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) + val t = CollectionDataSets.get3TupleDataSet(env) + .as('a, 'b, 'b) } @Test(expected = classOf[IllegalArgumentException]) @@ -131,11 +128,9 @@ class AsITCase( val env = ExecutionEnvironment.getExecutionEnvironment // as can only have field references - val t = CollectionDataSets.get3TupleDataSet(env).as('a + 1, 'b, 'b) - - val expected = "no" - val results = t.toDataSet[Row](getConfig).collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) + val t = CollectionDataSets.get3TupleDataSet(env) + .as('a, 'b, 'c) + .as('a + 1, 'b, 'c) } @Test(expected = classOf[IllegalArgumentException]) @@ -143,11 +138,9 @@ class AsITCase( val env = ExecutionEnvironment.getExecutionEnvironment // as can only have field references - val t = CollectionDataSets.get3TupleDataSet(env).as('a as 'foo, 'b, 'b) - - val expected = "no" - val results = t.toDataSet[Row](getConfig).collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) + val t = CollectionDataSets.get3TupleDataSet(env) + .as('a, 'b, 'c) + .as('a as 'foo, 'b, 'c) } } http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala index 8e11f76..111525f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala @@ -66,9 +66,8 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo TestBaseUtils.compareResultAsText(results.asJava, expected) } - // TODO support advanced String operations - @Ignore + @Ignore // TODO support advanced String operations @Test def testAutoCastToString(): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala index 0febd4d..ae8ebef 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala @@ -74,20 +74,18 @@ class FilterITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - // TODO test broken does not test Table API - @Ignore @Test def testFilterOnStringTupleField(): Unit = { /* * Test filter on String tuple field. */ val env = ExecutionEnvironment.getExecutionEnvironment - val ds = CollectionDataSets.get3TupleDataSet(env) - val filterDs = ds.filter( _._3.contains("world") ) + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + val filterDs = ds.filter( 'c.like("%world%") ) -// val expected = "(3,2,Hello world)\n" + "(4,3,Hello world, how are you?)\n" -// val results = filterDs.toDataSet[Row](getConfig).collect() -// TestBaseUtils.compareResultAsText(results.asJava, expected) + val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n" + val results = filterDs.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) } @Test @@ -152,27 +150,21 @@ class FilterITCase( TestBaseUtils.compareResultAsText(results.asJava, expected) } - // These two not yet done, but are planned - // TODO test broken does not test Table API - @Ignore @Test def testFilterBasicType(): Unit = { /* * Test filter on basic type */ - val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.getStringDataSet(env) - val filterDs = ds.filter( _.startsWith("H") ) + val filterDs = ds.as('a).filter( 'a.like("H%") ) -// val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" -// val results = filterDs.toDataSet[Row](getConfig).collect() -// TestBaseUtils.compareResultAsText(results.asJava, expected) + val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" + val results = filterDs.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) } - // TODO test broken does not test Table API - @Ignore @Test def testFilterOnCustomType(): Unit = { /* @@ -180,11 +172,12 @@ class FilterITCase( */ val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.getCustomTypeDataSet(env) - val filterDs = ds.filter( _.myString.contains("a") ) + val filterDs = ds.as('myInt as 'i, 'myLong as 'l, 'myString as 's) + .filter( 's.like("%a%") ) -// val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" -// val results = filterDs.toDataSet[Row](getConfig).collect() -// TestBaseUtils.compareResultAsText(results.asJava, expected) + val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" + val results = filterDs.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) } } http://git-wip-us.apache.org/repos/asf/flink/blob/3f8cea74/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala index c4fc346..7977547 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala @@ -21,6 +21,7 @@ package org.apache.flink.api.scala.table.test import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.Row +import org.apache.flink.api.table.codegen.CodeGenException import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} import org.junit._ @@ -54,32 +55,24 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT TestBaseUtils.compareResultAsText(results.asJava, expected) } - // Calcite does eagerly check expression types - @Ignore - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[CodeGenException]) def testNonWorkingSubstring1(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val t = env.fromElements(("AAAA", 2.0), ("BBBB", 1.0)).as('a, 'b) .select('a.substring(0, 'b)) - val expected = "AAA\nBB" val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) } - // Calcite does eagerly check expression types - @Ignore - @Test(expected = classOf[IllegalArgumentException]) + @Test(expected = classOf[CodeGenException]) def testNonWorkingSubstring2(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val t = env.fromElements(("AAAA", "c"), ("BBBB", "d")).as('a, 'b) .select('a.substring('b, 15)) - val expected = "AAA\nBB" val results = t.toDataSet[Row].collect() - TestBaseUtils.compareResultAsText(results.asJava, expected) }