Repository: flink
Updated Branches:
  refs/heads/master 885fbaf66 -> 343d05a40


[hotfix] [tableAPI] Moved tests to correct package.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/343d05a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/343d05a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/343d05a4

Branch: refs/heads/master
Commit: 343d05a4067dd154b45e58b6bfce0ae6a4ebd5f4
Parents: 885fbaf
Author: Fabian Hueske <fhue...@apache.org>
Authored: Sun May 22 19:32:01 2016 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Mon May 23 18:44:02 2016 +0200

----------------------------------------------------------------------
 .../scala/expression/ScalarFunctionsTest.scala  | 490 -------------------
 .../expression/utils/ExpressionEvaluator.scala  | 119 -----
 .../api/scala/typeutils/RowComparatorTest.scala | 136 -----
 .../api/scala/typeutils/RowSerializerTest.scala | 194 --------
 .../table/expressions/ScalarFunctionsTest.scala | 490 +++++++++++++++++++
 .../expressions/utils/ExpressionEvaluator.scala | 119 +++++
 .../api/table/typeutils/RowComparatorTest.scala | 136 +++++
 .../api/table/typeutils/RowSerializerTest.scala | 194 ++++++++
 8 files changed, 939 insertions(+), 939 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/ScalarFunctionsTest.scala
deleted file mode 100644
index 8d1cfa2..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/ScalarFunctionsTest.scala
+++ /dev/null
@@ -1,490 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expression
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.expression.utils.ExpressionEvaluator
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.junit.Assert.assertEquals
-import org.junit.Test
-
-class ScalarFunctionsTest {
-
-  // ----------------------------------------------------------------------------------------------
-  // String functions
-  // ----------------------------------------------------------------------------------------------
-
-  @Test
-  def testSubstring(): Unit = {
-    testFunction(
-      'f0.substring(2),
-      "f0.substring(2)",
-      "SUBSTRING(f0, 2)",
-      "his is a test String.")
-
-    testFunction(
-      'f0.substring(2, 5),
-      "f0.substring(2, 5)",
-      "SUBSTRING(f0, 2, 5)",
-      "his i")
-
-    testFunction(
-      'f0.substring(1, 'f7),
-      "f0.substring(1, f7)",
-      "SUBSTRING(f0, 1, f7)",
-      "Thi")
-  }
-
-  @Test
-  def testTrim(): Unit = {
-    testFunction(
-      'f8.trim(),
-      "f8.trim()",
-      "TRIM(f8)",
-      "This is a test String.")
-
-    testFunction(
-      'f8.trim(removeLeading = true, removeTrailing = true, " "),
-      "trim(f8)",
-      "TRIM(f8)",
-      "This is a test String.")
-
-    testFunction(
-      'f8.trim(removeLeading = false, removeTrailing = true, " "),
-      "f8.trim(TRAILING, ' ')",
-      "TRIM(TRAILING FROM f8)",
-      " This is a test String.")
-
-    testFunction(
-      'f0.trim(removeLeading = true, removeTrailing = true, "."),
-      "trim(BOTH, '.', f0)",
-      "TRIM(BOTH '.' FROM f0)",
-      "This is a test String")
-  }
-
-  @Test
-  def testCharLength(): Unit = {
-    testFunction(
-      'f0.charLength(),
-      "f0.charLength()",
-      "CHAR_LENGTH(f0)",
-      "22")
-
-    testFunction(
-      'f0.charLength(),
-      "charLength(f0)",
-      "CHARACTER_LENGTH(f0)",
-      "22")
-  }
-
-  @Test
-  def testUpperCase(): Unit = {
-    testFunction(
-      'f0.upperCase(),
-      "f0.upperCase()",
-      "UPPER(f0)",
-      "THIS IS A TEST STRING.")
-  }
-
-  @Test
-  def testLowerCase(): Unit = {
-    testFunction(
-      'f0.lowerCase(),
-      "f0.lowerCase()",
-      "LOWER(f0)",
-      "this is a test string.")
-  }
-
-  @Test
-  def testInitCap(): Unit = {
-    testFunction(
-      'f0.initCap(),
-      "f0.initCap()",
-      "INITCAP(f0)",
-      "This Is A Test String.")
-  }
-
-  @Test
-  def testConcat(): Unit = {
-    testFunction(
-      'f0 + 'f0,
-      "f0 + f0",
-      "f0||f0",
-      "This is a test String.This is a test String.")
-  }
-
-  @Test
-  def testLike(): Unit = {
-    testFunction(
-      'f0.like("Th_s%"),
-      "f0.like('Th_s%')",
-      "f0 LIKE 'Th_s%'",
-      "true")
-
-    testFunction(
-      'f0.like("%is a%"),
-      "f0.like('%is a%')",
-      "f0 LIKE '%is a%'",
-      "true")
-  }
-
-  @Test
-  def testNotLike(): Unit = {
-    testFunction(
-      !'f0.like("Th_s%"),
-      "!f0.like('Th_s%')",
-      "f0 NOT LIKE 'Th_s%'",
-      "false")
-
-    testFunction(
-      !'f0.like("%is a%"),
-      "!f0.like('%is a%')",
-      "f0 NOT LIKE '%is a%'",
-      "false")
-  }
-
-  @Test
-  def testSimilar(): Unit = {
-    testFunction(
-      'f0.similar("_*"),
-      "f0.similar('_*')",
-      "f0 SIMILAR TO '_*'",
-      "true")
-
-    testFunction(
-      'f0.similar("This (is)? a (test)+ Strin_*"),
-      "f0.similar('This (is)? a (test)+ Strin_*')",
-      "f0 SIMILAR TO 'This (is)? a (test)+ Strin_*'",
-      "true")
-  }
-
-  @Test
-  def testNotSimilar(): Unit = {
-    testFunction(
-      !'f0.similar("_*"),
-      "!f0.similar('_*')",
-      "f0 NOT SIMILAR TO '_*'",
-      "false")
-
-    testFunction(
-      !'f0.similar("This (is)? a (test)+ Strin_*"),
-      "!f0.similar('This (is)? a (test)+ Strin_*')",
-      "f0 NOT SIMILAR TO 'This (is)? a (test)+ Strin_*'",
-      "false")
-  }
-
-  @Test
-  def testMod(): Unit = {
-    testFunction(
-      'f4.mod('f7),
-      "f4.mod(f7)",
-      "MOD(f4, f7)",
-      "2")
-
-    testFunction(
-      'f4.mod(3),
-      "mod(f4, 3)",
-      "MOD(f4, 3)",
-      "2")
-
-    testFunction(
-      'f4 % 3,
-      "mod(44, 3)",
-      "MOD(44, 3)",
-      "2")
-
-  }
-
-  @Test
-  def testExp(): Unit = {
-    testFunction(
-      'f2.exp(),
-      "f2.exp()",
-      "EXP(f2)",
-      math.exp(42.toByte).toString)
-
-    testFunction(
-      'f3.exp(),
-      "f3.exp()",
-      "EXP(f3)",
-      math.exp(43.toShort).toString)
-
-    testFunction(
-      'f4.exp(),
-      "f4.exp()",
-      "EXP(f4)",
-      math.exp(44.toLong).toString)
-
-    testFunction(
-      'f5.exp(),
-      "f5.exp()",
-      "EXP(f5)",
-      math.exp(4.5.toFloat).toString)
-
-    testFunction(
-      'f6.exp(),
-      "f6.exp()",
-      "EXP(f6)",
-      math.exp(4.6).toString)
-
-    testFunction(
-      'f7.exp(),
-      "exp(3)",
-      "EXP(3)",
-      math.exp(3).toString)
-  }
-
-  @Test
-  def testLog10(): Unit = {
-    testFunction(
-      'f2.log10(),
-      "f2.log10()",
-      "LOG10(f2)",
-      math.log10(42.toByte).toString)
-
-    testFunction(
-      'f3.log10(),
-      "f3.log10()",
-      "LOG10(f3)",
-      math.log10(43.toShort).toString)
-
-    testFunction(
-      'f4.log10(),
-      "f4.log10()",
-      "LOG10(f4)",
-      math.log10(44.toLong).toString)
-
-    testFunction(
-      'f5.log10(),
-      "f5.log10()",
-      "LOG10(f5)",
-      math.log10(4.5.toFloat).toString)
-
-    testFunction(
-      'f6.log10(),
-      "f6.log10()",
-      "LOG10(f6)",
-      math.log10(4.6).toString)
-  }
-
-  @Test
-  def testPower(): Unit = {
-    testFunction(
-      'f2.power('f7),
-      "f2.power(f7)",
-      "POWER(f2, f7)",
-      math.pow(42.toByte, 3).toString)
-
-    testFunction(
-      'f3.power('f6),
-      "f3.power(f6)",
-      "POWER(f3, f6)",
-      math.pow(43.toShort, 4.6D).toString)
-
-    testFunction(
-      'f4.power('f5),
-      "f4.power(f5)",
-      "POWER(f4, f5)",
-      math.pow(44.toLong, 4.5.toFloat).toString)
-  }
-
-  @Test
-  def testLn(): Unit = {
-    testFunction(
-      'f2.ln(),
-      "f2.ln()",
-      "LN(f2)",
-      math.log(42.toByte).toString)
-
-    testFunction(
-      'f3.ln(),
-      "f3.ln()",
-      "LN(f3)",
-      math.log(43.toShort).toString)
-
-    testFunction(
-      'f4.ln(),
-      "f4.ln()",
-      "LN(f4)",
-      math.log(44.toLong).toString)
-
-    testFunction(
-      'f5.ln(),
-      "f5.ln()",
-      "LN(f5)",
-      math.log(4.5.toFloat).toString)
-
-    testFunction(
-      'f6.ln(),
-      "f6.ln()",
-      "LN(f6)",
-      math.log(4.6).toString)
-  }
-
-  @Test
-  def testAbs(): Unit = {
-    testFunction(
-      'f2.abs(),
-      "f2.abs()",
-      "ABS(f2)",
-      "42")
-
-    testFunction(
-      'f3.abs(),
-      "f3.abs()",
-      "ABS(f3)",
-      "43")
-
-    testFunction(
-      'f4.abs(),
-      "f4.abs()",
-      "ABS(f4)",
-      "44")
-
-    testFunction(
-      'f5.abs(),
-      "f5.abs()",
-      "ABS(f5)",
-      "4.5")
-
-    testFunction(
-      'f6.abs(),
-      "f6.abs()",
-      "ABS(f6)",
-      "4.6")
-
-    testFunction(
-      'f9.abs(),
-      "f9.abs()",
-      "ABS(f9)",
-      "42")
-
-    testFunction(
-      'f10.abs(),
-      "f10.abs()",
-      "ABS(f10)",
-      "43")
-
-    testFunction(
-      'f11.abs(),
-      "f11.abs()",
-      "ABS(f11)",
-      "44")
-
-    testFunction(
-      'f12.abs(),
-      "f12.abs()",
-      "ABS(f12)",
-      "4.5")
-
-    testFunction(
-      'f13.abs(),
-      "f13.abs()",
-      "ABS(f13)",
-      "4.6")
-  }
-
-  @Test
-  def testArithmeticFloorCeil(): Unit = {
-    testFunction(
-      'f5.floor(),
-      "f5.floor()",
-      "FLOOR(f5)",
-      "4.0")
-
-    testFunction(
-     'f5.ceil(),
-      "f5.ceil()",
-      "CEIL(f5)",
-      "5.0")
-
-    testFunction(
-      'f3.floor(),
-      "f3.floor()",
-      "FLOOR(f3)",
-      "43")
-
-    testFunction(
-      'f3.ceil(),
-      "f3.ceil()",
-      "CEIL(f3)",
-      "43")
-  }
-
-  // ----------------------------------------------------------------------------------------------
-
-  def testFunction(
-      expr: Expression,
-      exprString: String,
-      sqlExpr: String,
-      expected: String): Unit = {
-    val testData = new Row(15)
-    testData.setField(0, "This is a test String.")
-    testData.setField(1, true)
-    testData.setField(2, 42.toByte)
-    testData.setField(3, 43.toShort)
-    testData.setField(4, 44.toLong)
-    testData.setField(5, 4.5.toFloat)
-    testData.setField(6, 4.6)
-    testData.setField(7, 3)
-    testData.setField(8, " This is a test String. ")
-    testData.setField(9, -42.toByte)
-    testData.setField(10, -43.toShort)
-    testData.setField(11, -44.toLong)
-    testData.setField(12, -4.5.toFloat)
-    testData.setField(13, -4.6)
-    testData.setField(14, -3)
-
-    val typeInfo = new RowTypeInfo(Seq(
-      STRING_TYPE_INFO,
-      BOOLEAN_TYPE_INFO,
-      BYTE_TYPE_INFO,
-      SHORT_TYPE_INFO,
-      LONG_TYPE_INFO,
-      FLOAT_TYPE_INFO,
-      DOUBLE_TYPE_INFO,
-      INT_TYPE_INFO,
-      STRING_TYPE_INFO,
-      BYTE_TYPE_INFO,
-      SHORT_TYPE_INFO,
-      LONG_TYPE_INFO,
-      FLOAT_TYPE_INFO,
-      DOUBLE_TYPE_INFO,
-      INT_TYPE_INFO)).asInstanceOf[TypeInformation[Any]]
-
-    val exprResult = ExpressionEvaluator.evaluate(testData, typeInfo, expr)
-    assertEquals(expected, exprResult)
-
-    val exprStringResult = ExpressionEvaluator.evaluate(
-      testData,
-      typeInfo,
-      ExpressionParser.parseExpression(exprString))
-    assertEquals(expected, exprStringResult)
-
-    val exprSqlResult = ExpressionEvaluator.evaluate(testData, typeInfo, sqlExpr)
-    assertEquals(expected, exprSqlResult)
-  }
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/utils/ExpressionEvaluator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/utils/ExpressionEvaluator.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/utils/ExpressionEvaluator.scala
deleted file mode 100644
index fe606e0..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/expression/utils/ExpressionEvaluator.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.expression.utils
-
-import org.apache.calcite.rel.logical.LogicalProject
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
-import org.apache.calcite.tools.{Frameworks, RelBuilder}
-import org.apache.flink.api.common.functions.{Function, MapFunction}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet => JDataSet}
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
-import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction}
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.runtime.FunctionCompiler
-import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, TableEnvironment}
-import org.mockito.Mockito._
-
-/**
-  * Utility to translate and evaluate an RexNode or Table API expression to a String.
-  */
-object ExpressionEvaluator {
-
-  // TestCompiler that uses current class loader
-  class TestCompiler[T <: Function] extends FunctionCompiler[T] {
-    def compile(genFunc: GeneratedFunction[T]): Class[T] =
-      compile(getClass.getClassLoader, genFunc.name, genFunc.code)
-  }
-
-  private def prepareTable(
-    typeInfo: TypeInformation[Any]): (String, RelBuilder, TableEnvironment) = {
-
-    // create DataSetTable
-    val dataSetMock = mock(classOf[DataSet[Any]])
-    val jDataSetMock = mock(classOf[JDataSet[Any]])
-    when(dataSetMock.javaSet).thenReturn(jDataSetMock)
-    when(jDataSetMock.getType).thenReturn(typeInfo)
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val tableName = "myTable"
-    tEnv.registerDataSet(tableName, dataSetMock)
-
-    // prepare RelBuilder
-    val relBuilder = tEnv.getRelBuilder
-    relBuilder.scan(tableName)
-
-    (tableName, relBuilder, tEnv)
-  }
-
-  def evaluate(data: Any, typeInfo: TypeInformation[Any], sqlExpr: String): String = {
-    // create DataSetTable
-    val table = prepareTable(typeInfo)
-
-    // create RelNode from SQL expression
-    val planner = Frameworks.getPlanner(table._3.getFrameworkConfig)
-    val parsed = planner.parse("SELECT " + sqlExpr + " FROM " + table._1)
-    val validated = planner.validate(parsed)
-    val converted = planner.rel(validated)
-
-    val expr: RexNode = converted.rel.asInstanceOf[LogicalProject].getChildExps.get(0)
-
-    evaluate(data, typeInfo, table._2, expr)
-  }
-
-  def evaluate(data: Any, typeInfo: TypeInformation[Any], expr: Expression): String = {
-    val table = prepareTable(typeInfo)
-    val env = table._3
-    val resolvedExpr =
-      env.asInstanceOf[BatchTableEnvironment].scan("myTable").select(expr).
-        getRelNode.asInstanceOf[LogicalProject].getChildExps.get(0)
-    evaluate(data, typeInfo, table._2, resolvedExpr)
-  }
-
-  def evaluate(
-      data: Any,
-      typeInfo: TypeInformation[Any],
-      relBuilder: RelBuilder,
-      rexNode: RexNode): String = {
-    // generate code for Mapper
-    val config = new TableConfig()
-    val generator = new CodeGenerator(config, false, typeInfo)
-    val genExpr = generator.generateExpression(relBuilder.cast(rexNode, VARCHAR)) // cast to String
-    val bodyCode =
-      s"""
-        |${genExpr.code}
-        |return ${genExpr.resultTerm};
-        |""".stripMargin
-    val genFunc = generator.generateFunction[MapFunction[Any, String]](
-      "TestFunction",
-      classOf[MapFunction[Any, String]],
-      bodyCode,
-      STRING_TYPE_INFO.asInstanceOf[TypeInformation[Any]])
-
-    // compile and evaluate
-    val clazz = new TestCompiler[MapFunction[Any, String]]().compile(genFunc)
-    val mapper = clazz.newInstance()
-    mapper.map(data)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowComparatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowComparatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowComparatorTest.scala
deleted file mode 100644
index 9ceb9d2..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowComparatorTest.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.typeutils
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.common.typeutils.{ComparatorTestBase, TypeComparator, TypeSerializer}
-import org.apache.flink.api.java.tuple
-import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
-import org.apache.flink.api.scala.typeutils.RowComparatorTest.MyPojo
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.junit.Assert._
-
-class RowComparatorTest extends ComparatorTestBase[Row] {
-
-  val typeInfo = new RowTypeInfo(
-    Array(
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.DOUBLE_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO,
-      new TupleTypeInfo[tuple.Tuple2[Int, Boolean]](
-        BasicTypeInfo.INT_TYPE_INFO,
-        BasicTypeInfo.BOOLEAN_TYPE_INFO,
-        BasicTypeInfo.SHORT_TYPE_INFO),
-      TypeExtractor.createTypeInfo(classOf[MyPojo])))
-
-  val testPojo1 = new MyPojo()
-  // TODO we cannot test null here as PojoComparator has no support for null keys
-  testPojo1.name = ""
-  val testPojo2 = new MyPojo()
-  testPojo2.name = "Test1"
-  val testPojo3 = new MyPojo()
-  testPojo3.name = "Test2"
-
-  val data: Array[Row] = Array(
-    createRow(null, null, null, null, null),
-    createRow(0, null, null, null, null),
-    createRow(0, 0.0, null, null, null),
-    createRow(0, 0.0, "a", null, null),
-    createRow(1, 0.0, "a", null, null),
-    createRow(1, 1.0, "a", null, null),
-    createRow(1, 1.0, "b", null, null),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2),
-    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3)
-    )
-
-  override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
-    val arity = should.productArity
-    assertEquals(message, arity, is.productArity)
-    var index = 0
-    while (index < arity) {
-      val copiedValue: Any = should.productElement(index)
-      val element: Any = is.productElement(index)
-      assertEquals(message, element, copiedValue)
-      index += 1
-    }
-  }
-
-  override protected def createComparator(ascending: Boolean): TypeComparator[Row] = {
-    typeInfo.createComparator(
-      Array(0, 1, 2, 3, 4, 5, 6),
-      Array(ascending, ascending, ascending, ascending, ascending, ascending, ascending),
-      0,
-      new ExecutionConfig())
-  }
-
-  override protected def createSerializer(): TypeSerializer[Row] = {
-    typeInfo.createSerializer(new ExecutionConfig())
-  }
-
-  override protected def getSortedTestData: Array[Row] = {
-    data
-  }
-
-  override protected def supportsNullKeys: Boolean = true
-
-  def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = {
-    val r: Row = new Row(5)
-    r.setField(0, f0)
-    r.setField(1, f1)
-    r.setField(2, f2)
-    r.setField(3, f3)
-    r.setField(4, f4)
-    r
-  }
-}
-
-object RowComparatorTest {
-
-  class MyPojo() extends Serializable with Comparable[MyPojo] {
-    // we cannot use null because the PojoComparator does not support null properly
-    var name: String = ""
-
-    override def compareTo(o: MyPojo): Int = {
-      if (name == null && o.name == null) {
-        0
-      }
-      else if (name == null) {
-        -1
-      }
-      else if (o.name == null) {
-        1
-      }
-      else {
-        name.compareTo(o.name)
-      }
-    }
-
-    override def equals(other: Any): Boolean = other match {
-      case that: MyPojo => compareTo(that) == 0
-      case _ => false
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowSerializerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowSerializerTest.scala
deleted file mode 100644
index 95a1bb5..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/typeutils/RowSerializerTest.scala
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.table.typeutils
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.{SerializerTestInstance, TypeSerializer}
-import org.apache.flink.api.java.tuple
-import org.apache.flink.api.java.typeutils.{TypeExtractor, TupleTypeInfo}
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.table.typeutils.RowSerializerTest.MyPojo
-import org.junit.Assert._
-import org.junit.Test
-
-class RowSerializerTest {
-
-  class RowSerializerTestInstance(
-      serializer: TypeSerializer[Row],
-      testData: Array[Row])
-    extends SerializerTestInstance[Row](serializer, classOf[Row], -1, testData: _*) {
-
-    override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
-      val arity = should.productArity
-      assertEquals(message, arity, is.productArity)
-      var index = 0
-      while (index < arity) {
-        val copiedValue: Any = should.productElement(index)
-        val element: Any = is.productElement(index)
-        assertEquals(message, element, copiedValue)
-        index += 1
-      }
-    }
-  }
-
-  @Test
-  def testRowSerializer(): Unit = {
-    val rowInfo: TypeInformation[Row] = new RowTypeInfo(
-      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
-
-    val row1 = new Row(2)
-    row1.setField(0, 1)
-    row1.setField(1, "a")
-
-    val row2 = new Row(2)
-    row2.setField(0, 2)
-    row2.setField(1, null)
-
-    val testData: Array[Row] = Array(row1, row2)
-
-    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig)
-
-    val testInstance = new RowSerializerTestInstance(rowSerializer, testData)
-
-    testInstance.testAll()
-  }
-
-  @Test
-  def testLargeRowSerializer(): Unit = {
-    val rowInfo: TypeInformation[Row] = new RowTypeInfo(Seq(
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO))
-
-    val row = new Row(13)
-    row.setField(0, 2)
-    row.setField(1, null)
-    row.setField(3, null)
-    row.setField(4, null)
-    row.setField(5, null)
-    row.setField(6, null)
-    row.setField(7, null)
-    row.setField(8, null)
-    row.setField(9, null)
-    row.setField(10, null)
-    row.setField(11, null)
-    row.setField(12, "Test")
-
-    val testData: Array[Row] = Array(row)
-
-    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig)
-
-    val testInstance = new RowSerializerTestInstance(rowSerializer, testData)
-
-    testInstance.testAll()
-  }
-
-  @Test
-  def testRowSerializerWithComplexTypes(): Unit = {
-    val rowInfo = new RowTypeInfo(
-      Array(
-        BasicTypeInfo.INT_TYPE_INFO,
-        BasicTypeInfo.DOUBLE_TYPE_INFO,
-        BasicTypeInfo.STRING_TYPE_INFO,
-        new TupleTypeInfo[tuple.Tuple2[Int, Boolean]](
-          BasicTypeInfo.INT_TYPE_INFO,
-          BasicTypeInfo.BOOLEAN_TYPE_INFO,
-          BasicTypeInfo.SHORT_TYPE_INFO),
-        TypeExtractor.createTypeInfo(classOf[MyPojo])))
-
-    val testPojo1 = new MyPojo()
-    testPojo1.name = null
-    val testPojo2 = new MyPojo()
-    testPojo2.name = "Test1"
-    val testPojo3 = new MyPojo()
-    testPojo3.name = "Test2"
-
-    val testData: Array[Row] = Array(
-      createRow(null, null, null, null, null),
-      createRow(0, null, null, null, null),
-      createRow(0, 0.0, null, null, null),
-      createRow(0, 0.0, "a", null, null),
-      createRow(1, 0.0, "a", null, null),
-      createRow(1, 1.0, "a", null, null),
-      createRow(1, 1.0, "b", null, null),
-      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null),
-      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null),
-      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null),
-      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null),
-      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1),
-      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2),
-      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3)
-    )
-
-    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig)
-
-    val testInstance = new RowSerializerTestInstance(rowSerializer, testData)
-
-    testInstance.testAll()
-  }
-
-  // ----------------------------------------------------------------------------------------------
-
-  def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = {
-    val r: Row = new Row(5)
-    r.setField(0, f0)
-    r.setField(1, f1)
-    r.setField(2, f2)
-    r.setField(3, f3)
-    r.setField(4, f4)
-    r
-  }
-}
-
-object RowSerializerTest {
-  class MyPojo() extends Serializable with Comparable[MyPojo] {
-    var name: String = null
-
-    override def compareTo(o: MyPojo): Int = {
-      if (name == null && o.name == null) {
-        0
-      }
-      else if (name == null) {
-        -1
-      }
-      else if (o.name == null) {
-        1
-      }
-      else {
-        name.compareTo(o.name)
-      }
-    }
-
-    override def equals(other: Any): Boolean = other match {
-      case that: MyPojo => compareTo(that) == 0
-      case _ => false
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
new file mode 100644
index 0000000..a3b67f5
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/ScalarFunctionsTest.scala
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.expressions.utils.ExpressionEvaluator
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.expressions.{Expression, ExpressionParser}
+import org.apache.flink.api.table.typeutils.RowTypeInfo
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class ScalarFunctionsTest {
+
+  // 
----------------------------------------------------------------------------------------------
+  // String functions
+  // 
----------------------------------------------------------------------------------------------
+
+  @Test
+  def testSubstring(): Unit = {
+    testFunction(
+      'f0.substring(2),
+      "f0.substring(2)",
+      "SUBSTRING(f0, 2)",
+      "his is a test String.")
+
+    testFunction(
+      'f0.substring(2, 5),
+      "f0.substring(2, 5)",
+      "SUBSTRING(f0, 2, 5)",
+      "his i")
+
+    testFunction(
+      'f0.substring(1, 'f7),
+      "f0.substring(1, f7)",
+      "SUBSTRING(f0, 1, f7)",
+      "Thi")
+  }
+
+  @Test
+  def testTrim(): Unit = {
+    testFunction(
+      'f8.trim(),
+      "f8.trim()",
+      "TRIM(f8)",
+      "This is a test String.")
+
+    testFunction(
+      'f8.trim(removeLeading = true, removeTrailing = true, " "),
+      "trim(f8)",
+      "TRIM(f8)",
+      "This is a test String.")
+
+    testFunction(
+      'f8.trim(removeLeading = false, removeTrailing = true, " "),
+      "f8.trim(TRAILING, ' ')",
+      "TRIM(TRAILING FROM f8)",
+      " This is a test String.")
+
+    testFunction(
+      'f0.trim(removeLeading = true, removeTrailing = true, "."),
+      "trim(BOTH, '.', f0)",
+      "TRIM(BOTH '.' FROM f0)",
+      "This is a test String")
+  }
+
+  @Test
+  def testCharLength(): Unit = {
+    testFunction(
+      'f0.charLength(),
+      "f0.charLength()",
+      "CHAR_LENGTH(f0)",
+      "22")
+
+    testFunction(
+      'f0.charLength(),
+      "charLength(f0)",
+      "CHARACTER_LENGTH(f0)",
+      "22")
+  }
+
+  @Test
+  def testUpperCase(): Unit = {
+    testFunction(
+      'f0.upperCase(),
+      "f0.upperCase()",
+      "UPPER(f0)",
+      "THIS IS A TEST STRING.")
+  }
+
+  @Test
+  def testLowerCase(): Unit = {
+    testFunction(
+      'f0.lowerCase(),
+      "f0.lowerCase()",
+      "LOWER(f0)",
+      "this is a test string.")
+  }
+
+  @Test
+  def testInitCap(): Unit = {
+    testFunction(
+      'f0.initCap(),
+      "f0.initCap()",
+      "INITCAP(f0)",
+      "This Is A Test String.")
+  }
+
+  @Test
+  def testConcat(): Unit = {
+    testFunction(
+      'f0 + 'f0,
+      "f0 + f0",
+      "f0||f0",
+      "This is a test String.This is a test String.")
+  }
+
+  @Test
+  def testLike(): Unit = {
+    testFunction(
+      'f0.like("Th_s%"),
+      "f0.like('Th_s%')",
+      "f0 LIKE 'Th_s%'",
+      "true")
+
+    testFunction(
+      'f0.like("%is a%"),
+      "f0.like('%is a%')",
+      "f0 LIKE '%is a%'",
+      "true")
+  }
+
+  @Test
+  def testNotLike(): Unit = {
+    testFunction(
+      !'f0.like("Th_s%"),
+      "!f0.like('Th_s%')",
+      "f0 NOT LIKE 'Th_s%'",
+      "false")
+
+    testFunction(
+      !'f0.like("%is a%"),
+      "!f0.like('%is a%')",
+      "f0 NOT LIKE '%is a%'",
+      "false")
+  }
+
+  @Test
+  def testSimilar(): Unit = {
+    testFunction(
+      'f0.similar("_*"),
+      "f0.similar('_*')",
+      "f0 SIMILAR TO '_*'",
+      "true")
+
+    testFunction(
+      'f0.similar("This (is)? a (test)+ Strin_*"),
+      "f0.similar('This (is)? a (test)+ Strin_*')",
+      "f0 SIMILAR TO 'This (is)? a (test)+ Strin_*'",
+      "true")
+  }
+
+  @Test
+  def testNotSimilar(): Unit = {
+    testFunction(
+      !'f0.similar("_*"),
+      "!f0.similar('_*')",
+      "f0 NOT SIMILAR TO '_*'",
+      "false")
+
+    testFunction(
+      !'f0.similar("This (is)? a (test)+ Strin_*"),
+      "!f0.similar('This (is)? a (test)+ Strin_*')",
+      "f0 NOT SIMILAR TO 'This (is)? a (test)+ Strin_*'",
+      "false")
+  }
+
+  @Test
+  def testMod(): Unit = {
+    testFunction(
+      'f4.mod('f7),
+      "f4.mod(f7)",
+      "MOD(f4, f7)",
+      "2")
+
+    testFunction(
+      'f4.mod(3),
+      "mod(f4, 3)",
+      "MOD(f4, 3)",
+      "2")
+
+    testFunction(
+      'f4 % 3,
+      "mod(44, 3)",
+      "MOD(44, 3)",
+      "2")
+
+  }
+
+  @Test
+  def testExp(): Unit = {
+    testFunction(
+      'f2.exp(),
+      "f2.exp()",
+      "EXP(f2)",
+      math.exp(42.toByte).toString)
+
+    testFunction(
+      'f3.exp(),
+      "f3.exp()",
+      "EXP(f3)",
+      math.exp(43.toShort).toString)
+
+    testFunction(
+      'f4.exp(),
+      "f4.exp()",
+      "EXP(f4)",
+      math.exp(44.toLong).toString)
+
+    testFunction(
+      'f5.exp(),
+      "f5.exp()",
+      "EXP(f5)",
+      math.exp(4.5.toFloat).toString)
+
+    testFunction(
+      'f6.exp(),
+      "f6.exp()",
+      "EXP(f6)",
+      math.exp(4.6).toString)
+
+    testFunction(
+      'f7.exp(),
+      "exp(3)",
+      "EXP(3)",
+      math.exp(3).toString)
+  }
+
+  @Test
+  def testLog10(): Unit = {
+    testFunction(
+      'f2.log10(),
+      "f2.log10()",
+      "LOG10(f2)",
+      math.log10(42.toByte).toString)
+
+    testFunction(
+      'f3.log10(),
+      "f3.log10()",
+      "LOG10(f3)",
+      math.log10(43.toShort).toString)
+
+    testFunction(
+      'f4.log10(),
+      "f4.log10()",
+      "LOG10(f4)",
+      math.log10(44.toLong).toString)
+
+    testFunction(
+      'f5.log10(),
+      "f5.log10()",
+      "LOG10(f5)",
+      math.log10(4.5.toFloat).toString)
+
+    testFunction(
+      'f6.log10(),
+      "f6.log10()",
+      "LOG10(f6)",
+      math.log10(4.6).toString)
+  }
+
+  @Test
+  def testPower(): Unit = {
+    testFunction(
+      'f2.power('f7),
+      "f2.power(f7)",
+      "POWER(f2, f7)",
+      math.pow(42.toByte, 3).toString)
+
+    testFunction(
+      'f3.power('f6),
+      "f3.power(f6)",
+      "POWER(f3, f6)",
+      math.pow(43.toShort, 4.6D).toString)
+
+    testFunction(
+      'f4.power('f5),
+      "f4.power(f5)",
+      "POWER(f4, f5)",
+      math.pow(44.toLong, 4.5.toFloat).toString)
+  }
+
+  @Test
+  def testLn(): Unit = {
+    testFunction(
+      'f2.ln(),
+      "f2.ln()",
+      "LN(f2)",
+      math.log(42.toByte).toString)
+
+    testFunction(
+      'f3.ln(),
+      "f3.ln()",
+      "LN(f3)",
+      math.log(43.toShort).toString)
+
+    testFunction(
+      'f4.ln(),
+      "f4.ln()",
+      "LN(f4)",
+      math.log(44.toLong).toString)
+
+    testFunction(
+      'f5.ln(),
+      "f5.ln()",
+      "LN(f5)",
+      math.log(4.5.toFloat).toString)
+
+    testFunction(
+      'f6.ln(),
+      "f6.ln()",
+      "LN(f6)",
+      math.log(4.6).toString)
+  }
+
+  @Test
+  def testAbs(): Unit = {
+    testFunction(
+      'f2.abs(),
+      "f2.abs()",
+      "ABS(f2)",
+      "42")
+
+    testFunction(
+      'f3.abs(),
+      "f3.abs()",
+      "ABS(f3)",
+      "43")
+
+    testFunction(
+      'f4.abs(),
+      "f4.abs()",
+      "ABS(f4)",
+      "44")
+
+    testFunction(
+      'f5.abs(),
+      "f5.abs()",
+      "ABS(f5)",
+      "4.5")
+
+    testFunction(
+      'f6.abs(),
+      "f6.abs()",
+      "ABS(f6)",
+      "4.6")
+
+    testFunction(
+      'f9.abs(),
+      "f9.abs()",
+      "ABS(f9)",
+      "42")
+
+    testFunction(
+      'f10.abs(),
+      "f10.abs()",
+      "ABS(f10)",
+      "43")
+
+    testFunction(
+      'f11.abs(),
+      "f11.abs()",
+      "ABS(f11)",
+      "44")
+
+    testFunction(
+      'f12.abs(),
+      "f12.abs()",
+      "ABS(f12)",
+      "4.5")
+
+    testFunction(
+      'f13.abs(),
+      "f13.abs()",
+      "ABS(f13)",
+      "4.6")
+  }
+
+  @Test
+  def testArithmeticFloorCeil(): Unit = {
+    testFunction(
+      'f5.floor(),
+      "f5.floor()",
+      "FLOOR(f5)",
+      "4.0")
+
+    testFunction(
+     'f5.ceil(),
+      "f5.ceil()",
+      "CEIL(f5)",
+      "5.0")
+
+    testFunction(
+      'f3.floor(),
+      "f3.floor()",
+      "FLOOR(f3)",
+      "43")
+
+    testFunction(
+      'f3.ceil(),
+      "f3.ceil()",
+      "CEIL(f3)",
+      "43")
+  }
+
+  // 
----------------------------------------------------------------------------------------------
+
+  def testFunction(
+      expr: Expression,
+      exprString: String,
+      sqlExpr: String,
+      expected: String): Unit = {
+    val testData = new Row(15)
+    testData.setField(0, "This is a test String.")
+    testData.setField(1, true)
+    testData.setField(2, 42.toByte)
+    testData.setField(3, 43.toShort)
+    testData.setField(4, 44.toLong)
+    testData.setField(5, 4.5.toFloat)
+    testData.setField(6, 4.6)
+    testData.setField(7, 3)
+    testData.setField(8, " This is a test String. ")
+    testData.setField(9, -42.toByte)
+    testData.setField(10, -43.toShort)
+    testData.setField(11, -44.toLong)
+    testData.setField(12, -4.5.toFloat)
+    testData.setField(13, -4.6)
+    testData.setField(14, -3)
+
+    val typeInfo = new RowTypeInfo(Seq(
+      STRING_TYPE_INFO,
+      BOOLEAN_TYPE_INFO,
+      BYTE_TYPE_INFO,
+      SHORT_TYPE_INFO,
+      LONG_TYPE_INFO,
+      FLOAT_TYPE_INFO,
+      DOUBLE_TYPE_INFO,
+      INT_TYPE_INFO,
+      STRING_TYPE_INFO,
+      BYTE_TYPE_INFO,
+      SHORT_TYPE_INFO,
+      LONG_TYPE_INFO,
+      FLOAT_TYPE_INFO,
+      DOUBLE_TYPE_INFO,
+      INT_TYPE_INFO)).asInstanceOf[TypeInformation[Any]]
+
+    val exprResult = ExpressionEvaluator.evaluate(testData, typeInfo, expr)
+    assertEquals(expected, exprResult)
+
+    val exprStringResult = ExpressionEvaluator.evaluate(
+      testData,
+      typeInfo,
+      ExpressionParser.parseExpression(exprString))
+    assertEquals(expected, exprStringResult)
+
+    val exprSqlResult = ExpressionEvaluator.evaluate(testData, typeInfo, 
sqlExpr)
+    assertEquals(expected, exprSqlResult)
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionEvaluator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionEvaluator.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionEvaluator.scala
new file mode 100644
index 0000000..0b5a2de
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/expressions/utils/ExpressionEvaluator.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.expressions.utils
+
+import org.apache.calcite.rel.logical.LogicalProject
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.`type`.SqlTypeName.VARCHAR
+import org.apache.calcite.tools.{Frameworks, RelBuilder}
+import org.apache.flink.api.common.functions.{Function, MapFunction}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{DataSet => JDataSet}
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
+import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedFunction}
+import org.apache.flink.api.table.expressions.Expression
+import org.apache.flink.api.table.runtime.FunctionCompiler
+import org.apache.flink.api.table.{BatchTableEnvironment, TableConfig, 
TableEnvironment}
+import org.mockito.Mockito._
+
+/**
+  * Utility to translate and evaluate a RexNode or Table API expression to a String.
+  */
+object ExpressionEvaluator {
+
+  // TestCompiler that uses current class loader
+  class TestCompiler[T <: Function] extends FunctionCompiler[T] {
+    def compile(genFunc: GeneratedFunction[T]): Class[T] =
+      compile(getClass.getClassLoader, genFunc.name, genFunc.code)
+  }
+
+  private def prepareTable(
+    typeInfo: TypeInformation[Any]): (String, RelBuilder, TableEnvironment) = {
+
+    // create DataSetTable
+    val dataSetMock = mock(classOf[DataSet[Any]])
+    val jDataSetMock = mock(classOf[JDataSet[Any]])
+    when(dataSetMock.javaSet).thenReturn(jDataSetMock)
+    when(jDataSetMock.getType).thenReturn(typeInfo)
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val tableName = "myTable"
+    tEnv.registerDataSet(tableName, dataSetMock)
+
+    // prepare RelBuilder
+    val relBuilder = tEnv.getRelBuilder
+    relBuilder.scan(tableName)
+
+    (tableName, relBuilder, tEnv)
+  }
+
+  def evaluate(data: Any, typeInfo: TypeInformation[Any], sqlExpr: String): 
String = {
+    // create DataSetTable
+    val table = prepareTable(typeInfo)
+
+    // create RelNode from SQL expression
+    val planner = Frameworks.getPlanner(table._3.getFrameworkConfig)
+    val parsed = planner.parse("SELECT " + sqlExpr + " FROM " + table._1)
+    val validated = planner.validate(parsed)
+    val converted = planner.rel(validated)
+
+    val expr: RexNode = 
converted.rel.asInstanceOf[LogicalProject].getChildExps.get(0)
+
+    evaluate(data, typeInfo, table._2, expr)
+  }
+
+  def evaluate(data: Any, typeInfo: TypeInformation[Any], expr: Expression): String = {
+    // reuse the table name registered by prepareTable instead of repeating the literal
+    val (tableName, relBuilder, tEnv) = prepareTable(typeInfo)
+    val resolvedExpr =
+      tEnv.asInstanceOf[BatchTableEnvironment].scan(tableName).select(expr).
+        getRelNode.asInstanceOf[LogicalProject].getChildExps.get(0)
+    evaluate(data, typeInfo, relBuilder, resolvedExpr)
+  }
+
+  def evaluate(
+      data: Any,
+      typeInfo: TypeInformation[Any],
+      relBuilder: RelBuilder,
+      rexNode: RexNode): String = {
+    // generate code for Mapper
+    val config = new TableConfig()
+    val generator = new CodeGenerator(config, false, typeInfo)
+    val genExpr = generator.generateExpression(relBuilder.cast(rexNode, 
VARCHAR)) // cast to String
+    val bodyCode =
+      s"""
+        |${genExpr.code}
+        |return ${genExpr.resultTerm};
+        |""".stripMargin
+    val genFunc = generator.generateFunction[MapFunction[Any, String]](
+      "TestFunction",
+      classOf[MapFunction[Any, String]],
+      bodyCode,
+      STRING_TYPE_INFO.asInstanceOf[TypeInformation[Any]])
+
+    // compile and evaluate
+    val clazz = new TestCompiler[MapFunction[Any, String]]().compile(genFunc)
+    val mapper = clazz.newInstance()
+    mapper.map(data)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala
new file mode 100644
index 0000000..557db3a
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowComparatorTest.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeutils.{ComparatorTestBase, 
TypeComparator, TypeSerializer}
+import org.apache.flink.api.java.tuple
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.typeutils.RowComparatorTest.MyPojo
+import org.junit.Assert._
+
+class RowComparatorTest extends ComparatorTestBase[Row] {
+
+  val typeInfo = new RowTypeInfo(
+    Array(
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.DOUBLE_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO,
+      new TupleTypeInfo[tuple.Tuple3[Int, Boolean, Short]]( // three field types => Tuple3
+        BasicTypeInfo.INT_TYPE_INFO,
+        BasicTypeInfo.BOOLEAN_TYPE_INFO,
+        BasicTypeInfo.SHORT_TYPE_INFO),
+      TypeExtractor.createTypeInfo(classOf[MyPojo])))
+
+  val testPojo1 = new MyPojo()
+  // TODO we cannot test null here as PojoComparator has no support for null 
keys
+  testPojo1.name = ""
+  val testPojo2 = new MyPojo()
+  testPojo2.name = "Test1"
+  val testPojo3 = new MyPojo()
+  testPojo3.name = "Test2"
+
+  val data: Array[Row] = Array(
+    createRow(null, null, null, null, null),
+    createRow(0, null, null, null, null),
+    createRow(0, 0.0, null, null, null),
+    createRow(0, 0.0, "a", null, null),
+    createRow(1, 0.0, "a", null, null),
+    createRow(1, 1.0, "a", null, null),
+    createRow(1, 1.0, "b", null, null),
+    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), 
null),
+    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), 
null),
+    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), 
null),
+    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), 
null),
+    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), 
testPojo1),
+    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), 
testPojo2),
+    createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), 
testPojo3)
+    )
+
+  override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
+    val arity = should.productArity
+    assertEquals(message, arity, is.productArity)
+    var index = 0
+    while (index < arity) {
+      val expected: Any = should.productElement(index) // field of the reference row
+      val actual: Any = is.productElement(index) // field of the row under test
+      assertEquals(message, expected, actual) // JUnit order: expected first, then actual
+      index += 1
+    }
+  }
+
+  override protected def createComparator(ascending: Boolean): 
TypeComparator[Row] = {
+    typeInfo.createComparator(
+      Array(0, 1, 2, 3, 4, 5, 6),
+      Array(ascending, ascending, ascending, ascending, ascending, ascending, 
ascending),
+      0,
+      new ExecutionConfig())
+  }
+
+  override protected def createSerializer(): TypeSerializer[Row] = {
+    typeInfo.createSerializer(new ExecutionConfig())
+  }
+
+  override protected def getSortedTestData: Array[Row] = {
+    data
+  }
+
+  override protected def supportsNullKeys: Boolean = true
+
+  def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = {
+    val r: Row = new Row(5)
+    r.setField(0, f0)
+    r.setField(1, f1)
+    r.setField(2, f2)
+    r.setField(3, f3)
+    r.setField(4, f4)
+    r
+  }
+}
+
+object RowComparatorTest {
+
+  class MyPojo() extends Serializable with Comparable[MyPojo] {
+    // we cannot use null because the PojoComparator does not support null properly
+    var name: String = ""
+
+    override def compareTo(o: MyPojo): Int = {
+      if (name == null && o.name == null) {
+        0
+      } else if (name == null) {
+        -1 // a null name sorts before any non-null name
+      } else if (o.name == null) {
+        1
+      } else {
+        name.compareTo(o.name)
+      }
+    }
+
+    override def equals(other: Any): Boolean = other match {
+      case that: MyPojo => compareTo(that) == 0
+      case _ => false
+    }
+
+    // equals compares only `name`, so hashCode must be derived from `name` as well
+    override def hashCode(): Int = if (name == null) 0 else name.hashCode
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/343d05a4/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala
new file mode 100644
index 0000000..b52dd4d
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/typeutils/RowSerializerTest.scala
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeutils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.{SerializerTestInstance, 
TypeSerializer}
+import org.apache.flink.api.java.tuple
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.table.Row
+import org.apache.flink.api.table.typeutils.RowSerializerTest.MyPojo
+import org.junit.Assert._
+import org.junit.Test
+
+class RowSerializerTest {
+
+  class RowSerializerTestInstance(
+      serializer: TypeSerializer[Row],
+      testData: Array[Row])
+    extends SerializerTestInstance[Row](serializer, classOf[Row], -1, 
testData: _*) {
+
+    override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
+      val arity = should.productArity
+      assertEquals(message, arity, is.productArity)
+      var index = 0
+      while (index < arity) {
+        val expected: Any = should.productElement(index) // field of the reference row
+        val actual: Any = is.productElement(index) // field of the row under test
+        assertEquals(message, expected, actual) // JUnit order: expected first, then actual
+        index += 1
+      }
+    }
+  }
+
+  @Test
+  def testRowSerializer(): Unit = {
+    val rowInfo: TypeInformation[Row] = new RowTypeInfo(
+      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
+
+    val row1 = new Row(2)
+    row1.setField(0, 1)
+    row1.setField(1, "a")
+
+    val row2 = new Row(2)
+    row2.setField(0, 2)
+    row2.setField(1, null)
+
+    val testData: Array[Row] = Array(row1, row2)
+
+    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new 
ExecutionConfig)
+
+    val testInstance = new RowSerializerTestInstance(rowSerializer, testData)
+
+    testInstance.testAll()
+  }
+
+  @Test
+  def testLargeRowSerializer(): Unit = {
+    val rowInfo: TypeInformation[Row] = new RowTypeInfo(Seq(
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.INT_TYPE_INFO,
+      BasicTypeInfo.STRING_TYPE_INFO))
+
+    val row = new Row(13)
+    row.setField(0, 2)
+    row.setField(1, null)
+    row.setField(3, null)
+    row.setField(4, null)
+    row.setField(5, null)
+    row.setField(6, null)
+    row.setField(7, null)
+    row.setField(8, null)
+    row.setField(9, null)
+    row.setField(10, null)
+    row.setField(11, null)
+    row.setField(12, "Test")
+
+    val testData: Array[Row] = Array(row)
+
+    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new 
ExecutionConfig)
+
+    val testInstance = new RowSerializerTestInstance(rowSerializer, testData)
+
+    testInstance.testAll()
+  }
+
+  @Test
+  def testRowSerializerWithComplexTypes(): Unit = {
+    val rowInfo = new RowTypeInfo(
+      Array(
+        BasicTypeInfo.INT_TYPE_INFO,
+        BasicTypeInfo.DOUBLE_TYPE_INFO,
+        BasicTypeInfo.STRING_TYPE_INFO,
+        new TupleTypeInfo[tuple.Tuple3[Int, Boolean, Short]]( // three field types => Tuple3
+          BasicTypeInfo.INT_TYPE_INFO,
+          BasicTypeInfo.BOOLEAN_TYPE_INFO,
+          BasicTypeInfo.SHORT_TYPE_INFO),
+        TypeExtractor.createTypeInfo(classOf[MyPojo])))
+
+    val testPojo1 = new MyPojo()
+    testPojo1.name = null
+    val testPojo2 = new MyPojo()
+    testPojo2.name = "Test1"
+    val testPojo3 = new MyPojo()
+    testPojo3.name = "Test2"
+
+    val testData: Array[Row] = Array(
+      createRow(null, null, null, null, null),
+      createRow(0, null, null, null, null),
+      createRow(0, 0.0, null, null, null),
+      createRow(0, 0.0, "a", null, null),
+      createRow(1, 0.0, "a", null, null),
+      createRow(1, 1.0, "a", null, null),
+      createRow(1, 1.0, "b", null, null),
+      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](1, false, 2), null),
+      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, false, 2), null),
+      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 2), null),
+      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), null),
+      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo1),
+      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo2),
+      createRow(1, 1.0, "b", new tuple.Tuple3[Int, Boolean, Short](2, true, 3), testPojo3)
+    )
+
+    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig)
+
+    val testInstance = new RowSerializerTestInstance(rowSerializer, testData)
+
+    testInstance.testAll()
+  }
+
+  // 
----------------------------------------------------------------------------------------------
+
+  def createRow(f0: Any, f1: Any, f2: Any, f3: Any, f4: Any): Row = {
+    val r: Row = new Row(5)
+    r.setField(0, f0)
+    r.setField(1, f1)
+    r.setField(2, f2)
+    r.setField(3, f3)
+    r.setField(4, f4)
+    r
+  }
+}
+
+object RowSerializerTest {
+  class MyPojo() extends Serializable with Comparable[MyPojo] {
+    var name: String = null
+
+    override def compareTo(o: MyPojo): Int = {
+      if (name == null && o.name == null) {
+        0
+      } else if (name == null) {
+        -1 // a null name sorts before any non-null name
+      } else if (o.name == null) {
+        1
+      } else {
+        name.compareTo(o.name)
+      }
+    }
+
+    override def equals(other: Any): Boolean = other match {
+      case that: MyPojo => compareTo(that) == 0
+      case _ => false
+    }
+
+    // equals compares only `name`, so hashCode must be derived from `name` as well
+    override def hashCode(): Int = if (name == null) 0 else name.hashCode
+  }
+}

Reply via email to