[FLINK-3498] Implement TRIM, SUBSTRING as reference design for Table API

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d940c366
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d940c366
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d940c366

Branch: refs/heads/tableOnCalcite
Commit: d940c36639f31fa36b062a4683300af3b11a2ccb
Parents: 5fe9357
Author: twalthr <twal...@apache.org>
Authored: Tue Mar 1 10:24:41 2016 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Mar 3 01:12:21 2016 +0100

----------------------------------------------------------------------
 .../flink/api/scala/table/expressionDsl.scala   | 62 +++++++++++++--
 .../flink/api/table/codegen/CodeGenerator.scala |  5 +-
 .../table/codegen/calls/ScalarFunctions.scala   | 13 ++++
 .../table/codegen/calls/TrimCallGenerator.scala | 78 +++++++++++++++++++
 .../flink/api/table/expressions/call.scala      | 57 ++++++++++++++
 .../table/expressions/stringExpressions.scala   | 55 --------------
 .../api/table/parser/ExpressionParser.scala     | 80 ++++++++++++++++----
 .../api/table/plan/RexNodeTranslator.scala      | 41 +++++-----
 .../api/table/plan/TranslationContext.scala     | 18 ++++-
 .../flink/api/table/plan/TypeConverter.scala    |  5 ++
 .../api/table/test/ScalarFunctionsTest.scala    | 36 ++++++++-
 .../table/test/utils/ExpressionEvaluator.scala  | 25 +++++-
 12 files changed, 368 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index 68914da..a3484a1 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -17,8 +17,8 @@
  */
 package org.apache.flink.api.scala.table
 
-import org.apache.flink.api.table.expressions._
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.expressions._
 
 import scala.language.implicitConversions
 
@@ -63,17 +63,67 @@ trait ImplicitExpressionOperations {
   def count = Count(expr)
   def avg = Avg(expr)
 
+  def cast(toType: TypeInformation[_]) = Cast(expr, toType)
+
+  def as(name: Symbol) = Naming(expr, name.name)
+
+  // scalar functions
+
+  /**
+    * Creates a substring of the given string between the given indices.
+    *
+    * @param beginIndex first character of the substring (starting at 1, 
inclusive)
+    * @param endIndex last character of the substring (starting at 1, 
inclusive)
+    * @return substring
+    */
   def substring(beginIndex: Expression, endIndex: Expression) = {
-    Substring(expr, beginIndex, Some(endIndex))
+    Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex, endIndex)
   }
 
+  /**
+    * Creates a substring of the given string beginning at the given index to 
the end.
+    *
+    * @param beginIndex first character of the substring (starting at 1, 
inclusive)
+    * @return substring
+    */
   def substring(beginIndex: Expression) = {
-    Substring(expr, beginIndex)
+    Call(BuiltInFunctionNames.SUBSTRING, expr, beginIndex)
   }
 
-  def cast(toType: TypeInformation[_]) = Cast(expr, toType)
-
-  def as(name: Symbol) = Naming(expr, name.name)
+  /**
+    * Removes leading and/or trailing characters from the given string.
+    *
+    * @param removeLeading if true, remove leading characters (default: true)
+    * @param removeTrailing if true, remove trailing characters (default: true)
+    * @param character String containing the character (default: " ")
+    * @return trimmed string
+    */
+  def trim(
+      removeLeading: Boolean = true,
+      removeTrailing: Boolean = true,
+      character: Expression = BuiltInFunctionConstants.TRIM_DEFAULT_CHAR) = {
+    if (removeLeading && removeTrailing) {
+      Call(
+        BuiltInFunctionNames.TRIM,
+        BuiltInFunctionConstants.TRIM_BOTH,
+        character,
+        expr)
+    } else if (removeLeading) {
+      Call(
+        BuiltInFunctionNames.TRIM,
+        BuiltInFunctionConstants.TRIM_LEADING,
+        character,
+        expr)
+    } else if (removeTrailing) {
+      Call(
+        BuiltInFunctionNames.TRIM,
+        BuiltInFunctionConstants.TRIM_TRAILING,
+        character,
+        expr)
+    } else {
+      expr
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index a052618..64f98e8 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -19,7 +19,7 @@
 package org.apache.flink.api.table.codegen
 
 import org.apache.calcite.rex._
-import org.apache.calcite.sql.SqlOperator
+import org.apache.calcite.sql.{SqlLiteral, SqlOperator}
 import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
 import org.apache.flink.api.common.functions.{FlatJoinFunction, 
FlatMapFunction, Function, MapFunction}
@@ -576,6 +576,9 @@ class CodeGenerator(
         generateNonNullLiteral(resultType, "\"" + value.toString + "\"")
       case NULL =>
         generateNullLiteral(resultType)
+      case SYMBOL =>
+        val symbolOrdinal = value.asInstanceOf[SqlLiteral.SqlSymbol].ordinal()
+        generateNonNullLiteral(resultType, symbolOrdinal.toString)
       case _ => ??? // TODO more types
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
index 8eda314..ec0d00b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/ScalarFunctions.scala
@@ -49,6 +49,11 @@ object ScalarFunctions {
     STRING_TYPE_INFO,
     BuiltInMethod.SUBSTRING.method)
 
+  addSqlFunctionTrim(
+    TRIM,
+    Seq(INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO),
+    STRING_TYPE_INFO)
+
   // 
----------------------------------------------------------------------------------------------
 
   def getCallGenerator(
@@ -69,4 +74,12 @@ object ScalarFunctions {
     sqlFunctions((sqlOperator, operandTypes)) = new 
MethodCallGenerator(returnType, method)
   }
 
+  private def addSqlFunctionTrim(
+      sqlOperator: SqlOperator,
+      operandTypes: Seq[TypeInformation[_]],
+      returnType: TypeInformation[_])
+    : Unit = {
+    sqlFunctions((sqlOperator, operandTypes)) = new 
TrimCallGenerator(returnType)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGenerator.scala
new file mode 100644
index 0000000..dcf6107
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TrimCallGenerator.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen.calls
+
+import org.apache.calcite.sql.fun.SqlTrimFunction.Flag.{BOTH, LEADING, 
TRAILING}
+import org.apache.calcite.util.BuiltInMethod
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression}
+
+/**
+  * Generates a TRIM function call. The first operand determines the type of 
trimming
+  * (see [[org.apache.calcite.sql.fun.SqlTrimFunction.Flag]]). Second operand 
determines
+  * the String to be removed. Third operand is the String to be trimmed.
+  */
+class TrimCallGenerator(returnType: TypeInformation[_]) extends CallGenerator {
+
+  override def generate(
+      codeGenerator: CodeGenerator,
+      operands: Seq[GeneratedExpression])
+    : GeneratedExpression = {
+    val method = BuiltInMethod.TRIM.method
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(returnType)
+    val defaultValue = primitiveDefaultValue(returnType)
+
+    val methodCall =
+      s"""
+        |${method.getDeclaringClass.getCanonicalName}.${method.getName}(
+        |  ${operands.head.resultTerm} == ${BOTH.ordinal()} ||
+        |    ${operands.head.resultTerm} == ${LEADING.ordinal()},
+        |  ${operands.head.resultTerm} == ${BOTH.ordinal()} ||
+        |    ${operands.head.resultTerm} == ${TRAILING.ordinal()},
+        |  ${operands(1).resultTerm},
+        |  ${operands(2).resultTerm})
+        |""".stripMargin
+
+    val resultCode = if (codeGenerator.nullCheck) {
+      s"""
+        |${operands.map(_.code).mkString("\n")}
+        |boolean $nullTerm = ${operands.map(_.nullTerm).mkString(" || ")};
+        |$resultTypeTerm $resultTerm;
+        |if ($nullTerm) {
+        |  $resultTerm = $defaultValue;
+        |}
+        |else {
+        |  $resultTerm = $methodCall;
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${operands.map(_.code).mkString("\n")}
+        |$resultTypeTerm $resultTerm = $methodCall;
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, resultCode, returnType)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
new file mode 100644
index 0000000..6d26946
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.expressions
+
+/**
+  * General expression for unresolved function calls. The function can be a 
built-in
+  * scalar function or a user-defined scalar function.
+  */
+case class Call(functionName: String, args: Expression*) extends Expression {
+
+  def typeInfo = ???
+
+  override def children: Seq[Expression] = args
+
+  override def toString = s"\\$functionName(${args.mkString(", ")})"
+
+  override def makeCopy(newArgs: Seq[AnyRef]): this.type = {
+    val copy = Call(
+      newArgs.head.asInstanceOf[String],
+      newArgs.drop(1).asInstanceOf[Seq[Expression]]: _*)
+
+    copy.asInstanceOf[this.type]
+  }
+}
+
+/**
+  * Enumeration of common function names.
+  */
+object BuiltInFunctionNames {
+  val SUBSTRING = "SUBSTRING"
+  val TRIM = "TRIM"
+}
+
+/**
+  * Enumeration of common function flags.
+  */
+object BuiltInFunctionConstants {
+  val TRIM_BOTH = Literal(0)
+  val TRIM_LEADING = Literal(1)
+  val TRIM_TRAILING = Literal(2)
+  val TRIM_DEFAULT_CHAR = Literal(" ")
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
deleted file mode 100644
index bf551dd..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/stringExpressions.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.expressions
-
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo}
-
-case class Substring(
-    str: Expression,
-    beginIndex: Expression,
-    endIndex: Option[Expression] = None) extends Expression {
-  def typeInfo = {
-    if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
-      throw new ExpressionException(
-        s"""Operand must be of type String in $this, is ${str.typeInfo}.""")
-    }
-    if (!beginIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
-      throw new ExpressionException(
-        s"""Begin index must be an integer type in $this, is 
${beginIndex.typeInfo}.""")
-    }
-    endIndex match {
-      case Some(endIdx) if !endIdx.typeInfo.isInstanceOf[IntegerTypeInfo[_]] =>
-        throw new ExpressionException(
-            s"""End index must be an integer type in $this, is 
${endIdx.typeInfo}.""")
-        case _ => // ok
-    }
-
-    BasicTypeInfo.STRING_TYPE_INFO
-  }
-
-  override def children: Seq[Expression] = endIndex match {
-    case Some(endIdx) => Seq(str, beginIndex, endIdx)
-    case None => Seq(str, beginIndex)
-  }
-
-  override def toString = endIndex match {
-    case Some(endIdx) => s"($str).substring($beginIndex, $endIndex)"
-    case None => s"($str).substring($beginIndex)"
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
index ebbbf8b..810794e 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
@@ -40,7 +40,7 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
     ("""(?i)\Q""" + kw.key + """\E""").r
   }
 
-  // KeyWord
+  // Keyword
 
   lazy val AS: Keyword = Keyword("as")
   lazy val COUNT: Keyword = Keyword("count")
@@ -90,7 +90,8 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
   lazy val atom: PackratParser[Expression] =
     ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference
 
-  // suffix ops
+  // suffix operators
+
   lazy val isNull: PackratParser[Expression] = atom <~ ".isNull" ^^ { e => 
IsNull(e) }
   lazy val isNotNull: PackratParser[Expression] = atom <~ ".isNotNull" ^^ { e 
=> IsNotNull(e) }
 
@@ -120,26 +121,79 @@ object ExpressionParser extends JavaTokenParsers with 
PackratParsers {
     atom <~ ".cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) }
 
   lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ 
")" ^^ {
-    case e ~ _ ~ as ~ _ => Naming(e, as.name)
+    case e ~ _ ~ target ~ _ => Naming(e, target.name)
   }
 
-  lazy val substring: PackratParser[Expression] =
-    atom ~ ".substring(" ~ expression ~ "," ~ expression ~ ")" ^^ {
-      case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, Some(to))
+  // general function calls
 
-    }
+  lazy val functionCall = ident ~ "(" ~ rep1sep(expression, ",") ~ ")" ^^ {
+    case name ~ _ ~ args ~ _ => Call(name.toUpperCase, args: _*)
+  }
 
-  lazy val substringWithoutEnd: PackratParser[Expression] =
-    atom ~ ".substring(" ~ expression ~ ")" ^^ {
-      case e ~ _ ~ from ~ _ => Substring(e, from)
+  lazy val functionCallWithoutArgs = ident ~ "()" ^^ {
+    case name ~ _ => Call(name.toUpperCase)
+  }
 
-    }
+  lazy val suffixFunctionCall = atom ~ "." ~ ident ~ "(" ~ rep1sep(expression, 
",") ~ ")" ^^ {
+    case operand ~ _ ~ name ~ _ ~ args ~ _ => Call(name.toUpperCase, operand 
:: args : _*)
+  }
+
+  lazy val suffixFunctionCallWithoutArgs = atom ~ "." ~ ident ~ "()" ^^ {
+    case operand ~ _ ~ name ~ _ => Call(name.toUpperCase, operand)
+  }
+
+  // special calls
+
+  lazy val specialFunctionCalls = trim | trimWithoutArgs
+
+  lazy val specialSuffixFunctionCalls = suffixTrim | suffixTrimWithoutArgs
+
+  lazy val trimWithoutArgs = "trim(" ~ expression ~ ")" ^^ {
+    case _ ~ operand ~ _ =>
+      Call(
+        BuiltInFunctionNames.TRIM,
+        BuiltInFunctionConstants.TRIM_BOTH,
+        BuiltInFunctionConstants.TRIM_DEFAULT_CHAR,
+        operand)
+  }
+
+  lazy val suffixTrimWithoutArgs = atom ~ ".trim()" ^^ {
+    case operand ~ _ =>
+      Call(
+        BuiltInFunctionNames.TRIM,
+        BuiltInFunctionConstants.TRIM_BOTH,
+        BuiltInFunctionConstants.TRIM_DEFAULT_CHAR,
+        operand)
+  }
+
+  lazy val trim = "trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ "," ~ 
expression ~
+      "," ~ expression ~ ")" ^^ {
+    case _ ~ trimType ~ _ ~ trimCharacter ~ _ ~ operand ~ _ =>
+      val flag = trimType match {
+        case "BOTH" => BuiltInFunctionConstants.TRIM_BOTH
+        case "LEADING" => BuiltInFunctionConstants.TRIM_LEADING
+        case "TRAILING" => BuiltInFunctionConstants.TRIM_TRAILING
+      }
+      Call(BuiltInFunctionNames.TRIM, flag, trimCharacter, operand)
+  }
+
+  lazy val suffixTrim = atom ~ ".trim(" ~ ("BOTH" | "LEADING" | "TRAILING") ~ 
"," ~
+      expression ~ ")" ^^ {
+    case operand ~ _ ~ trimType ~ _ ~ trimCharacter ~ _ =>
+      val flag = trimType match {
+        case "BOTH" => BuiltInFunctionConstants.TRIM_BOTH
+        case "LEADING" => BuiltInFunctionConstants.TRIM_LEADING
+        case "TRAILING" => BuiltInFunctionConstants.TRIM_TRAILING
+      }
+      Call(BuiltInFunctionNames.TRIM, flag, trimCharacter, operand)
+  }
 
   lazy val suffix =
     isNull | isNotNull |
       abs | sum | min | max | count | avg | cast |
-      substring | substringWithoutEnd | atom
-
+      specialFunctionCalls | functionCall | functionCallWithoutArgs |
+      specialSuffixFunctionCalls | suffixFunctionCall | 
suffixFunctionCallWithoutArgs |
+      atom
 
   // unary ops
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
index fb9d057..ac9c85a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/RexNodeTranslator.scala
@@ -19,11 +19,14 @@
 package org.apache.flink.api.table.plan
 
 import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 import org.apache.calcite.tools.RelBuilder.AggCall
 import org.apache.flink.api.table.expressions._
 
+import scala.collection.JavaConversions._
+
 object RexNodeTranslator {
 
   /**
@@ -54,19 +57,9 @@ object RexNodeTranslator {
         (b.makeCopy(List(l._1, r._1)), l._2 ::: r._2)
 
       // Scalar functions
-      case s@Substring(_, _, Some(endIndex)) =>
-        val str = extractAggCalls(s.str, relBuilder)
-        val sta = extractAggCalls(s.beginIndex, relBuilder)
-        val end = extractAggCalls(endIndex, relBuilder)
-        (s.makeCopy(
-          List(str._1, sta._1, Some(end._1))),
-          (str._2 ::: sta._2) ::: end._2
-        )
-
-      case s@Substring(_, _, None) =>
-        val str = extractAggCalls(s.str, relBuilder)
-        val sta = extractAggCalls(s.beginIndex, relBuilder)
-        (s.makeCopy(List(str._1, sta._1, None)), str._2 ::: sta._2)
+      case c@Call(name, args@_*) =>
+        val newArgs = args.map(extractAggCalls(_, relBuilder)).toList
+        (c.makeCopy(name :: newArgs.map(_._1)), newArgs.flatMap(_._2))
 
       case e@AnyRef =>
         throw new IllegalArgumentException(
@@ -162,16 +155,10 @@ object RexNodeTranslator {
         relBuilder.call(SqlStdOperatorTable.UNARY_MINUS, c)
 
       // Scalar functions
-      case Substring(string, start, Some(end)) =>
-        val str = toRexNode(string, relBuilder)
-        val sta = toRexNode(start, relBuilder)
-        val en = toRexNode(end, relBuilder)
-        relBuilder.call(SqlStdOperatorTable.SUBSTRING, str, sta, en)
-
-      case Substring(string, start, None) =>
-        val str = toRexNode(string, relBuilder)
-        val sta = toRexNode(start, relBuilder)
-        relBuilder.call(SqlStdOperatorTable.SUBSTRING, str, sta)
+      case Call(name, args@_*) =>
+        val rexArgs = args.map(toRexNode(_, relBuilder))
+        val sqlOperator = toSqlOperator(name)
+        relBuilder.call(sqlOperator, rexArgs)
 
       case a: Aggregation =>
         throw new IllegalArgumentException(s"Aggregation expression $a not 
allowed at this place")
@@ -198,4 +185,12 @@ object RexNodeTranslator {
     }
   }
 
+  private def toSqlOperator(name: String): SqlOperator = {
+    name match {
+      case BuiltInFunctionNames.SUBSTRING => SqlStdOperatorTable.SUBSTRING
+      case BuiltInFunctionNames.TRIM => SqlStdOperatorTable.TRIM
+      case _ => ???
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
index 51af8d6..5e8e1bc 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TranslationContext.scala
@@ -20,14 +20,17 @@ package org.apache.flink.api.table.plan
 
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.apache.calcite.config.Lex
 import org.apache.calcite.plan.ConventionTraitDef
 import org.apache.calcite.schema.impl.AbstractTable
 import org.apache.calcite.schema.SchemaPlus
-import org.apache.calcite.tools.{Frameworks, RelBuilder}
+import org.apache.calcite.sql.parser.SqlParser
+import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder}
 import org.apache.flink.api.table.plan.schema.DataSetTable
 
 object TranslationContext {
 
+  private var frameworkConfig: FrameworkConfig = null
   private var relBuilder: RelBuilder = null
   private var tables: SchemaPlus = null
   private var tabNames: Map[AbstractTable, String] = null
@@ -40,10 +43,17 @@ object TranslationContext {
     // register table in Cascading schema
     tables = Frameworks.createRootSchema(true)
 
+    // TODO move this to TableConfig when we implement full SQL support
+    // configure sql parser
+    // we use Java lex because back ticks are easier than double quotes in 
programming
+    // and cases are preserved
+    val parserConfig = SqlParser.configBuilder().setLex(Lex.JAVA).build()
+
     // initialize RelBuilder
-    val frameworkConfig = Frameworks
+    frameworkConfig = Frameworks
       .newConfigBuilder
       .defaultSchema(tables)
+      .parserConfig(parserConfig)
       .traitDefs(ConventionTraitDef.INSTANCE)
       .build
 
@@ -78,6 +88,10 @@ object TranslationContext {
     relBuilder
   }
 
+  def getFrameworkConfig: FrameworkConfig = {
+    frameworkConfig
+  }
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
index c8e999d..62c87a1 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala
@@ -80,6 +80,11 @@ object TypeConverter {
     case DOUBLE => DOUBLE_TYPE_INFO
     case VARCHAR | CHAR => STRING_TYPE_INFO
     case DATE => DATE_TYPE_INFO
+
+    // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
+    // are represented as integer
+    case SYMBOL => INT_TYPE_INFO
+
     case _ =>
       println(sqlType)
       ??? // TODO more types

http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
index 9989c1a..eb48b70 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/ScalarFunctionsTest.scala
@@ -52,6 +52,33 @@ class ScalarFunctionsTest {
       "Thi")
   }
 
+  @Test
+  def testTrim(): Unit = {
+    testFunction(
+      'f8.trim(),
+      "f8.trim()",
+      "TRIM(f8)",
+      "This is a test String.")
+
+    testFunction(
+      'f8.trim(removeLeading = true, removeTrailing = true, " "),
+      "trim(f8)",
+      "TRIM(f8)",
+      "This is a test String.")
+
+    testFunction(
+      'f8.trim(removeLeading = false, removeTrailing = true, " "),
+      "f8.trim(TRAILING, ' ')",
+      "TRIM(TRAILING FROM f8)",
+      " This is a test String.")
+
+    testFunction(
+      'f0.trim(removeLeading = true, removeTrailing = true, "."),
+      "trim(BOTH, '.', f0)",
+      "TRIM(BOTH '.' FROM f0)",
+      "This is a test String")
+  }
+
   // 
----------------------------------------------------------------------------------------------
 
   def testFunction(
@@ -59,7 +86,7 @@ class ScalarFunctionsTest {
       exprString: String,
       sqlExpr: String,
       expected: String): Unit = {
-    val testData = new Row(8)
+    val testData = new Row(9)
     testData.setField(0, "This is a test String.")
     testData.setField(1, true)
     testData.setField(2, 42.toByte)
@@ -68,6 +95,7 @@ class ScalarFunctionsTest {
     testData.setField(5, 4.5.toFloat)
     testData.setField(6, 4.6)
     testData.setField(7, 3)
+    testData.setField(8, " This is a test String. ")
 
     val typeInfo = new RowTypeInfo(Seq(
       STRING_TYPE_INFO,
@@ -77,7 +105,8 @@ class ScalarFunctionsTest {
       LONG_TYPE_INFO,
       FLOAT_TYPE_INFO,
       DOUBLE_TYPE_INFO,
-      INT_TYPE_INFO)).asInstanceOf[TypeInformation[Any]]
+      INT_TYPE_INFO,
+      STRING_TYPE_INFO)).asInstanceOf[TypeInformation[Any]]
 
     val exprResult = ExpressionEvaluator.evaluate(testData, typeInfo, expr)
     assertEquals(expected, exprResult)
@@ -88,7 +117,8 @@ class ScalarFunctionsTest {
       ExpressionParser.parseExpression(exprString))
     assertEquals(expected, exprStringResult)
 
-    // TODO test SQL expression
+    val exprSqlResult = ExpressionEvaluator.evaluate(testData, typeInfo, 
sqlExpr)
+    assertEquals(expected, exprSqlResult)
   }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d940c366/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
index b06ac26..d05ac0d 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/test/utils/ExpressionEvaluator.scala
@@ -18,9 +18,10 @@
 
 package org.apache.flink.api.table.test.utils
 
+import org.apache.calcite.rel.logical.LogicalProject
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
-import org.apache.calcite.tools.RelBuilder
+import org.apache.calcite.tools.{Frameworks, RelBuilder}
 import org.apache.flink.api.common.functions.{Function, MapFunction}
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -44,7 +45,7 @@ object ExpressionEvaluator {
       compile(getClass.getClassLoader, genFunc.name, genFunc.code)
   }
 
-  private def prepareRelBuilder(typeInfo: TypeInformation[Any]): RelBuilder = {
+  private def prepareTable(typeInfo: TypeInformation[Any]): (String, 
RelBuilder) = {
     // create DataSetTable
     val dataSetMock = mock(classOf[DataSet[Any]])
     when(dataSetMock.getType).thenReturn(typeInfo)
@@ -56,11 +57,27 @@ object ExpressionEvaluator {
     // prepare RelBuilder
     val relBuilder = TranslationContext.getRelBuilder
     relBuilder.scan(tableName)
-    relBuilder
+
+    (tableName, relBuilder)
+  }
+
+  def evaluate(data: Any, typeInfo: TypeInformation[Any], sqlExpr: String): 
String = {
+    // create DataSetTable
+    val table = prepareTable(typeInfo)
+
+    // create RelNode from SQL expression
+    val planner = Frameworks.getPlanner(TranslationContext.getFrameworkConfig)
+    val parsed = planner.parse("SELECT " + sqlExpr + " FROM " + table._1)
+    val validated = planner.validate(parsed)
+    val converted = planner.rel(validated)
+
+    val expr: RexNode = 
converted.rel.asInstanceOf[LogicalProject].getChildExps.get(0)
+
+    evaluate(data, typeInfo, table._2, expr)
   }
 
   def evaluate(data: Any, typeInfo: TypeInformation[Any], expr: Expression): 
String = {
-    val relBuilder = prepareRelBuilder(typeInfo)
+    val relBuilder = prepareTable(typeInfo)._2
     evaluate(data, typeInfo, relBuilder, RexNodeTranslator.toRexNode(expr, 
relBuilder))
   }
 

Reply via email to