[FLINK-3498] Implement TRIM, SUBSTRING as reference design for Table API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d940c366
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d940c366
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d940c366
Branch: refs/heads/tableOnCalcite
Commit: d940c36639f31fa36b062a4683300af3b11a2ccb
Parents: 5fe9357
Author: twalthr <twal...@apache.org>
Authored: Tue Mar 1 10:24:41 2016 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Mar 3 01:12:21 2016 +0100
----------------------------------------------------------------------
.../flink/api/scala/table/expressionDsl.scala | 62 +++++++++++++--
.../flink/api/table/codegen/CodeGenerator.scala | 5 +-
.../table/codegen/calls/ScalarFunctions.scala | 13 ++++
.../table/codegen/calls/TrimCallGenerator.scala | 78 +++++++++++++++++++
.../flink/api/table/expressions/call.scala | 57 ++++++++++++++
.../table/expressions/stringExpressions.scala | 55 --------------
.../api/table/parser/ExpressionParser.scala | 80 ++++++++++++++++----
.../api/table/plan/RexNodeTranslator.scala | 41 +++++-----
.../api/table/plan/TranslationContext.scala | 18 ++++-
.../flink/api/table/plan/TypeConverter.scala | 5 ++
.../api/table/test/ScalarFunctionsTest.scala | 36 ++++++++-
.../table/test/utils/ExpressionEvaluator.scala | 25 +++++-
12 files changed, 368 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 68914da..a3484a1 100644
---
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala @@ -17,8 +17,8 @@ */ package org.apache.flink.api.scala.table -import org.apache.flink.api.table.expressions._ import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.expressions._ import scala.language.implicitConversions @@ -63,17 +63,67 @@ trait ImplicitExpressionOperations { def count = Count(expr) def avg = Avg(expr) + def cast(toType: TypeInformation[_]) = Cast(expr, toType) + + def as(name: Symbol) = Naming(expr, name.name) + + // scalar functions + + /** + * Creates a substring of the given string between the given indices. + * + * @param beginIndex first character of the substring (starting at 1, inclusive) + * @param endIndex last character of the substring (starting at 1, inclusive) + * @return substring + */ def substring(beginIndex: Expression, endIndex: Expression) = { - Substring(expr, beginIndex, Some(endIndex)) + Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex) } + /** + * Creates a substring of the given string beginning at the given index to the end. + * + * @param beginIndex first character of the substring (starting at 1, inclusive) + * @return substring + */ def substring(beginIndex: Expression) = { - Substring(expr, beginIndex) + Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex) } - def cast(toType: TypeInformation[_]) = Cast(expr, toType) - - def as(name: Symbol) = Naming(expr, name.name) + /** + * Removes leading and/or trailing characters from the given string. 
+ * + * @param removeLeading if true, remove leading characters (default: true) + * @param removeTrailing if true, remove trailing characters (default: true) + * @param character String containing the character (default: " ") + * @return trimmed string + */ + def trim( + removeLeading: Boolean = true, + removeTrailing: Boolean = true, + character: Expression = BuiltInFunctionConstants.TRIM_DEFAULT_CHAR) = { + if (removeLeading && removeTrailing) { + Call( + BuiltInFunctionNames.TRIM, + BuiltInFunctionConstants.TRIM_BOTH, + character, + expr) + } else if (removeLeading) { + Call( + BuiltInFunctionNames.TRIM, + BuiltInFunctionConstants.TRIM_LEADING, + character, + expr) + } else if (removeTrailing) { + Call( + BuiltInFunctionNames.TRIM, + BuiltInFunctionConstants.TRIM_TRAILING, + character, + expr) + } else { + expr + } + } } /** http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala index a052618..64f98e8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala @@ -19,7 +19,7 @@ package org.apache.flink.api.table.codegen import org.apache.calcite.rex._ -import org.apache.calcite.sql.SqlOperator +import org.apache.calcite.sql.{SqlLiteral, SqlOperator} import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql.fun.SqlStdOperatorTable._ import org.apache.flink.api.common.functions.{FlatJoinFunction, FlatMapFunction, Function, MapFunction} @@ -576,6 +576,9 @@ class CodeGenerator( generateNonNullLiteral(resultType, 
"\"" + value.toString + "\"") case NULL => generateNullLiteral(resultType) + case SYMBOL => + val symbolOrdinal = value.asInstanceOf[SqlLiteral.SqlSymbol].ordinal() + generateNonNullLiteral(resultType, symbolOrdinal.toString) case _ => ??? // TODO more types } } http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala index 8eda314..ec0d00b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala @@ -49,6 +49,11 @@ object ScalarFunctions { STRING_TYPE_INFO, BuiltInMethod.SUBSTRING.method) + addSqlFunctionTrim( + TRIM, + Seq(INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO), + STRING_TYPE_INFO) + // ---------------------------------------------------------------------------------------------- def getCallGenerator( @@ -69,4 +74,12 @@ object ScalarFunctions { sqlFunctions((sqlOperator, operandTypes)) = new MethodCallGenerator(returnType, method) } + private def addSqlFunctionTrim( + sqlOperator: SqlOperator, + operandTypes: Seq[TypeInformation[_]], + returnType: TypeInformation[_]) + : Unit = { + sqlFunctions((sqlOperator, operandTypes)) = new TrimCallGenerator(returnType) + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGenerator.scala ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGenerator.scala new file mode 100644 index 0000000..dcf6107 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGenerator.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.codegen.calls + +import org.apache.calcite.sql.fun.SqlTrimFunction.Flag.{BOTH, LEADING, TRAILING} +import org.apache.calcite.util.BuiltInMethod +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.codegen.CodeGenUtils._ +import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression} + +/** + * Generates a TRIM function call. The first operand determines the type of trimming + * (see [[org.apache.calcite.sql.fun.SqlTrimFunction.Flag]]). Second operand determines + * the String to be removed. Third operand is the String to be trimmed. 
+ */ +class TrimCallGenerator(returnType: TypeInformation[_]) extends CallGenerator { + + override def generate( + codeGenerator: CodeGenerator, + operands: Seq[GeneratedExpression]) + : GeneratedExpression = { + val method = BuiltInMethod.TRIM.method + val resultTerm = newName("result") + val nullTerm = newName("isNull") + val resultTypeTerm = primitiveTypeTermForTypeInfo(returnType) + val defaultValue = primitiveDefaultValue(returnType) + + val methodCall = + s""" + |${method.getDeclaringClass.getCanonicalName}.${method.getName}( + | ${operands.head.resultTerm} == ${BOTH.ordinal()} || + | ${operands.head.resultTerm} == ${LEADING.ordinal()}, + | ${operands.head.resultTerm} == ${BOTH.ordinal()} || + | ${operands.head.resultTerm} == ${TRAILING.ordinal()}, + | ${operands(1).resultTerm}, + | ${operands(2).resultTerm}) + |""".stripMargin + + val resultCode = if (codeGenerator.nullCheck) { + s""" + |${operands.map(_.code).mkString("\n")} + |boolean $nullTerm = ${operands.map(_.nullTerm).mkString(" || ")}; + |$resultTypeTerm $resultTerm; + |if ($nullTerm) { + | $resultTerm = $defaultValue; + |} + |else { + | $resultTerm = $methodCall; + |} + |""".stripMargin + } + else { + s""" + |${operands.map(_.code).mkString("\n")} + |$resultTypeTerm $resultTerm = $methodCall; + |""".stripMargin + } + + GeneratedExpression(resultTerm, nullTerm, resultCode, returnType) + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala new file mode 100644 index 0000000..6d26946 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala @@ -0,0 +1,57 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.expressions + +/** + * General expression for unresolved function calls. The function can be a built-in + * scalar function or a user-defined scalar function. + */ +case class Call(functionName: String, args: Expression*) extends Expression { + + def typeInfo = ??? + + override def children: Seq[Expression] = args + + override def toString = s"\\$functionName(${args.mkString(", ")})" + + override def makeCopy(newArgs: Seq[AnyRef]): this.type = { + val copy = Call( + newArgs.head.asInstanceOf[String], + newArgs.drop(1).asInstanceOf[Seq[Expression]]: _*) + + copy.asInstanceOf[this.type] + } +} + +/** + * Enumeration of common function names. + */ +object BuiltInFunctionNames { + val SUBSTRING = "SUBSTRING" + val TRIM = "TRIM" +} + +/** + * Enumeration of common function flags. 
+ */ +object BuiltInFunctionConstants { + val TRIM_BOTH = Literal(0) + val TRIM_LEADING = Literal(1) + val TRIM_TRAILING = Literal(2) + val TRIM_DEFAULT_CHAR = Literal(" ") +} http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala deleted file mode 100644 index bf551dd..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table.expressions - -import org.apache.flink.api.table.ExpressionException -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo} - -case class Substring( - str: Expression, - beginIndex: Expression, - endIndex: Option[Expression] = None) extends Expression { - def typeInfo = { - if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) { - throw new ExpressionException( - s"""Operand must be of type String in $this, is ${str.typeInfo}.""") - } - if (!beginIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - s"""Begin index must be an integer type in $this, is ${beginIndex.typeInfo}.""") - } - endIndex match { - case Some(endIdx) if !endIdx.typeInfo.isInstanceOf[IntegerTypeInfo[_]] => - throw new ExpressionException( - s"""End index must be an integer type in $this, is ${endIdx.typeInfo}.""") - case _ => // ok - } - - BasicTypeInfo.STRING_TYPE_INFO - } - - override def children: Seq[Expression] = endIndex match { - case Some(endIdx) => Seq(str, beginIndex, endIdx) - case None => Seq(str, beginIndex) - } - - override def toString = endIndex match { - case Some(endIdx) => s"($str).substring($beginIndex, $endIndex)" - case None => s"($str).substring($beginIndex)" - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala index ebbbf8b..810794e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala @@ -40,7 +40,7 @@ object ExpressionParser 
extends JavaTokenParsers with PackratParsers { ("""(?i)\Q""" + kw.key + """\E""").r } - // KeyWord + // Keyword lazy val AS: Keyword = Keyword("as") lazy val COUNT: Keyword = Keyword("count") @@ -90,7 +90,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { lazy val atom: PackratParser[Expression] = ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference - // suffix ops + // suffix operators + lazy val isNull: PackratParser[Expression] = atom <~ ".isNull" ^^ { e => IsNull(e) } lazy val isNotNull: PackratParser[Expression] = atom <~ ".isNotNull" ^^ { e => IsNotNull(e) } @@ -120,26 +121,79 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers { atom <~ ".cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) } lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ { - case e ~ _ ~ as ~ _ => Naming(e, as.name) + case e ~ _ ~ target ~ _ => Naming(e, target.name) } - lazy val substring: PackratParser[Expression] = - atom ~ ".substring(" ~ expression ~ "," ~ expression ~ ")" ^^ { - case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, Some(to)) + // general function calls - } + lazy val functionCall = ident ~ "(" ~ rep1sep(expression, ",") ~ ")" ^^ { + case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args: _*) + } - lazy val substringWithoutEnd: PackratParser[Expression] = - atom ~ ".substring(" ~ expression ~ ")" ^^ { - case e ~ _ ~ from ~ _ => Substring(e, from) + lazy val functionCallWithoutArgs = ident ~ "()" ^^ { + case name ~ _ => Call(name.toUpperCase) + } - } + lazy val suffixFunctionCall = atom ~ "." ~ ident ~ "(" ~ rep1sep(expression, ",") ~ ")" ^^ { + case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand :: args : _*) + } + + lazy val suffixFunctionCallWithoutArgs = atom ~ "." 
~ ident ~ "()" ^^ { + case operand ~ _ ~ name ~ _ => Call(name.toUpperCase, operand) + } + + // special calls + + lazy val specialFunctionCalls = trim | trimWithoutArgs + + lazy val specialSuffixFunctionCalls = suffixTrim | suffixTrimWithoutArgs + + lazy val trimWithoutArgs = "trim(" ~ expression ~ ")" ^^ { + case _ ~ operand ~ _ => + Call( + BuiltInFunctionNames.TRIM, + BuiltInFunctionConstants.TRIM_BOTH, + BuiltInFunctionConstants.TRIM_DEFAULT_CHAR, + operand) + } + + lazy val suffixTrimWithoutArgs = atom ~ ".trim()" ^^ { + case operand ~ _ => + Call( + BuiltInFunctionNames.TRIM, + BuiltInFunctionConstants.TRIM_BOTH, + BuiltInFunctionConstants.TRIM_DEFAULT_CHAR, + operand) + } + + lazy val trim = "trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~ expression ~ + "," ~ expression ~ ")" ^^ { + case _ ~ trimType ~ _ ~ trimCharacter ~ _ ~ operand ~ _ => + val flag = trimType match { + case "BOTH" => BuiltInFunctionConstants.TRIM_BOTH + case "LEADING" => BuiltInFunctionConstants.TRIM_LEADING + case "TRAILING" => BuiltInFunctionConstants.TRIM_TRAILING + } + Call(BuiltInFunctionNames.TRIM, flag, trimCharacter, operand) + } + + lazy val suffixTrim = atom ~ ".trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~ + expression ~ ")" ^^ { + case operand ~ _ ~ trimType ~ _ ~ trimCharacter ~ _ => + val flag = trimType match { + case "BOTH" => BuiltInFunctionConstants.TRIM_BOTH + case "LEADING" => BuiltInFunctionConstants.TRIM_LEADING + case "TRAILING" => BuiltInFunctionConstants.TRIM_TRAILING + } + Call(BuiltInFunctionNames.TRIM, flag, trimCharacter, operand) + } lazy val suffix = isNull | isNotNull | abs | sum | min | max | count | avg | cast | - substring | substringWithoutEnd | atom - + specialFunctionCalls |functionCall | functionCallWithoutArgs | + specialSuffixFunctionCalls | suffixFunctionCall | suffixFunctionCallWithoutArgs | + atom // unary ops 
http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala index fb9d057..ac9c85a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala @@ -19,11 +19,14 @@ package org.apache.flink.api.table.plan import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.SqlOperator import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder import org.apache.calcite.tools.RelBuilder.AggCall import org.apache.flink.api.table.expressions._ +import scala.collection.JavaConversions._ + object RexNodeTranslator { /** @@ -54,19 +57,9 @@ object RexNodeTranslator { (b.makeCopy(List(l._1, r._1)), l._2 ::: r._2) // Scalar functions - case s@Substring(_, _, Some(endIndex)) => - val str = extractAggCalls(s.str, relBuilder) - val sta = extractAggCalls(s.beginIndex, relBuilder) - val end = extractAggCalls(endIndex, relBuilder) - (s.makeCopy( - List(str._1, sta._1, Some(end._1))), - (str._2 ::: sta._2) ::: end._2 - ) - - case s@Substring(_, _, None) => - val str = extractAggCalls(s.str, relBuilder) - val sta = extractAggCalls(s.beginIndex, relBuilder) - (s.makeCopy(List(str._1, sta._1, None)), str._2 ::: sta._2) + case c@Call(name, args@_*) => + val newArgs = args.map(extractAggCalls(_, relBuilder)).toList + (c.makeCopy(name :: newArgs.map(_._1)), newArgs.flatMap(_._2)) case e@AnyRef => throw new IllegalArgumentException( @@ -162,16 +155,10 @@ object RexNodeTranslator { 
relBuilder.call(SqlStdOperatorTable.UNARY_MINUS, c) // Scalar functions - case Substring(string, start, Some(end)) => - val str = toRexNode(string, relBuilder) - val sta = toRexNode(start, relBuilder) - val en = toRexNode(end, relBuilder) - relBuilder.call(SqlStdOperatorTable.SUBSTRING, str, sta, en) - - case Substring(string, start, None) => - val str = toRexNode(string, relBuilder) - val sta = toRexNode(start, relBuilder) - relBuilder.call(SqlStdOperatorTable.SUBSTRING, str, sta) + case Call(name, args@_*) => + val rexArgs = args.map(toRexNode(_, relBuilder)) + val sqlOperator = toSqlOperator(name) + relBuilder.call(sqlOperator, rexArgs) case a: Aggregation => throw new IllegalArgumentException(s"Aggregation expression $a not allowed at this place") @@ -198,4 +185,12 @@ object RexNodeTranslator { } } + private def toSqlOperator(name: String): SqlOperator = { + name match { + case BuiltInFunctionNames.SUBSTRING => SqlStdOperatorTable.SUBSTRING + case BuiltInFunctionNames.TRIM => SqlStdOperatorTable.TRIM + case _ => ??? 
+ } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala index 51af8d6..5e8e1bc 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala @@ -20,14 +20,17 @@ package org.apache.flink.api.table.plan import java.util.concurrent.atomic.AtomicInteger +import org.apache.calcite.config.Lex import org.apache.calcite.plan.ConventionTraitDef import org.apache.calcite.schema.impl.AbstractTable import org.apache.calcite.schema.SchemaPlus -import org.apache.calcite.tools.{Frameworks, RelBuilder} +import org.apache.calcite.sql.parser.SqlParser +import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder} import org.apache.flink.api.table.plan.schema.DataSetTable object TranslationContext { + private var frameworkConfig: FrameworkConfig = null private var relBuilder: RelBuilder = null private var tables: SchemaPlus = null private var tabNames: Map[AbstractTable, String] = null @@ -40,10 +43,17 @@ object TranslationContext { // register table in Cascading schema tables = Frameworks.createRootSchema(true) + // TODO move this to TableConfig when we implement full SQL support + // configure sql parser + // we use Java lex because back ticks are easier than double quotes in programming + // and cases are preserved + val parserConfig = SqlParser.configBuilder().setLex(Lex.JAVA).build() + // initialize RelBuilder - val frameworkConfig = Frameworks + frameworkConfig = Frameworks .newConfigBuilder .defaultSchema(tables) + 
.parserConfig(parserConfig) .traitDefs(ConventionTraitDef.INSTANCE) .build @@ -78,6 +88,10 @@ object TranslationContext { relBuilder } + def getFrameworkConfig: FrameworkConfig = { + frameworkConfig + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala index c8e999d..62c87a1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala @@ -80,6 +80,11 @@ object TypeConverter { case DOUBLE => DOUBLE_TYPE_INFO case VARCHAR | CHAR => STRING_TYPE_INFO case DATE => DATE_TYPE_INFO + + // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING + // are represented as integer + case SYMBOL => INT_TYPE_INFO + case _ => println(sqlType) ??? 
// TODO more types http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala index 9989c1a..eb48b70 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala @@ -52,6 +52,33 @@ class ScalarFunctionsTest { "Thi") } + @Test + def testTrim(): Unit = { + testFunction( + 'f8.trim(), + "f8.trim()", + "TRIM(f8)", + "This is a test String.") + + testFunction( + 'f8.trim(removeLeading = true, removeTrailing = true, " "), + "trim(f8)", + "TRIM(f8)", + "This is a test String.") + + testFunction( + 'f8.trim(removeLeading = false, removeTrailing = true, " "), + "f8.trim(TRAILING, ' ')", + "TRIM(TRAILING FROM f8)", + " This is a test String.") + + testFunction( + 'f0.trim(removeLeading = true, removeTrailing = true, "."), + "trim(BOTH, '.', f0)", + "TRIM(BOTH '.' FROM f0)", + "This is a test String") + } + // ---------------------------------------------------------------------------------------------- def testFunction( @@ -59,7 +86,7 @@ class ScalarFunctionsTest { exprString: String, sqlExpr: String, expected: String): Unit = { - val testData = new Row(8) + val testData = new Row(9) testData.setField(0, "This is a test String.") testData.setField(1, true) testData.setField(2, 42.toByte) @@ -68,6 +95,7 @@ class ScalarFunctionsTest { testData.setField(5, 4.5.toFloat) testData.setField(6, 4.6) testData.setField(7, 3) + testData.setField(8, " This is a test String. 
") val typeInfo = new RowTypeInfo(Seq( STRING_TYPE_INFO, @@ -77,7 +105,8 @@ class ScalarFunctionsTest { LONG_TYPE_INFO, FLOAT_TYPE_INFO, DOUBLE_TYPE_INFO, - INT_TYPE_INFO)).asInstanceOf[TypeInformation[Any]] + INT_TYPE_INFO, + STRING_TYPE_INFO)).asInstanceOf[TypeInformation[Any]] val exprResult = ExpressionEvaluator.evaluate(testData, typeInfo, expr) assertEquals(expected, exprResult) @@ -88,7 +117,8 @@ class ScalarFunctionsTest { ExpressionParser.parseExpression(exprString)) assertEquals(expected, exprStringResult) - // TODO test SQL expression + val exprSqlResult = ExpressionEvaluator.evaluate(testData, typeInfo, sqlExpr) + assertEquals(expected, exprSqlResult) } http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala index b06ac26..d05ac0d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala @@ -18,9 +18,10 @@ package org.apache.flink.api.table.test.utils +import org.apache.calcite.rel.logical.LogicalProject import org.apache.calcite.rex.RexNode import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR -import org.apache.calcite.tools.RelBuilder +import org.apache.calcite.tools.{Frameworks, RelBuilder} import org.apache.flink.api.common.functions.{Function, MapFunction} import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.TypeInformation @@ -44,7 +45,7 @@ object ExpressionEvaluator { compile(getClass.getClassLoader, genFunc.name, 
genFunc.code) } - private def prepareRelBuilder(typeInfo: TypeInformation[Any]): RelBuilder = { + private def prepareTable(typeInfo: TypeInformation[Any]): (String, RelBuilder) = { // create DataSetTable val dataSetMock = mock(classOf[DataSet[Any]]) when(dataSetMock.getType).thenReturn(typeInfo) @@ -56,11 +57,27 @@ object ExpressionEvaluator { // prepare RelBuilder val relBuilder = TranslationContext.getRelBuilder relBuilder.scan(tableName) - relBuilder + + (tableName, relBuilder) + } + + def evaluate(data: Any, typeInfo: TypeInformation[Any], sqlExpr: String): String = { + // create DataSetTable + val table = prepareTable(typeInfo) + + // create RelNode from SQL expression + val planner = Frameworks.getPlanner(TranslationContext.getFrameworkConfig) + val parsed = planner.parse("SELECT " + sqlExpr + " FROM " + table._1) + val validated = planner.validate(parsed) + val converted = planner.rel(validated) + + val expr: RexNode = converted.rel.asInstanceOf[LogicalProject].getChildExps.get(0) + + evaluate(data, typeInfo, table._2, expr) } def evaluate(data: Any, typeInfo: TypeInformation[Any], expr: Expression): String = { - val relBuilder = prepareRelBuilder(typeInfo) + val relBuilder = prepareTable(typeInfo)._2 evaluate(data, typeInfo, relBuilder, RexNodeTranslator.toRexNode(expr, relBuilder)) }