Repository: flink Updated Branches: refs/heads/master b368cb2d5 -> ed1e52a10
[FLINK-3640] Add support for SQL in DataSet programs - add EnumerableToLogicalScan rule - in order to be able to mix TableAPI and SQL, we need our own copy of PlannerImpl - create a dummy RelNode in the reset() method, in order to retrieve the RelOptPlanner This closes #1862 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed1e52a1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed1e52a1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed1e52a1 Branch: refs/heads/master Commit: ed1e52a106e9ce3789fab975e34715de2d923bec Parents: b368cb2 Author: vasia <va...@apache.org> Authored: Wed Mar 23 19:35:06 2016 +0100 Committer: vasia <va...@apache.org> Committed: Mon Apr 11 12:27:35 2016 +0200 ---------------------------------------------------------------------- .../api/java/table/JavaBatchTranslator.scala | 30 ++- .../api/table/AbstractTableEnvironment.scala | 15 ++ .../api/table/FlinkCalciteSqlValidator.scala | 47 ++++ .../flink/api/table/FlinkPlannerImpl.scala | 180 +++++++++++++ .../flink/api/table/plan/PlanTranslator.scala | 2 - .../api/table/plan/TranslationContext.scala | 24 +- .../rules/EnumerableToLogicalTableScan.scala | 50 ++++ .../api/table/plan/rules/FlinkRuleSets.scala | 1 + .../api/scala/sql/test/AggregationsITCase.scala | 224 ++++++++++++++++ .../flink/api/scala/sql/test/FilterITCase.scala | 166 ++++++++++++ .../flink/api/scala/sql/test/JoinITCase.scala | 259 +++++++++++++++++++ .../flink/api/scala/sql/test/SelectITCase.scala | 155 +++++++++++ .../api/scala/sql/test/TableWithSQLITCase.scala | 103 ++++++++ .../flink/api/scala/sql/test/UnionITCase.scala | 109 ++++++++ .../api/scala/table/test/FilterITCase.scala | 1 - 15 files changed, 1348 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala index 028711b..4688c82 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/JavaBatchTranslator.scala @@ -19,14 +19,14 @@ package org.apache.flink.api.java.table import org.apache.calcite.plan.RelOptPlanner.CannotPlanException -import org.apache.calcite.plan.{RelTraitSet, RelOptUtil} -import org.apache.calcite.rel.{RelCollations, RelNode} +import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rel.RelNode import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.Programs import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.{DataSet => JavaDataSet} import org.apache.flink.api.table.plan._ -import org.apache.flink.api.table.{TableConfig, Table} +import org.apache.flink.api.table.{FlinkPlannerImpl, TableConfig, Table} import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetRel} import org.apache.flink.api.table.plan.rules.FlinkRuleSets import org.apache.flink.api.table.plan.schema.DataSetTable @@ -56,7 +56,8 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator { // create table scan operator relBuilder.scan(tabName) - new Table(relBuilder.build(), relBuilder) + val relNode = relBuilder.build() + new Table(relNode, relBuilder) } override def translate[A](lPlan: RelNode)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = { @@ -69,9 +70,7 @@ class 
JavaBatchTranslator(config: TableConfig) extends PlanTranslator { // optimize the logical Flink plan val optProgram = Programs.ofRules(FlinkRuleSets.DATASET_OPT_RULES) - val flinkOutputProps = RelTraitSet.createEmpty() - .plus(DataSetConvention.INSTANCE) - .plus(RelCollations.of()).simplify() + val flinkOutputProps = lPlan.getTraitSet.replace(DataSetConvention.INSTANCE).simplify() val dataSetPlan = try { optProgram.run(planner, decorPlan, flinkOutputProps) @@ -97,4 +96,21 @@ class JavaBatchTranslator(config: TableConfig) extends PlanTranslator { } + /** + * Parse, validate, and translate a SQL query into a relNode Table + */ + def translateSQL(query: String): Table = { + + val frameworkConfig = TranslationContext.getFrameworkConfig + val planner = new FlinkPlannerImpl(frameworkConfig, TranslationContext.getPlanner) + // parse the sql query + val parsed = planner.parse(query) + // validate the sql query + val validated = planner.validate(parsed) + // transform to a relational tree + val relational = planner.rel(validated) + + new Table(relational.rel, TranslationContext.getRelBuilder) + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala index 4dedc47..a0f162e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/AbstractTableEnvironment.scala @@ -19,6 +19,7 @@ package org.apache.flink.api.table import org.apache.flink.api.java.DataSet +import org.apache.flink.api.java.table.JavaBatchTranslator import 
org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.table.plan.TranslationContext import org.apache.flink.api.table.plan.schema.{DataSetTable, TableTable} @@ -83,4 +84,18 @@ class AbstractTableEnvironment { ) TranslationContext.registerTable(dataSetTable, name) } + + /** + * Execute a SQL query and retrieve the result as a [[Table]]. + * All input [[Table]]s have to be registered in the + * [[org.apache.flink.api.java.table.TableEnvironment]] with unique names, + * using [[registerTable()]] or + * [[org.apache.flink.api.java.table.TableEnvironment.registerDataSet()]] + * + * @param query the SQL query + * @return the result of the SQL query as a [[Table]] + */ + def sql(query: String): Table = { + new JavaBatchTranslator(config).translateSQL(query) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala new file mode 100644 index 0000000..b1ccc09 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkCalciteSqlValidator.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +import org.apache.calcite.adapter.java.JavaTypeFactory +import org.apache.calcite.prepare.CalciteCatalogReader +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.sql.{SqlInsert, SqlOperatorTable} +import org.apache.calcite.sql.validate.{SqlValidatorImpl, SqlConformance} + +/** + * This is a copy of Calcite's CalciteSqlValidator to use with [[FlinkPlannerImpl]]. + */ +class FlinkCalciteSqlValidator( + opTab: SqlOperatorTable, + catalogReader: CalciteCatalogReader, + typeFactory: JavaTypeFactory) extends SqlValidatorImpl( + opTab, catalogReader, typeFactory, SqlConformance.DEFAULT) { + + override def getLogicalSourceRowType( + sourceRowType: RelDataType, + insert: SqlInsert): RelDataType = { + typeFactory.asInstanceOf[JavaTypeFactory].toSql(sourceRowType) + } + + override def getLogicalTargetRowType( + targetRowType: RelDataType, + insert: SqlInsert): RelDataType = { + typeFactory.asInstanceOf[JavaTypeFactory].toSql(targetRowType) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala new file mode 100644 index 0000000..5a1b3fe --- /dev/null +++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table + +import java.util + +import com.google.common.collect.ImmutableList +import org.apache.calcite.adapter.java.JavaTypeFactory +import org.apache.calcite.jdbc.CalciteSchema +import org.apache.calcite.plan.RelOptTable.ViewExpander +import org.apache.calcite.plan._ +import org.apache.calcite.prepare.CalciteCatalogReader +import org.apache.calcite.rel.RelRoot +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rex.RexBuilder +import org.apache.calcite.schema.SchemaPlus +import org.apache.calcite.sql.parser.{SqlParser, SqlParseException} +import org.apache.calcite.sql.validate.SqlValidator +import org.apache.calcite.sql.{SqlNode, SqlOperatorTable} +import org.apache.calcite.sql2rel.{RelDecorrelator, SqlToRelConverter, SqlRexConvertletTable} +import org.apache.calcite.tools.{RelConversionException, ValidationException, Frameworks, FrameworkConfig} +import org.apache.calcite.util.Util +import scala.collection.JavaConversions._ +/** NOTE: this is heavily inspired by Calcite's PlannerImpl. 
+ We need it in order to share the planner between the Table API relational plans + and the SQL relational plans that are created by the Calcite parser. + The only difference is that we initialize the RelOptPlanner planner + when instantiating, instead of creating a new one in the ready() method. **/ +class FlinkPlannerImpl(config: FrameworkConfig, var planner: RelOptPlanner) { + + val operatorTable: SqlOperatorTable = config.getOperatorTable + /** Holds the trait definitions to be registered with planner. May be null. */ + val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs + val parserConfig: SqlParser.Config = config.getParserConfig + val convertletTable: SqlRexConvertletTable = config.getConvertletTable + var defaultSchema: SchemaPlus = config.getDefaultSchema + + var typeFactory: JavaTypeFactory = null + var validator: FlinkCalciteSqlValidator = null + var validatedSqlNode: SqlNode = null + var root: RelRoot = null + + private def ready() { + Frameworks.withPlanner(new Frameworks.PlannerAction[Unit] { + def apply( + cluster: RelOptCluster, + relOptSchema: RelOptSchema, + rootSchema: SchemaPlus): Unit = { + + Util.discard(rootSchema) + typeFactory = cluster.getTypeFactory.asInstanceOf[JavaTypeFactory] + if (planner == null) { + planner = cluster.getPlanner + } + } + }, config) + if (this.traitDefs != null) { + planner.clearRelTraitDefs() + for (traitDef <- this.traitDefs) { + planner.addRelTraitDef(traitDef) + } + } + } + + @throws(classOf[SqlParseException]) + def parse(sql: String): SqlNode = { + ready() + val parser: SqlParser = SqlParser.create(sql, parserConfig) + val sqlNode: SqlNode = parser.parseStmt + sqlNode + } + + @throws(classOf[ValidationException]) + def validate(sqlNode: SqlNode): SqlNode = { + validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory) + validator.setIdentifierExpansion(true) + try { + validatedSqlNode = validator.validate(sqlNode) + } + catch { + case e: RuntimeException 
=> { + throw new ValidationException(e) + } + } + validatedSqlNode + } + + @throws(classOf[RelConversionException]) + def rel(sql: SqlNode): RelRoot = { + assert(validatedSqlNode != null) + val rexBuilder: RexBuilder = createRexBuilder + val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder) + val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter( + new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable) + sqlToRelConverter.setTrimUnusedFields(false) + sqlToRelConverter.enableTableAccessConversion(false) + root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true) + root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)) + root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel)) + root + } + + /** Implements [[org.apache.calcite.plan.RelOptTable.ViewExpander]] + * interface for [[org.apache.calcite.tools.Planner]]. */ + class ViewExpanderImpl extends ViewExpander { + + override def expandView( + rowType: RelDataType, + queryString: String, + schemaPath: util.List[String]): RelRoot = { + + val parser: SqlParser = SqlParser.create(queryString, parserConfig) + var sqlNode: SqlNode = null + try { + sqlNode = parser.parseQuery + } + catch { + case e: SqlParseException => + throw new RuntimeException("parse failed", e) + } + val catalogReader: CalciteCatalogReader = createCatalogReader.withSchemaPath(schemaPath) + val validator: SqlValidator = + new FlinkCalciteSqlValidator(operatorTable, catalogReader, typeFactory) + validator.setIdentifierExpansion(true) + val validatedSqlNode: SqlNode = validator.validate(sqlNode) + val rexBuilder: RexBuilder = createRexBuilder + val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder) + val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter( + new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable) + sqlToRelConverter.setTrimUnusedFields(false) + sqlToRelConverter.enableTableAccessConversion(false) + root = 
sqlToRelConverter.convertQuery(validatedSqlNode, true, false) + root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)) + root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel)) + FlinkPlannerImpl.this.root + } + } + + private def createCatalogReader: CalciteCatalogReader = { + val rootSchema: SchemaPlus = FlinkPlannerImpl.rootSchema(defaultSchema) + new CalciteCatalogReader( + CalciteSchema.from(rootSchema), + parserConfig.caseSensitive, + CalciteSchema.from(defaultSchema).path(null), + typeFactory) + } + + private def createRexBuilder: RexBuilder = { + new RexBuilder(typeFactory) + } + +} + +object FlinkPlannerImpl { + private def rootSchema(schema: SchemaPlus): SchemaPlus = { + if (schema.getParentSchema == null) { + schema + } + else { + rootSchema(schema.getParentSchema) + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala index 410c570..2a82dc3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala @@ -73,8 +73,6 @@ abstract class PlanTranslator { */ def createTable[A](repr: Representation[A], exprs: Array[Expression]): Table = { - val inputType = repr.getType() - val (fieldNames, fieldIndexes) = TranslationContext.getFieldInfo(repr.getType(), exprs) createTable(repr, fieldIndexes.toArray, fieldNames.toArray) } http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala index cd5e2b0..c7d9f18 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala @@ -20,19 +20,20 @@ package org.apache.flink.api.table.plan import java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.config.Lex -import org.apache.calcite.plan.ConventionTraitDef +import org.apache.calcite.plan.RelOptPlanner import org.apache.calcite.schema.impl.AbstractTable import org.apache.calcite.schema.SchemaPlus import org.apache.calcite.sql.parser.SqlParser -import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder} +import org.apache.calcite.tools.{Programs, FrameworkConfig, Frameworks, RelBuilder} import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.table.TableException import org.apache.flink.api.table.expressions.{Naming, UnresolvedFieldReference, Expression} import org.apache.flink.api.table.plan.cost.DataSetCostFactory -import org.apache.flink.api.table.plan.schema.DataSetTable import org.apache.flink.api.table.plan.schema.DataStreamTable +import org.apache.flink.api.table.plan.rules.FlinkRuleSets +import org.apache.flink.api.table.plan.schema.DataSetTable object TranslationContext { @@ -41,6 +42,7 @@ object TranslationContext { private var tables: SchemaPlus = null private var tablesRegistry: Map[String, AbstractTable] = null private val nameCntr: AtomicInteger = new AtomicInteger(0) + private var relOptPlanner: 
RelOptPlanner = null reset() @@ -53,7 +55,10 @@ object TranslationContext { // configure sql parser // we use Java lex because back ticks are easier than double quotes in programming // and cases are preserved - val parserConfig = SqlParser.configBuilder().setLex(Lex.JAVA).build() + val parserConfig = SqlParser + .configBuilder() + .setLex(Lex.JAVA) + .build() // initialize RelBuilder frameworkConfig = Frameworks @@ -61,13 +66,14 @@ object TranslationContext { .defaultSchema(tables) .parserConfig(parserConfig) .costFactory(new DataSetCostFactory) - .traitDefs(ConventionTraitDef.INSTANCE) .build tablesRegistry = Map[String, AbstractTable]() relBuilder = RelBuilder.create(frameworkConfig) + // create a dummy RelNode, in order to retrieve the planner + val dummyRelNode = relBuilder.values(Array("dummy"), new Integer(1)).build() + relOptPlanner = dummyRelNode.getCluster.getPlanner nameCntr.set(0) - } /** @@ -133,6 +139,10 @@ object TranslationContext { relBuilder } + def getPlanner: RelOptPlanner = { + relOptPlanner + } + def getFrameworkConfig: FrameworkConfig = { frameworkConfig } @@ -207,5 +217,3 @@ object TranslationContext { (fieldNames.toArray, fieldIndexes.toArray) } } - - http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala new file mode 100644 index 0000000..02d2159 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/EnumerableToLogicalTableScan.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.plan.rules + + +import org.apache.calcite.plan.RelOptRule.{any, operand} +import org.apache.calcite.adapter.enumerable.EnumerableTableScan +import org.apache.calcite.plan.{RelOptRuleCall, RelOptRule, RelOptRuleOperand} +import org.apache.calcite.rel.logical.LogicalTableScan + +/** + * Rule that converts an EnumerableTableScan into a LogicalTableScan. + * We need this rule because Calcite creates an EnumerableTableScan + * when parsing a SQL query. We convert it into a LogicalTableScan + * so we can merge the optimization process with any plan that might be created + * by the Table API. 
+ */ +class EnumerableToLogicalTableScan( + operand: RelOptRuleOperand, + description: String) extends RelOptRule(operand, description) { + + override def onMatch(call: RelOptRuleCall): Unit = { + val oldRel = call.rel(0).asInstanceOf[EnumerableTableScan] + val table = oldRel.getTable + val newRel = LogicalTableScan.create(oldRel.getCluster, table) + call.transformTo(newRel) + } +} + +object EnumerableToLogicalTableScan { + val INSTANCE = new EnumerableToLogicalTableScan( + operand(classOf[EnumerableTableScan], any), + "EnumerableToLogicalTableScan") +} http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala index 0324a0e..427530b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala @@ -34,6 +34,7 @@ object FlinkRuleSets { // convert a logical table scan to a relational expression TableScanRule.INSTANCE, + EnumerableToLogicalTableScan.INSTANCE, // push a filter into a join FilterJoinRule.FILTER_ON_JOIN, http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala new file mode 100644 index 0000000..d577564 --- /dev/null +++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/AggregationsITCase.scala @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.sql.test + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.plan.TranslationContext +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class AggregationsITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testAggregationTypes(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val 
sqlQuery = "SELECT sum(_1), min(_1), max(_1), count(_1), avg(_1) FROM MyTable" + + val ds = CollectionDataSets.get3TupleDataSet(env) + tEnv.registerDataSet("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "231,1,21,21,11" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testTableAggregation(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT sum(_1) FROM MyTable" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "231" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testDataSetAggregation(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT sum(_1) FROM MyTable" + + val ds = CollectionDataSets.get3TupleDataSet(env) + tEnv.registerDataSet("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "231" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testWorkingAggregationDataTypes(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT avg(_1), avg(_2), avg(_3), avg(_4), avg(_5), avg(_6), count(_7)" + + "FROM MyTable" + + val ds = env.fromElements( + (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"), + (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")) + tEnv.registerDataSet("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,1,1,1,1.5,1.5,2" + val results = result.toDataSet[Row](getConfig).collect() + 
TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testTableWorkingAggregationDataTypes(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT avg(a), avg(b), avg(c), avg(d), avg(e), avg(f), count(g)" + + "FROM MyTable" + + val ds = env.fromElements( + (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"), + (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).as('a, 'b, 'c, 'd, 'e, 'f, 'g) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,1,1,1,1.5,1.5,2" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testTableProjection(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT avg(a), sum(a), count(a), avg(b), sum(b) " + + "FROM MyTable" + + val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).as('a, 'b) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,3,2,1,3" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testTableAggregationWithArithmetic(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT avg(a + 2) + 2, count(b) + 5 " + + "FROM MyTable" + + val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).as('a, 'b) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "5.5,7" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testAggregationWithTwoCount(): Unit = { + + val env = 
ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT count(_1), count(_2) FROM MyTable" + + val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "2,2" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + + @Test + def testAggregationAfterProjection(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM " + + "(SELECT _1 as a, _2 as b, _3 as c FROM MyTable)" + + val ds = env.fromElements( + (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"), + (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,3,2" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala new file mode 100644 index 0000000..c89e25a --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/FilterITCase.scala @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.sql.test + +import org.apache.calcite.tools.ValidationException +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.plan.TranslationContext +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class FilterITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testAllRejectingFilter(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT * FROM MyTable WHERE false" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "\n" + val results = result.toDataSet[Row](getConfig).collect() + 
TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testAllPassingFilter(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT * FROM MyTable WHERE true" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " + + "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," + + "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," + + "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," + + "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," + + "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testFilterOnString(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT * FROM MyTable WHERE c LIKE '%world%'" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testFilterOnInteger(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)=0" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c) + tEnv.registerTable("MyTable", ds) + + val 
result = tEnv.sql(sqlQuery) + + val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + + "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" + + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," + + "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testDisjunctivePredicate(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,1,Hi\n" + "21,6,Comment#15\n" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testFilterWithAnd(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" + + "9,4,Comment#3\n" + "17,6,Comment#11\n" + + "19,6,Comment#13\n" + "21,6,Comment#15\n" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala new file mode 100644 index 0000000..74844ae --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/JoinITCase.scala @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.sql.test + +import org.apache.calcite.tools.ValidationException +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.api.table.{TableException, Row} +import org.apache.flink.api.table.plan.TranslationContext +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class JoinITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testJoin(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + val result = tEnv.sql(sqlQuery) + + val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testJoinWithFilter(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND b < 2" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable.as('a, 'b, 'c) + val ds2 = 
CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + val result = tEnv.sql(sqlQuery) + + val expected = "Hi,Hallo\n" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testJoinWithJoinFilter(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b" + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + val result = tEnv.sql(sqlQuery) + + val expected = "Hello world, how are you?,Hallo Welt wie\n" + + "I am fine.,Hallo Welt wie\n" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testJoinWithMultipleKeys(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d AND b = h" + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + val result = tEnv.sql(sqlQuery) + + val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" + + "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ValidationException]) + def testJoinNonExistingKey(): Unit = { + + 
val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE foo = e" + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + tEnv.sql(sqlQuery) + } + + @Test(expected = classOf[TableException]) + def testJoinNonMatchingKeyTypes(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = g" + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + tEnv.sql(sqlQuery).toDataSet[Row].collect() + } + + @Test(expected = classOf[ValidationException]) + def testJoinWithAmbiguousFields(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE a = d" + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'c) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + tEnv.sql(sqlQuery) + } + + @Test + def testJoinWithAlias(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT Table5.c, Table3.c FROM Table3, Table5 WHERE a = d AND a < 4" + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'c) + 
tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + val result = tEnv.sql(sqlQuery) + val expected = "1,Hi\n" + "2,Hello\n" + "1,Hello\n" + + "2,Hello world\n" + "2,Hello world\n" + "3,Hello world\n" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[TableException]) + def testJoinNoEqualityPredicate(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE d = f" + + val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable.as('d, 'e, 'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + tEnv.sql(sqlQuery).toDataSet[Row](getConfig).collect() + } + + @Test + def testDataSetJoinWithAggregation(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT COUNT(g), COUNT(b) FROM Table3, Table5 WHERE a = d" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = CollectionDataSets.get5TupleDataSet(env) + tEnv.registerDataSet("Table3", ds1, 'a, 'b, 'c) + tEnv.registerDataSet("Table5", ds2, 'd, 'e, 'f, 'g, 'h) + + val result = tEnv.sql(sqlQuery) + + val expected = "6,6" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testTableJoinWithAggregation(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT COUNT(b), COUNT(g) FROM Table3, Table5 WHERE a = d" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).as('a, 'b, 'c) + val ds2 = CollectionDataSets.get5TupleDataSet(env).as('d, 'e, 
'f, 'g, 'h) + tEnv.registerTable("Table3", ds1) + tEnv.registerTable("Table5", ds2) + + val result = tEnv.sql(sqlQuery) + + val expected = "6,6" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala new file mode 100644 index 0000000..f08c95c --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/SelectITCase.scala @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.scala.sql.test + +import org.apache.calcite.tools.ValidationException +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.plan.TranslationContext +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SelectITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testSelectStarFromTable(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT * FROM MyTable" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testSelectStarFromDataSet(): Unit = { + + val env = 
ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT * FROM MyTable" + + val ds = CollectionDataSets.get3TupleDataSet(env) + tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testSimpleSelectAll(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT a, b, c FROM MyTable" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c) + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + + "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + + "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + + "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + + "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + + "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n" + + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testSelectWithNaming(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = 
getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT _1 as a, _2 as b FROM MyTable" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable + tEnv.registerTable("MyTable", ds) + + val result = tEnv.sql(sqlQuery) + + val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" + + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" + + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n" + + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test(expected = classOf[ValidationException]) + def testInvalidFields(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT a, foo FROM MyTable" + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable.as('a, 'b, 'c) + tEnv.registerTable("MyTable", ds) + + tEnv.sql(sqlQuery) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala new file mode 100644 index 0000000..153a9d0 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/TableWithSQLITCase.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.sql.test + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.plan.TranslationContext +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class TableWithSQLITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testSQLTable(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val ds = CollectionDataSets.get3TupleDataSet(env) + tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c) + + val sqlQuery = "SELECT * FROM MyTable WHERE a > 9" + + val result = tEnv.sql(sqlQuery).select('a.avg, 'b.sum, 'c.count) + + val expected = "15,65,12" + val results = result.toDataSet[Row](getConfig).collect() + 
TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testTableSQLTable(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val ds = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + val t1 = ds.filter('a > 9) + + tEnv.registerTable("MyTable", t1) + + val sqlQuery = "SELECT avg(a) as a1, sum(b) as b1, count(c) as c1 FROM MyTable" + + val result = tEnv.sql(sqlQuery).select('a1 + 1, 'b1 - 5, 'c1) + + val expected = "16,60,12" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testMultipleSQLQueries(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val t = CollectionDataSets.get3TupleDataSet(env).as('a, 'b, 'c) + tEnv.registerTable("MyTable", t) + + val sqlQuery = "SELECT a as aa FROM MyTable WHERE b = 6" + val result1 = tEnv.sql(sqlQuery) + tEnv.registerTable("ResTable", result1) + + val sqlQuery2 = "SELECT count(aa) FROM ResTable" + val result2 = tEnv.sql(sqlQuery2) + + val expected = "6" + val results = result2.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala new file mode 100644 index 0000000..4a031a3 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/sql/test/UnionITCase.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.sql.test + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.plan.TranslationContext +import org.apache.flink.api.table.test.utils.TableProgramsTestBase +import org.apache.flink.api.table.test.utils.TableProgramsTestBase.TableConfigMode +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.TestBaseUtils +import org.junit._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class UnionITCase( + mode: TestExecutionMode, + configMode: TableConfigMode) + extends TableProgramsTestBase(mode, configMode) { + + @Test + def testUnion(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT c FROM t1 UNION ALL (SELECT c FROM t2)" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = CollectionDataSets.getSmall3TupleDataSet(env) + 
tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) + tEnv.registerDataSet("t2", ds2, 'a, 'b, 'c) + + val result = tEnv.sql(sqlQuery) + + val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n" + val results = result.toDataSet[Row](getConfig).collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + //TODO: activate for EFFICIENT mode + @Test + def testUnionWithFilter(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT c FROM (" + + "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" + + "WHERE b < 2" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = CollectionDataSets.get5TupleDataSet(env) + tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) + tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e) + + val result = tEnv.sql(sqlQuery) + + val expected = "Hi\n" + "Hallo\n" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + //TODO: activate for EFFICIENT mode + @Test + def testUnionWithAggregation(): Unit = { + + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = getScalaTableEnvironment + TranslationContext.reset() + + val sqlQuery = "SELECT count(c) FROM (" + + "SELECT * FROM t1 UNION ALL (SELECT a, b, c FROM t2))" + + val ds1 = CollectionDataSets.getSmall3TupleDataSet(env) + val ds2 = CollectionDataSets.get5TupleDataSet(env) + tEnv.registerDataSet("t1", ds1, 'a, 'b, 'c) + tEnv.registerDataSet("t2", ds2, 'a, 'b, 'd, 'c, 'e) + + val result = tEnv.sql(sqlQuery) + + val expected = "18" + val results = result.toDataSet[Row].collect() + TestBaseUtils.compareResultAsText(results.asJava, expected) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/ed1e52a1/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala index 3582c33..946f584 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/table/test/FilterITCase.scala @@ -22,7 +22,6 @@ import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._ import org.apache.flink.api.scala.util.CollectionDataSets import org.apache.flink.api.table.Row -import org.apache.flink.api.table.codegen.CodeGenException import org.apache.flink.api.table.expressions.Literal import org.apache.flink.api.table.test.utils.TableProgramsTestBase import TableProgramsTestBase.TableConfigMode