http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala deleted file mode 100644 index 4927fb2..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.typeinfo - -import java.util - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder, -FlatFieldDescriptor} -import org.apache.flink.api.common.typeutils.{CompositeType, TypeSerializer} - -/** - * A TypeInformation that is used to rename fields of an underlying CompositeType. This - * allows the system to translate "as" Table API operations to a [[RenameOperator]] - * that does not get translated to a runtime operator. - */ -class RenamingProxyTypeInfo[T]( - val tpe: CompositeType[T], - val fieldNames: Array[String]) - extends CompositeType[T](tpe.getTypeClass) { - - def getUnderlyingType: CompositeType[T] = tpe - - if (tpe.getArity != fieldNames.length) { - throw new IllegalArgumentException(s"Number of field names '${fieldNames.mkString(",")}' and " + - s"number of fields in underlying type $tpe do not match.") - } - - if (fieldNames.toSet.size != fieldNames.length) { - throw new IllegalArgumentException(s"New field names must be unique. " + - s"Names: ${fieldNames.mkString(",")}.") - } - - override def getFieldIndex(fieldName: String): Int = { - val result = fieldNames.indexOf(fieldName) - if (result != fieldNames.lastIndexOf(fieldName)) { - -2 - } else { - result - } - } - override def getFieldNames: Array[String] = fieldNames - - override def isBasicType: Boolean = tpe.isBasicType - - override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = - tpe.createSerializer(executionConfig) - - override def getArity: Int = tpe.getArity - - override def isKeyType: Boolean = tpe.isKeyType - - override def getTypeClass: Class[T] = tpe.getTypeClass - - override def getGenericParameters: java.util.List[TypeInformation[_]] = tpe.getGenericParameters - - override def isTupleType: Boolean = tpe.isTupleType - - override def toString = { - s"RenamingType(type: ${tpe.getTypeClass.getSimpleName}; " + - s"fields: ${fieldNames.mkString(", ")})" - } - - override def getTypeAt[X](pos: Int): TypeInformation[X] = tpe.getTypeAt(pos) - - override def getTotalFields: Int = tpe.getTotalFields - - override def createComparator( - logicalKeyFields: Array[Int], - orders: Array[Boolean], - logicalFieldOffset: Int, - executionConfig: ExecutionConfig) = - tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, executionConfig) - - override def getFlatFields( - fieldExpression: String, - offset: Int, - result: util.List[FlatFieldDescriptor]): Unit = { - - // split of head of field expression - val (head, tail) = if (fieldExpression.indexOf('.') >= 0) { - fieldExpression.splitAt(fieldExpression.indexOf('.')) - } else { - (fieldExpression, "") - } - - // replace proxy field name by original field name of wrapped type - val headPos = getFieldIndex(head) - if (headPos >= 0) { - val resolvedHead = tpe.getFieldNames()(headPos) - val resolvedFieldExpr = resolvedHead + tail - - // get flat fields with wrapped field name - tpe.getFlatFields(resolvedFieldExpr, offset, result) - } - else { - throw new IllegalArgumentException(s"Invalid field expression: ${fieldExpression}") - } - } - - override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = { - tpe.getTypeAt(fieldExpression) - } - - override protected def createTypeComparatorBuilder(): TypeComparatorBuilder[T] = { - throw new RuntimeException("This method should never be called because createComparator is " + - "overwritten.") - } - - override def equals(obj: Any): Boolean = { - obj match { - case renamingProxy: RenamingProxyTypeInfo[_] => - renamingProxy.canEqual(this) && - tpe.equals(renamingProxy.tpe) && - fieldNames.sameElements(renamingProxy.fieldNames) - case _ => false - } - } - - override def hashCode(): Int = { - 31 * tpe.hashCode() + util.Arrays.hashCode(fieldNames.asInstanceOf[Array[AnyRef]]) - } - - override def canEqual(obj: Any): Boolean = { - obj.isInstanceOf[RenamingProxyTypeInfo[_]] - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala index 39fc1d8..522c7f3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala @@ -20,32 +20,29 @@ package org.apache.flink.api.table.typeinfo import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType.TypeComparatorBuilder -import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} +import org.apache.flink.api.common.typeutils.TypeComparator import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.api.table.Row -import org.apache.flink.api.table.expressions.Expression import scala.collection.mutable.ArrayBuffer +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.table.Row /** * TypeInformation for [[Row]]. */ -class RowTypeInfo( - fieldTypes: Seq[TypeInformation[_]], - fieldNames: Seq[String]) - extends CaseClassTypeInfo[Row](classOf[Row], Array(), fieldTypes, fieldNames) { +class RowTypeInfo(fieldTypes: Seq[TypeInformation[_]]) + extends CaseClassTypeInfo[Row]( + classOf[Row], + Array(), + fieldTypes, + for (i <- fieldTypes.indices) yield "f" + i) +{ /** * Temporary variable for directly passing orders to comparators. */ var comparatorOrders: Option[Array[Boolean]] = None - def this(fields: Seq[Expression]) = this(fields.map(_.typeInfo), fields.map(_.name)) - - if (fieldNames.toSet.size != fieldNames.size) { - throw new IllegalArgumentException("Field names must be unique.") - } - override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Row] = { val fieldSerializers: Array[TypeSerializer[Any]] = new Array[TypeSerializer[Any]](getArity) for (i <- 0 until getArity) { http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java index a3d5edc..dd51b14 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AggregationsITCase.java @@ -57,7 +57,6 @@ import java.util.List; @RunWith(Parameterized.class) public class AggregationsITCase extends MultipleProgramsTestBase { - public AggregationsITCase(TestExecutionMode mode){ super(mode); } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java index 0f35d75..492596c 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/AsITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java.table.test; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.codegen.CodeGenException; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.TableEnvironment; @@ -35,7 +36,6 @@ import java.util.List; @RunWith(Parameterized.class) public class AsITCase extends MultipleProgramsTestBase { - public AsITCase(TestExecutionMode mode){ super(mode); } @@ -60,7 +60,7 @@ public class AsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test + @Test(expected = CodeGenException.class) public void testAsFromPojo() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java index cde78ce..957c093 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/CastingITCase.java @@ -23,15 +23,18 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.table.Table; import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.codegen.CodeGenException; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.TableEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple7; import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; + import scala.NotImplementedError; import java.util.List; @@ -39,11 +42,11 @@ import java.util.List; @RunWith(Parameterized.class) public class CastingITCase extends MultipleProgramsTestBase { - public CastingITCase(TestExecutionMode mode){ super(mode); } + @Ignore @Test(expected = NotImplementedError.class) public void testNumericAutocastInArithmetic() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -64,7 +67,7 @@ public class CastingITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test public void testNumericAutocastInComparison() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -86,7 +89,7 @@ public class CastingITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test(expected = CodeGenException.class) public void testCastFromString() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -106,7 +109,7 @@ public class CastingITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test(expected = CodeGenException.class) public void testCastDateFromString() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -128,7 +131,7 @@ public class CastingITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test(expected = CodeGenException.class) public void testCastDateToStringAndLong() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java index 2a17087..51f666e 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/ExpressionsITCase.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.table.TableEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.table.test.TableProgramsTestBase; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; @@ -35,17 +36,16 @@ import scala.NotImplementedError; import java.util.List; @RunWith(Parameterized.class) -public class ExpressionsITCase extends MultipleProgramsTestBase { +public class ExpressionsITCase extends TableProgramsTestBase { - - public ExpressionsITCase(TestExecutionMode mode){ - super(mode); + public ExpressionsITCase(TestExecutionMode mode, TableConfigMode configMode) { + super(mode, configMode); } - @Test(expected = NotImplementedError.class) + @Test public void testArithmetic() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); DataSource<Tuple2<Integer, Integer>> input = env.fromElements(new Tuple2<>(5, 10)); @@ -62,10 +62,10 @@ public class ExpressionsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test public void testLogic() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); DataSource<Tuple2<Integer, Boolean>> input = env.fromElements(new Tuple2<>(5, true)); @@ -82,10 +82,10 @@ public class ExpressionsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test public void testComparisons() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); DataSource<Tuple3<Integer, Integer, Integer>> input = env.fromElements(new Tuple3<>(5, 5, 4)); http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java index cd08879..f48be48 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/FilterITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.TableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.table.test.TableProgramsTestBase; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; @@ -34,17 +35,16 @@ import scala.NotImplementedError; import java.util.List; @RunWith(Parameterized.class) -public class FilterITCase extends MultipleProgramsTestBase { +public class FilterITCase extends TableProgramsTestBase { - - public FilterITCase(TestExecutionMode mode){ - super(mode); + public FilterITCase(TestExecutionMode mode, TableConfigMode configMode){ + super(mode, configMode); } - @Test(expected = NotImplementedError.class) + @Test public void testAllRejectingFilter() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); @@ -63,7 +63,7 @@ public class FilterITCase extends MultipleProgramsTestBase { @Test public void testAllPassingFilter() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); @@ -85,10 +85,10 @@ public class FilterITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test public void testFilterOnIntegerTupleField() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); @@ -106,10 +106,10 @@ public class FilterITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test public void testNotEquals() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env); @@ -127,10 +127,10 @@ public class FilterITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test public void testIntegerBiggerThan128() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello")); http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java index d5dc56a..524dd4e 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/GroupedAggregationsITCase.java @@ -36,7 +36,6 @@ import java.util.List; @RunWith(Parameterized.class) public class GroupedAggregationsITCase extends MultipleProgramsTestBase { - public GroupedAggregationsITCase(TestExecutionMode mode){ super(mode); } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java index 2e1f4a7..95213ee 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/JoinITCase.java @@ -39,7 +39,6 @@ import java.util.List; @RunWith(Parameterized.class) public class JoinITCase extends MultipleProgramsTestBase { - public JoinITCase(TestExecutionMode mode) { super(mode); } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java index 5ef0235..993638d 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/PojoGroupingITCase.java @@ -26,11 +26,11 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.TableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.TableException; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import scala.NotImplementedError; @RunWith(Parameterized.class) public class PojoGroupingITCase extends MultipleProgramsTestBase { @@ -39,7 +39,7 @@ public class PojoGroupingITCase extends MultipleProgramsTestBase { super(mode); } - @Test(expected = NotImplementedError.class) + @Test(expected = TableException.class) public void testPojoGrouping() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java index a66219c..ada0e06 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/SelectITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.TableEnvironment; import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.table.test.TableProgramsTestBase; import org.apache.flink.test.javaApiOperators.util.CollectionDataSets; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.Test; @@ -34,17 +35,16 @@ import scala.NotImplementedError; import java.util.List; @RunWith(Parameterized.class) -public class SelectITCase extends MultipleProgramsTestBase { +public class SelectITCase extends TableProgramsTestBase { - - public SelectITCase(TestExecutionMode mode) { - super(mode); + public SelectITCase(TestExecutionMode mode, TableConfigMode configMode) { + super(mode, configMode); } @Test public void testSimpleSelectAllWithAs() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); @@ -66,10 +66,10 @@ public class SelectITCase extends MultipleProgramsTestBase { } - @Test(expected = NotImplementedError.class) + @Test public void testSimpleSelectWithNaming() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); @@ -90,7 +90,7 @@ public class SelectITCase extends MultipleProgramsTestBase { @Test(expected = IllegalArgumentException.class) public void testAsWithToFewFields() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); @@ -105,7 +105,7 @@ public class SelectITCase extends MultipleProgramsTestBase { @Test(expected = IllegalArgumentException.class) public void testAsWithToManyFields() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); @@ -120,7 +120,7 @@ public class SelectITCase extends MultipleProgramsTestBase { @Test(expected = IllegalArgumentException.class) public void testAsWithAmbiguousFields() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); @@ -135,7 +135,7 @@ public class SelectITCase extends MultipleProgramsTestBase { @Test(expected = IllegalArgumentException.class) public void testOnlyFieldRefInAs() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - TableEnvironment tableEnv = new TableEnvironment(); + TableEnvironment tableEnv = getJavaTableEnvironment(); DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env); http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java index c4b2f01..315fe9f 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/StringExpressionsITCase.java @@ -20,6 +20,7 @@ package org.apache.flink.api.java.table.test; import org.apache.flink.api.table.Row; import org.apache.flink.api.table.Table; +import org.apache.flink.api.table.codegen.CodeGenException; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.table.TableEnvironment; @@ -29,19 +30,17 @@ import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import scala.NotImplementedError; import java.util.List; @RunWith(Parameterized.class) public class StringExpressionsITCase extends MultipleProgramsTestBase { - public StringExpressionsITCase(TestExecutionMode mode) { super(mode); } - @Test(expected = NotImplementedError.class) + @Test(expected = CodeGenException.class) public void testSubstring() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); @@ -61,7 +60,7 @@ public class StringExpressionsITCase extends MultipleProgramsTestBase { compareResultAsText(results, expected); } - @Test(expected = NotImplementedError.class) + @Test(expected = CodeGenException.class) public void testSubstringWithMaxEnd() throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); TableEnvironment tableEnv = new TableEnvironment(); http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java index d6297d9..ec4cd1c 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/table/test/UnionITCase.java @@ -38,7 +38,6 @@ import java.util.List; @RunWith(Parameterized.class) public class UnionITCase extends MultipleProgramsTestBase { - public UnionITCase(TestExecutionMode mode) { super(mode); } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala index 4a37737..d6a853d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/CastingITCase.scala @@ -19,19 +19,17 @@ package org.apache.flink.api.scala.table.test import java.util.Date - import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized - import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.table.Row import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode - import scala.collection.JavaConverters._ +import org.apache.flink.api.table.codegen.CodeGenException @RunWith(classOf[Parameterized]) class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { @@ -49,6 +47,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo TestBaseUtils.compareResultAsText(results.asJava, expected) } + @Ignore // gives different types of exceptions for cluster and collection modes @Test(expected = classOf[NotImplementedError]) def testNumericAutoCastInArithmetic(): Unit = { @@ -63,7 +62,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test def testNumericAutoCastInComparison(): Unit = { // don't test everything, just some common cast directions @@ -79,7 +78,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test(expected = classOf[CodeGenException]) def testCastFromString: Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -106,7 +105,7 @@ class CastingITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mo TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test(expected = classOf[CodeGenException]) def testCastDateToStringAndLong { val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment val ds = env.fromElements(("2011-05-03 15:51:36.000", "1304437896000")) http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala index 9f20043..f300547 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/ExpressionsITCase.scala @@ -21,12 +21,14 @@ package org.apache.flink.api.scala.table.test import java.util.Date import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.api.table.expressions.Literal -import org.apache.flink.api.table.Row import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ -import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.expressions.Literal +import org.apache.flink.api.table.test.TableProgramsTestBase +import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -34,9 +36,12 @@ import org.junit.runners.Parameterized import scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) -class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { +class ExpressionsITCase( + mode: TestExecutionMode, + config: TableConfigMode) + extends TableProgramsTestBase(mode, config) { - @Test(expected = classOf[NotImplementedError]) + @Test def testArithmetic(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -44,11 +49,11 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas .select('a - 5, 'a + 5, 'a / 2, 'a * 2, 'a % 2, -'a) val expected = "0,10,2,10,1,-5" - val results = t.toDataSet[Row].collect() + val results = t.toDataSet[Row](getConfig).collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test def testLogic(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -56,11 +61,11 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas .select('b && true, 'b && false, 'b || false, !'b) val expected = "true,false,true,false" - val results = t.toDataSet[Row].collect() + val results = t.toDataSet[Row](getConfig).collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test def testComparisons(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -68,11 +73,13 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas .select('a > 'c, 'a >= 'b, 'a < 'c, 'a.isNull, 'a.isNotNull) val expected = "true,true,false,false,true" - val results = t.toDataSet[Row].collect() + val results = t.toDataSet[Row](getConfig).collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + // advanced functions not supported yet + @Ignore + @Test def testCaseInsensitiveForAs(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -81,7 +88,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas .groupBy("a").select("a, a.count As cnt") val expected = "3,1" - val results = t.toDataSet[Row].collect() + val results = t.toDataSet[Row](getConfig).collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -97,7 +104,7 @@ class ExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBas 'a.cast(BasicTypeInfo.DATE_TYPE_INFO).cast(BasicTypeInfo.STRING_TYPE_INFO)) val expected = "0,1970-01-01 00:00:00.000,1970-01-01 00:00:00.000" - val results = t.toDataSet[Row].collect() + val results = t.toDataSet[Row](getConfig).collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala index 7b1c5de..96b9341 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala @@ -18,12 +18,15 @@ package org.apache.flink.api.scala.table.test -import org.apache.flink.api.table.expressions.Literal import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.expressions.Literal +import org.apache.flink.api.table.test.TableProgramsTestBase +import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -32,7 +35,10 @@ import scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) -class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { +class FilterITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { @Test def testAllRejectingFilter(): Unit = { @@ -44,9 +50,9 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod val filterDs = ds.filter( Literal(false) ) -// val expected = "\n" -// val results = filterDs.collect() -// TestBaseUtils.compareResultAsText(results.asJava, expected) + val expected = "\n" + val results = filterDs.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) } @Test @@ -57,17 +63,19 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod val env = ExecutionEnvironment.getExecutionEnvironment val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) -// val filterDs = ds.filter( Literal(true) ) -// val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + -// "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + -// "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + -// "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + -// "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + -// "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" -// val results = filterDs.collect() -// TestBaseUtils.compareResultAsText(results.asJava, expected) + val filterDs = ds.filter( Literal(true) ) + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val results = filterDs.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) } + // TODO test broken does not test Table API + @Ignore @Test def testFilterOnStringTupleField(): Unit = { /* @@ -78,7 +86,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod val filterDs = ds.filter( _._3.contains("world") ) // val expected = "(3,2,Hello world)\n" + "(4,3,Hello world, how are you?)\n" -// val results = filterDs.collect() +// val results = filterDs.toDataSet[Row](getConfig).collect() // TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -92,12 +100,12 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod val filterDs = ds.filter( 'a % 2 === 0 ) -// val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + -// "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" + -// "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + -// "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n" -// val results = filterDs.collect() -// TestBaseUtils.compareResultAsText(results.asJava, expected) + val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + + "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" + + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + + "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n" + val results = filterDs.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) } @Test @@ -118,7 +126,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod } // These two not yet done, but are planned - + // TODO test broken does not test Table API @Ignore @Test def testFilterBasicType(): Unit = { @@ -132,10 +140,11 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod val filterDs = ds.filter( _.startsWith("H") ) // val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n" -// val results = filterDs.collect() +// val results = filterDs.toDataSet[Row](getConfig).collect() // TestBaseUtils.compareResultAsText(results.asJava, expected) } + // TODO test broken does not test Table API @Ignore @Test def testFilterOnCustomType(): Unit = { @@ -147,7 +156,7 @@ class FilterITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod val filterDs = ds.filter( _.myString.contains("a") ) // val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n" -// val results = filterDs.collect() +// val results = filterDs.toDataSet[Row](getConfig).collect() // TestBaseUtils.compareResultAsText(results.asJava, expected) } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala index 3700d67..e181e0b 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/SelectITCase.scala @@ -18,12 +18,14 @@ package org.apache.flink.api.scala.table.test -import org.apache.flink.api.table.{Row, ExpressionException} import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.test.util.{TestBaseUtils, MultipleProgramsTestBase} +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.test.TableProgramsTestBase +import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized @@ -31,7 +33,10 @@ import org.junit.runners.Parameterized import scala.collection.JavaConverters._ @RunWith(classOf[Parameterized]) -class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { +class SelectITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { @Test def testSimpleSelectAll(): Unit = { @@ -45,7 +50,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - val results = t.toDataSet[Row].collect() + val results = t.toDataSet[Row](getConfig).collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -61,11 +66,11 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" - val results = t.toDataSet[Row].collect() + val results = t.toDataSet[Row](getConfig).collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test def testSimpleSelectWithNaming(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment @@ -76,7 +81,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n" - val results = t.toDataSet[Row].collect() + val results = t.toDataSet[Row](getConfig).collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -87,7 +92,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b) val expected = "no" - val results = t.toDataSet[Row].collect() + val results = t.toDataSet[Row](getConfig).collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -98,7 +103,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c, 'd) val expected = "no" - val results = t.toDataSet[Row].collect() + val results = t.toDataSet[Row](getConfig).collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -109,7 +114,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'b) val expected = "no" - val results = t.toDataSet[Row].collect() + val results = t.toDataSet[Row](getConfig).collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } @@ -121,7 +126,7 @@ class SelectITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mod val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b as 'c, 'd) val expected = "no" - val results = t.toDataSet[Row].collect() + val results = t.toDataSet[Row](getConfig).collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala index 565f444..f2e9aca 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/StringExpressionsITCase.scala @@ -26,13 +26,13 @@ import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized - import scala.collection.JavaConverters._ +import org.apache.flink.api.table.codegen.CodeGenException @RunWith(classOf[Parameterized]) class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { - @Test(expected = classOf[NotImplementedError]) + @Test(expected = classOf[CodeGenException]) def testSubstring(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val t = env.fromElements(("AAAA", 2), ("BBBB", 1)).as('a, 'b) @@ -43,7 +43,7 @@ class StringExpressionsITCase(mode: TestExecutionMode) extends MultipleProgramsT TestBaseUtils.compareResultAsText(results.asJava, expected) } - @Test(expected = classOf[NotImplementedError]) + @Test(expected = classOf[CodeGenException]) def testSubstringWithMaxEnd(): Unit = { val env = ExecutionEnvironment.getExecutionEnvironment val t = env.fromElements(("ABCD", 2), ("ABCD", 1)).as('a, 'b) http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala new file mode 100644 index 0000000..0962f4e --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/TableProgramsTestBase.scala @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.test + +import java.util + +import org.apache.flink.api.java.table.{TableEnvironment => JavaTableEnv} +import org.apache.flink.api.java.{ExecutionEnvironment => JavaEnv} +import org.apache.flink.api.scala.table.{TableEnvironment => ScalaTableEnv} +import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaEnv} +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode +import org.apache.flink.api.table.test.TableProgramsTestBase.TableConfigMode.{EFFICIENT, NULL} +import org.apache.flink.test.util.MultipleProgramsTestBase +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.junit.runners.Parameterized + +import scala.collection.JavaConversions._ + +class TableProgramsTestBase( + mode: TestExecutionMode, + tableConfigMode: TableConfigMode) + extends MultipleProgramsTestBase(mode) { + + def getJavaTableEnvironment: JavaTableEnv = { + val env = JavaEnv.getExecutionEnvironment // TODO pass it to tableEnv + val tableEnv = new JavaTableEnv + configure(tableEnv.getConfig) + tableEnv + } + + def getScalaTableEnvironment: ScalaTableEnv = { + val env = ScalaEnv.getExecutionEnvironment // TODO pass it to tableEnv + val tableEnv = new ScalaTableEnv + configure(tableEnv.getConfig) + tableEnv + } + + def getConfig: TableConfig = { + val config = new TableConfig() + configure(config) + config + } + + def configure(config: TableConfig): Unit = { + tableConfigMode match { + case NULL => + config.setNullCheck(true) + case EFFICIENT => + config.setEfficientTypeUsage(true) + case _ => // keep default + } + } + +} + +object TableProgramsTestBase { + sealed trait TableConfigMode { def nullCheck: Boolean; def efficientTypes: Boolean } + object TableConfigMode { + case object DEFAULT extends TableConfigMode { + val nullCheck = false; val efficientTypes = false + } + case object NULL extends TableConfigMode { + val nullCheck = true; val efficientTypes = false + } + case object EFFICIENT extends TableConfigMode { + val nullCheck = false; val efficientTypes = true + } + } + + @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}") + def tableConfigs(): util.Collection[Array[java.lang.Object]] = { + Seq( + // TODO more tests in cluster mode? + Array[AnyRef](TestExecutionMode.CLUSTER, TableConfigMode.DEFAULT), + Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.DEFAULT), + Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.NULL), + Array[AnyRef](TestExecutionMode.COLLECTION, TableConfigMode.EFFICIENT) + ) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala deleted file mode 100644 index ef616a9..0000000 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfoTest.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.table.typeinfo - -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.util.TestLogger -import org.junit.Test -import org.scalatest.junit.JUnitSuiteLike - -class RenamingProxyTypeInfoTest extends TestLogger with JUnitSuiteLike { - - @Test - def testRenamingProxyTypeEquality(): Unit = { - val pojoTypeInfo1 = TypeExtractor.createTypeInfo(classOf[TestPojo]) - .asInstanceOf[CompositeType[TestPojo]] - - val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo]( - pojoTypeInfo1, - Array("someInt", "aString", "doubleArray")) - - val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo]( - pojoTypeInfo1, - Array("someInt", "aString", "doubleArray")) - - assert(tpeInfo1.equals(tpeInfo2)) - assert(tpeInfo1.hashCode() == tpeInfo2.hashCode()) - } - - @Test - def testRenamingProxyTypeInequality(): Unit = { - val pojoTypeInfo1 = TypeExtractor.createTypeInfo(classOf[TestPojo]) - .asInstanceOf[CompositeType[TestPojo]] - - val tpeInfo1 = new RenamingProxyTypeInfo[TestPojo]( - pojoTypeInfo1, - Array("someInt", "aString", "doubleArray")) - - val tpeInfo2 = new RenamingProxyTypeInfo[TestPojo]( - pojoTypeInfo1, - Array("foobar", "aString", "doubleArray")) - - assert(!tpeInfo1.equals(tpeInfo2)) - } -} - -final class TestPojo { - var someInt: Int = 0 - private var aString: String = null - var doubleArray: Array[Double] = null - - def setaString(aString: String) { - this.aString = aString - } - - def getaString: String = { - return aString - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowComparatorTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowComparatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowComparatorTest.scala index a58d0b7..11d1a8a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowComparatorTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowComparatorTest.scala @@ -38,8 +38,7 @@ class RowComparatorTest extends ComparatorTestBase[Row] { BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO, BasicTypeInfo.SHORT_TYPE_INFO), - TypeExtractor.createTypeInfo(classOf[MyPojo])), - Array("f0", "f1", "f2", "f3", "f4")) + TypeExtractor.createTypeInfo(classOf[MyPojo]))) val testPojo1 = new MyPojo() // TODO we cannot test null here as PojoComparator has no support for null keys http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala index 60a02ae..fc000fd 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala @@ -51,7 +51,7 @@ class RowSerializerTest { @Test def testRowSerializer(): Unit = { val rowInfo: TypeInformation[Row] = new RowTypeInfo( - Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Seq("id", "name")) + Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)) val row1 = new Row(2) row1.setField(0, 1) @@ -85,20 +85,7 @@ class RowSerializerTest { BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, - BasicTypeInfo.STRING_TYPE_INFO), Seq( - "id0", - "id1", - "id2", - "id3", - "id4", - "id5", - "id6", - "id7", - "id8", - "id9", - "id10", - "id11", - "name")) + BasicTypeInfo.STRING_TYPE_INFO)) val row = new Row(13) row.setField(0, 2) @@ -134,8 +121,7 @@ class RowSerializerTest { BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO, BasicTypeInfo.SHORT_TYPE_INFO), - TypeExtractor.createTypeInfo(classOf[MyPojo])), - Array("f0", "f1", "f2", "f3", "f4")) + TypeExtractor.createTypeInfo(classOf[MyPojo]))) val testPojo1 = new MyPojo() testPojo1.name = null