http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala
deleted file mode 100644
index f12e7a0..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/package.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala
-
-import com.google.common.base.Preconditions
-import org.apache.flink.api.expressions.{Row, ExpressionOperation}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.streaming.api.scala.DataStream
-
-import scala.language.implicitConversions
-
-/**
- * == Language Integrated Queries (aka Expression Operations) ==
- *
- * Importing this package with:
- *
- * {{{
- *   import org.apache.flink.api.scala.expressions._
- * }}}
- *
- * imports implicit conversions for converting a [[DataSet]] or [[DataStream]] 
to an
- * [[ExpressionOperation]]. This can be used to perform SQL-like queries on 
data. Please have
- * a look at [[ExpressionOperation]] to see which operations are supported and
- * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]] to 
see how an
- * expression can be specified.
- *
- * Inside an expression operation you can use Scala Symbols to refer to field 
names. One would
- * refer to field `a` by writing `'a`. Sometimes it is necessary to manually 
confert a
- * Scala literal to an Expression Literal, in those cases use `Literal`, as in 
`Literal(3)`.
- *
- * Example:
- *
- * {{{
- *   import org.apache.flink.api.scala._
- *   import org.apache.flink.api.scala.expressions._
- *
- *   val env = ExecutionEnvironment.getExecutionEnvironment
- *   val input = env.fromElements(("Hello", 2), ("Hello", 5), ("Ciao", 3))
- *   val result = input.as('word, 'count).groupBy('word).select('word, 
'count.avg)
- *   result.print()
- *
- *   env.execute()
- * }}}
- *
- * The result of an [[ExpressionOperation]] can be converted back to the 
underlying API
- * representation using `as`:
- *
- * {{{
- *   case class Word(word: String, count: Int)
- *
- *   val result = in.select(...).as('word, 'count)
- *   val set = result.as[Word]
- * }}}
- */
-package object expressions extends ImplicitExpressionConversions {
-
-  implicit def dataSet2DataSetConversions[T](set: DataSet[T]): 
DataSetConversions[T] = {
-    new DataSetConversions[T](set, set.getType.asInstanceOf[CompositeType[T]])
-  }
-
-  implicit def expressionOperation2RowDataSet(
-      expressionOperation: ExpressionOperation[ScalaBatchTranslator]): 
DataSet[Row] = {
-    expressionOperation.as[Row]
-  }
-
-  implicit def rowDataSet2ExpressionOperation(
-      rowDataSet: DataSet[Row]): ExpressionOperation[ScalaBatchTranslator] = {
-    rowDataSet.toExpression
-  }
-
-  implicit def dataStream2DataSetConversions[T](
-      stream: DataStream[T]): DataStreamConversions[T] = {
-    new DataStreamConversions[T](
-      stream,
-      stream.getJavaStream.getType.asInstanceOf[CompositeType[T]])
-  }
-
-  implicit def expressionOperation2RowDataStream(
-      expressionOperation: ExpressionOperation[ScalaStreamingTranslator]): 
DataStream[Row] = {
-    expressionOperation.as[Row]
-  }
-
-  implicit def rowDataStream2ExpressionOperation(
-      rowDataStream: DataStream[Row]): 
ExpressionOperation[ScalaStreamingTranslator] = {
-    rowDataStream.toExpression
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala
deleted file mode 100644
index dadfe09..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/PageRankExpression.scala
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.expressions.tree.Literal
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.expressions._
-import org.apache.flink.examples.java.graph.util.PageRankData
-import org.apache.flink.util.Collector
-
-import _root_.scala.collection.JavaConverters._
-
-/**
-* A basic implementation of the Page Rank algorithm using a bulk iteration.
-*
-* This implementation requires a set of pages and a set of directed links as 
input and works as
-* follows.
-*
-* In each iteration, the rank of every page is evenly distributed to all pages 
it points to. Each
-* page collects the partial ranks of all pages that point to it, sums them up, 
and applies a
-* dampening factor to the sum. The result is the new rank of the page. A new 
iteration is started
-* with the new ranks of all pages. This implementation terminates after a 
fixed number of
-* iterations. This is the Wikipedia entry for the
-* [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]]
-*
-* Input files are plain text files and must be formatted as follows:
-*
-*  - Pages represented as an (long) ID separated by new-line characters.
-*    For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 
42, and 63.
-*  - Links are represented as pairs of page IDs which are separated by space  
characters. Links
-*    are separated by new-line characters.
-*    For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (directed) links 
(1)->(2), (2)->(12),
-*    (1)->(12), and (42)->(63). For this simple implementation it is required 
that each page has
-*    at least one incoming and one outgoing link (a page can point to itself).
-*
-* Usage:
-* {{{
-*   PageRankBasic <pages path> <links path> <output path> <num pages> <num 
iterations>
-* }}}
-*
-* If no parameters are provided, the program is run with default data from
-* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
-*
-* This example shows how to use:
-*
-*  - Bulk Iterations
-*  - Expression Operations
-*/
-object PageRankExpression {
-
-  private final val DAMPENING_FACTOR: Double = 0.85
-  private final val EPSILON: Double = 0.0001
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    // read input data
-    val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) }
-      .as('pageId, 'rank)
-
-    val links = getLinksDataSet(env)
-
-    // build adjacency list from link input
-    val adjacencyLists = links
-      .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, 
AdjacencyList] {
-
-        override def reduce(
-            values: _root_.java.lang.Iterable[Link],
-            out: Collector[AdjacencyList]): Unit = {
-          var outputId = -1L
-          val outputList = values.asScala map { t => outputId = t.sourceId; 
t.targetId }
-          out.collect(new AdjacencyList(outputId, outputList.toArray))
-        }
-
-      }).as('sourceId, 'targetIds)
-
-    // start iteration
-    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
-      currentRanks =>
-        val newRanks = currentRanks.toExpression
-          // distribute ranks to target pages
-          .join(adjacencyLists).where('pageId === 'sourceId)
-          .select('rank, 'targetIds).as[RankOutput]
-          .flatMap {
-            (in, out: Collector[(Long, Double)]) =>
-              val targets = in.targetIds
-              val len = targets.length
-              targets foreach { t => out.collect((t, in.rank / len )) }
-          }
-          .as('pageId, 'rank)
-          // collect ranks and sum them up
-          .groupBy('pageId).select('pageId, 'rank.sum as 'rank)
-          // apply dampening factor
-          .select(
-            'pageId,
-            ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / 
numPages as 'rank)
-
-
-        val termination = currentRanks.toExpression
-          .as('curId, 'curRank).join(newRanks.as('newId, 'newRank))
-          .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON)
-
-        (newRanks, termination)
-    }
-
-    val result = finalRanks
-
-    // emit result
-    if (fileOutput) {
-      result.writeAsCsv(outputPath, "\n", " ")
-    } else {
-      result.print()
-    }
-
-    // execute program
-    env.execute("Expression PageRank Example")
-  }
-
-  // *************************************************************************
-  //     USER TYPES
-  // *************************************************************************
-
-  case class Link(sourceId: Long, targetId: Long)
-
-  case class Page(pageId: Long, rank: Double)
-
-  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
-
-  case class RankOutput(rank: Double, targetIds: Array[Long])
-
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 5) {
-        pagesInputPath = args(0)
-        linksInputPath = args(1)
-        outputPath = args(2)
-        numPages = args(3).toLong
-        maxIterations = args(4).toInt
-      } else {
-        System.err.println("Usage: PageRankBasic <pages path> <links path> 
<output path> <num " +
-          "pages> <num iterations>")
-        false
-      }
-    } else {
-      System.out.println("Executing PageRank Basic example with default 
parameters and built-in " +
-        "default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of 
input files.")
-      System.out.println("  Usage: PageRankBasic <pages path> <links path> 
<output path> <num " +
-        "pages> <num iterations>")
-
-      numPages = PageRankData.getNumberOfPages
-    }
-    true
-  }
-
-  private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
-    if (fileOutput) {
-      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", 
lineDelimiter = "\n")
-        .map(x => x._1)
-    } else {
-      env.generateSequence(1, 15)
-    }
-  }
-
-  private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
-    if (fileOutput) {
-      env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
-        includedFields = Array(0, 1))
-    } else {
-      val edges = PageRankData.EDGES.map { case Array(v1, v2) => 
Link(v1.asInstanceOf[Long],
-        v2.asInstanceOf[Long])}
-      env.fromCollection(edges)
-    }
-  }
-
-  private var fileOutput: Boolean = false
-  private var pagesInputPath: String = null
-  private var linksInputPath: String = null
-  private var outputPath: String = null
-  private var numPages: Double = 0
-  private var maxIterations: Int = 10
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
deleted file mode 100644
index 2d1d0ec..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/StreamingExpressionFilter.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.scala
-
-import org.apache.flink.streaming.api.scala._
-
-import org.apache.flink.api.scala.expressions._
-
-import scala.Stream._
-import scala.math._
-import scala.language.postfixOps
-import scala.util.Random
-
-/**
- * Simple example for demonstrating the working streaming api expression 
operations.
- */
-object StreamingExpressionFilter {
-
-  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) 
extends Serializable
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val cars = genCarStream().toExpression
-      .filter('carId === 0)
-      .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 
'time)
-      .as[CarEvent]
-
-    cars.print()
-
-    
StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing")
-
-  }
-
-  def genCarStream(): DataStream[CarEvent] = {
-
-    def nextSpeed(carEvent : CarEvent) : CarEvent =
-    {
-      val next =
-        if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, 
carEvent.speed - 5)
-      CarEvent(carEvent.carId, next, carEvent.distance + 
next/3.6d,System.currentTimeMillis)
-    }
-    def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] =
-    {
-      Thread.sleep(1000)
-      speeds.append(carStream(speeds.map(nextSpeed)))
-    }
-    carStream(range(0, 
numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis())))
-  }
-
-  def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      if (args.length == 3) {
-        numOfCars = args(0).toInt
-        evictionSec = args(1).toInt
-        triggerMeters = args(2).toDouble
-        true
-      }
-      else {
-        System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> 
<triggerMeters>")
-        false
-      }
-    }else{
-      true
-    }
-  }
-
-  var numOfCars = 2
-  var evictionSec = 10
-  var triggerMeters = 50d
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
 
b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
deleted file mode 100644
index 1803bdd..0000000
--- 
a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Expression.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.expressions.tree.Literal
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.expressions._
-
-/**
- * This program implements a modified version of the TPC-H query 3. The
- * example demonstrates how to assign names to fields by extending the Tuple 
class.
- * The original query can be found at
- * 
[http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
- * (page 29).
- *
- * This program implements the following SQL equivalent:
- *
- * {{{
- * SELECT 
- *      l_orderkey, 
- *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
- *      o_orderdate, 
- *      o_shippriority 
- * FROM customer, 
- *      orders, 
- *      lineitem 
- * WHERE
- *      c_mktsegment = '[SEGMENT]' 
- *      AND c_custkey = o_custkey
- *      AND l_orderkey = o_orderkey
- *      AND o_orderdate < date '[DATE]'
- *      AND l_shipdate > date '[DATE]'
- * GROUP BY
- *      l_orderkey, 
- *      o_orderdate, 
- *      o_shippriority;
- * }}}
- *
- * Compared to the original TPC-H query this version does not sort the result 
by revenue
- * and orderdate.
- *
- * Input files are plain text CSV files using the pipe character ('|') as 
field separator 
- * as generated by the TPC-H data generator which is available at 
- * [http://www.tpc.org/tpch/](a href="http://www.tpc.org/tpch/).
- *
- * Usage: 
- * {{{
- * TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv 
path> <result path>
- * }}}
- *  
- * This example shows how to use:
- *  - Expression Operations
- *
- */
-object TPCHQuery3Expression {
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    // set filter date
-    val dateFormat = new _root_.java.text.SimpleDateFormat("yyyy-MM-dd")
-    val date = dateFormat.parse("1995-03-12")
-    
-    // get execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val lineitems = getLineitemDataSet(env)
-      .filter( l => dateFormat.parse(l.shipDate).after(date) )
-      .as('id, 'extdPrice, 'discount, 'shipDate)
-
-    val customers = getCustomerDataSet(env)
-      .as('id, 'mktSegment)
-      .filter( 'mktSegment === "AUTOMOBILE" )
-
-    val orders = getOrdersDataSet(env)
-      .filter( o => dateFormat.parse(o.orderDate).before(date) )
-      .as('orderId, 'custId, 'orderDate, 'shipPrio)
-
-    val items =
-      orders.join(customers)
-        .where('custId === 'id)
-        .select('orderId, 'orderDate, 'shipPrio)
-      .join(lineitems)
-        .where('orderId === 'id)
-        .select(
-          'orderId,
-          'extdPrice * (Literal(1.0f) - 'discount) as 'revenue,
-          'orderDate,
-          'shipPrio)
-
-    val result = items
-      .groupBy('orderId, 'orderDate, 'shipPrio)
-      .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)
-
-    // emit result
-    result.writeAsCsv(outputPath, "\n", "|")
-
-    // execute program
-    env.execute("Scala TPCH Query 3 (Expression) Example")
-  }
-  
-  // *************************************************************************
-  //     USER DATA TYPES
-  // *************************************************************************
-  
-  case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: 
String)
-  case class Customer(id: Long, mktSegment: String)
-  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: 
Long)
-
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-  
-  private var lineitemPath: String = null
-  private var customerPath: String = null
-  private var ordersPath: String = null
-  private var outputPath: String = null
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length == 4) {
-      lineitemPath = args(0)
-      customerPath = args(1)
-      ordersPath = args(2)
-      outputPath = args(3)
-      true
-    } else {
-      System.err.println("This program expects data from the TPC-H benchmark 
as input data.\n" +
-          " Due to legal restrictions, we can not ship generated data.\n" +
-          " You can find the TPC-H data generator at 
http://www.tpc.org/tpch/.\n"; +
-          " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" + 
-                             "<orders-csv path> <result path>");
-      false
-    }
-  }
-  
-  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] 
= {
-    env.readCsvFile[Lineitem](
-        lineitemPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 5, 6, 10) )
-  }
-
-  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] 
= {
-    env.readCsvFile[Customer](
-        customerPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 6) )
-  }
-  
-  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
-    env.readCsvFile[Order](
-        ordersPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 1, 4, 7) )
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java
deleted file mode 100644
index 76c7fed..0000000
--- 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AggregationsITCase.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.expressions.test;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.flink.api.expressions.ExpressionException;
-import org.apache.flink.api.expressions.ExpressionOperation;
-import org.apache.flink.api.expressions.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.expressions.ExpressionUtil;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class AggregationsITCase extends MultipleProgramsTestBase {
-
-
-       public AggregationsITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       private String resultPath;
-       private String expected = "";
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testAggregationTypes() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               
ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env));
-
-               ExpressionOperation<JavaBatchTranslator> result =
-                               expressionOperation.select("f0.sum, f0.min, 
f0.max, f0.count, f0.avg");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "231,1,21,21,11";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAggregationOnNonExistingField() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               
ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env));
-
-               ExpressionOperation<JavaBatchTranslator> result =
-                               expressionOperation.select("'foo.avg");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "";
-       }
-
-       @Test
-       public void testWorkingAggregationDataTypes() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
-                               env.fromElements(
-                                               new Tuple7<Byte, Short, 
Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, 
"Hello"),
-                                               new Tuple7<Byte, Short, 
Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, 
"Ciao"));
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input);
-
-               ExpressionOperation<JavaBatchTranslator> result =
-                               expressionOperation.select("f0.avg, f1.avg, 
f2.avg, f3.avg, f4.avg, f5.avg, f6.count");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "1,1,1,1,1.5,1.5,2";
-       }
-
-       @Test
-       public void testAggregationWithArithmetic() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSource<Tuple2<Float, String>> input =
-                               env.fromElements(
-                                               new Tuple2<Float, String>(1f, 
"Hello"),
-                                               new Tuple2<Float, String>(2f, 
"Ciao"));
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input);
-
-               ExpressionOperation<JavaBatchTranslator> result =
-                               expressionOperation.select("(f0 + 2).avg + 2, 
f1.count + \" THE COUNT\"");
-
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "5.5,2 THE COUNT";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testNonWorkingDataTypes() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSource<Tuple2<Float, String>> input = env.fromElements(new 
Tuple2<Float, String>(1f,
-                               "Hello"));
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input);
-
-               ExpressionOperation<JavaBatchTranslator> result =
-                               expressionOperation.select("f1.sum");
-
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testNoNestedAggregation() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSource<Tuple2<Float, String>> input = env.fromElements(new 
Tuple2<Float, String>(1f, "Hello"));
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input);
-
-               ExpressionOperation<JavaBatchTranslator> result =
-                               expressionOperation.select("f0.sum.sum");
-
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "";
-       }
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java
deleted file mode 100644
index 3b69be0..0000000
--- 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/AsITCase.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.expressions.test;
-
-import org.apache.flink.api.expressions.ExpressionException;
-import org.apache.flink.api.expressions.ExpressionOperation;
-import org.apache.flink.api.expressions.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.expressions.ExpressionUtil;
-import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class AsITCase extends MultipleProgramsTestBase {
-
-
-       public AsITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       private String resultPath;
-       private String expected = "";
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testAs() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               
ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a, b, c");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, 
Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + 
"4,3,Hello world, " +
-                               "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
-                               "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-                               "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-                               "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-                               "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithToFewFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               
ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a, b");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, 
Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithToManyFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               
ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, 
Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithAmbiguousFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               
ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, 
Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithNonFieldReference1() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               
ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, 
Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithNonFieldReference2() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               
ExpressionUtil.from(CollectionDataSets.get3TupleDataSet(env), "a as foo, b, c");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(expressionOperation, 
Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "";
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java
deleted file mode 100644
index b4d1159..0000000
--- 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/CastingITCase.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.expressions.test;
-
-import org.apache.flink.api.expressions.ExpressionOperation;
-import org.apache.flink.api.expressions.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.expressions.ExpressionUtil;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class CastingITCase extends MultipleProgramsTestBase {
-
-
-       public CastingITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       private String resultPath;
-       private String expected = "";
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testAutoCastToString() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
-                               env.fromElements(new Tuple7<Byte, Short, 
Integer, Long, Float, Double, String>(
-                                               (byte) 1, (short) 1, 1, 1L, 
1.0f, 1.0d, "Hello"));
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input);
-
-               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation.select(
-                               "f0 + 'b', f1 + 's', f2 + 'i', f3 + 'L', f4 + 
'f', f5 + \"d\"");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "1b,1s,1i,1L,1.0f,1.0d";
-       }
-
-       @Test
-       public void testNumericAutocastInArithmetic() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
-                               env.fromElements(new Tuple7<Byte, Short, 
Integer, Long, Float, Double, String>(
-                                               (byte) 1, (short) 1, 1, 1L, 
1.0f, 1.0d, "Hello"));
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input);
-
-               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation.select("f0 + 1, f1 +" +
-                               " 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "2,2,2,2.0,2.0,2.0";
-       }
-
-       @Test
-       public void testNumericAutocastInComparison() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, 
String>> input =
-                               env.fromElements(
-                                               new Tuple7<Byte, Short, 
Integer, Long, Float, Double, String>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, 
"Hello"),
-                                               new Tuple7<Byte, Short, 
Integer, Long, Float, Double, String>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, 
"Hello"));
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input, "a,b,c,d,e,f,g");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation
-                               .filter("a > 1 && b > 1 && c > 1L && d > 1.0f 
&& e > 1.0d && f > 1");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "2,2,2,2,2.0,2.0,Hello";
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java
deleted file mode 100644
index 5c3a92a..0000000
--- 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/ExpressionsITCase.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.expressions.test;
-
-import org.apache.flink.api.expressions.ExpressionException;
-import org.apache.flink.api.expressions.ExpressionOperation;
-import org.apache.flink.api.expressions.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.expressions.ExpressionUtil;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class ExpressionsITCase extends MultipleProgramsTestBase {
-
-
-       public ExpressionsITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       private String resultPath;
-       private String expected = "";
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testArithmetic() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSource<Tuple2<Integer, Integer>> input =
-                               env.fromElements(new Tuple2<Integer, 
Integer>(5, 10));
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input, "a, b");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation.select(
-                               "a - 5, a + 5, a / 2, a * 2, a % 2, -a");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "0,10,2,10,1,-5";
-       }
-
-       @Test
-       public void testLogic() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSource<Tuple2<Integer, Boolean>> input =
-                               env.fromElements(new Tuple2<Integer, 
Boolean>(5, true));
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input, "a, b");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation.select(
-                               "b && true, b && false, b || false, !b");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "true,false,true,false";
-       }
-
-       @Test
-       public void testComparisons() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSource<Tuple3<Integer, Integer, Integer>> input =
-                               env.fromElements(new Tuple3<Integer, Integer, 
Integer>(5, 5, 4));
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input, "a, b, c");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation.select(
-                               "a > c, a >= b, a < c, a.isNull, a.isNotNull");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "true,true,false,false,true";
-       }
-
-       @Test
-       public void testBitwiseOperation() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSource<Tuple2<Byte, Byte>> input =
-                               env.fromElements(new Tuple2<Byte, Byte>((byte) 
3, (byte) 5));
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input, "a, b");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation.select(
-                               "a & b, a | b, a ^ b, ~a");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "1,7,6,-4";
-       }
-
-       @Test
-       public void testBitwiseWithAutocast() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSource<Tuple2<Integer, Byte>> input =
-                               env.fromElements(new Tuple2<Integer, Byte>(3, 
(byte) 5));
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input, "a, b");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation.select(
-                               "a & b, a | b, a ^ b, ~a");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "1,7,6,-4";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testBitwiseWithNonWorkingAutocast() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               DataSource<Tuple2<Float, Byte>> input =
-                               env.fromElements(new Tuple2<Float, Byte>(3.0f, 
(byte) 5));
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input, "a, b");
-
-               ExpressionOperation<JavaBatchTranslator> result =
-                               expressionOperation.select("a & b, a | b, a ^ 
b, ~a");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "";
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java
deleted file mode 100644
index 7da5fa3..0000000
--- 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/FilterITCase.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.expressions.test;
-
-import org.apache.flink.api.expressions.ExpressionException;
-import org.apache.flink.api.expressions.ExpressionOperation;
-import org.apache.flink.api.expressions.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.expressions.ExpressionUtil;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class FilterITCase extends MultipleProgramsTestBase {
-
-
-       public FilterITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       private String resultPath;
-       private String expected = "";
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testAllRejectingFilter() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input, "a, b, c");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation
-                               .filter("false");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "\n";
-       }
-
-       @Test
-       public void testAllPassingFilter() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input, "a, b, c");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation
-                               .filter("true");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + 
"4,3,Hello world, " +
-                               "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
-                               "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-                               "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-                               "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-                               "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
-       }
-
-       @Test
-       public void testFilterOnIntegerTupleField() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input, "a, b, c");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation
-                               .filter(" a % 2 === 0 ");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + 
"6,3,Luke Skywalker\n" + "8,4," +
-                               "Comment#2\n" + "10,4,Comment#4\n" + 
"12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-                               "Comment#10\n" + "18,6,Comment#12\n" + 
"20,6,Comment#14\n";
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java
deleted file mode 100644
index 8141dea..0000000
--- 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/GroupedAggregationsITCase.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.expressions.test;
-
-import org.apache.flink.api.expressions.ExpressionException;
-import org.apache.flink.api.expressions.ExpressionOperation;
-import org.apache.flink.api.expressions.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.expressions.ExpressionUtil;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
-
-
-       public GroupedAggregationsITCase(TestExecutionMode mode){
-               super(mode);
-       }
-
-       private String resultPath;
-       private String expected = "";
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception{
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception{
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testGroupingOnNonExistentField() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input, "a, b, c");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation
-                               .groupBy("foo").select("a.avg");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "";
-       }
-
-       @Test
-       public void testGroupedAggregate() throws Exception {
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input, "a, b, c");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation
-                               .groupBy("b").select("b, a.sum");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + 
"6,111\n";
-       }
-
-       @Test
-       public void testGroupingKeyForwardIfNotUsed() throws Exception {
-
-               // the grouping key needs to be forwarded to the intermediate 
DataSet, even
-               // if we don't want the key in the output
-
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> input = 
CollectionDataSets.get3TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> expressionOperation =
-                               ExpressionUtil.from(input, "a, b, c");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
expressionOperation
-                               .groupBy("b").select("a.sum");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
-       }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java
deleted file mode 100644
index 3ece3dc..0000000
--- 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/JoinITCase.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.expressions.test;
-
-import org.apache.flink.api.expressions.ExpressionException;
-import org.apache.flink.api.expressions.ExpressionOperation;
-import org.apache.flink.api.expressions.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.expressions.ExpressionUtil;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class JoinITCase extends MultipleProgramsTestBase {
-
-
-       public JoinITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       private String resultPath;
-       private String expected = "";
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception {
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception {
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testJoin() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> in1 = 
ExpressionUtil.from(ds1, "a, b, c");
-               ExpressionOperation<JavaBatchTranslator> in2 = 
ExpressionUtil.from(ds2, "d, e, f, g, h");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
in1.join(in2).where("b === e").select("c, g");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt\n";
-       }
-
-       @Test
-       public void testJoinWithFilter() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> in1 = 
ExpressionUtil.from(ds1, "a, b, c");
-               ExpressionOperation<JavaBatchTranslator> in2 = 
ExpressionUtil.from(ds2, "d, e, f, g, h");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
in1.join(in2).where("b === e && b < 2").select("c, g");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "Hi,Hallo\n";
-       }
-
-       @Test
-       public void testJoinWithMultipleKeys() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.get3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> in1 = 
ExpressionUtil.from(ds1, "a, b, c");
-               ExpressionOperation<JavaBatchTranslator> in2 = 
ExpressionUtil.from(ds2, "d, e, f, g, h");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
in1.join(in2).where("a === d && b === h").select("c, g");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello 
world,Hallo Welt wie gehts?\n" +
-                               "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I 
am fine.,IJK\n";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testJoinNonExistingKey() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> in1 = 
ExpressionUtil.from(ds1, "a, b, c");
-               ExpressionOperation<JavaBatchTranslator> in2 = 
ExpressionUtil.from(ds2, "d, e, f, g, h");
-
-               ExpressionOperation<JavaBatchTranslator> result = 
in1.join(in2).where("foo === e").select("c, g");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testJoinWithNonMatchingKeyTypes() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> in1 = 
ExpressionUtil.from(ds1, "a, b, c");
-               ExpressionOperation<JavaBatchTranslator> in2 = 
ExpressionUtil.from(ds2, "d, e, f, g, h");
-
-               ExpressionOperation<JavaBatchTranslator> result = in1
-                               .join(in2).where("a === g").select("c, g");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testJoinWithAmbiguousFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> in1 = 
ExpressionUtil.from(ds1, "a, b, c");
-               ExpressionOperation<JavaBatchTranslator> in2 = 
ExpressionUtil.from(ds2, "d, e, f, g, c");
-
-               ExpressionOperation<JavaBatchTranslator> result = in1
-                               .join(in2).where("a === d").select("c, g");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "";
-       }
-
-       @Test
-       public void testJoinWithAggregation() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> ds1 = 
CollectionDataSets.getSmall3TupleDataSet(env);
-               DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = 
CollectionDataSets.get5TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> in1 = 
ExpressionUtil.from(ds1, "a, b, c");
-               ExpressionOperation<JavaBatchTranslator> in2 = 
ExpressionUtil.from(ds2, "d, e, f, g, h");
-
-               ExpressionOperation<JavaBatchTranslator> result = in1
-                               .join(in2).where("a === d").select("g.count");
-
-               DataSet<Row> ds = ExpressionUtil.toSet(result, Row.class);
-               ds.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "6";
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java
deleted file mode 100644
index 89ec2e5..0000000
--- 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/SelectITCase.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.expressions.test;
-
-import org.apache.flink.api.expressions.ExpressionException;
-import org.apache.flink.api.expressions.ExpressionOperation;
-import org.apache.flink.api.expressions.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.expressions.ExpressionUtil;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class SelectITCase extends MultipleProgramsTestBase {
-
-
-       public SelectITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       private String resultPath;
-       private String expected = "";
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception {
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception {
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testSimpleSelectAllWithAs() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> in = 
ExpressionUtil.from(ds, "a,b,c");
-
-               ExpressionOperation<JavaBatchTranslator> result = in
-                               .select("a, b, c");
-
-               DataSet<Row> resultSet = ExpressionUtil.toSet(result, 
Row.class);
-               resultSet.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + 
"4,3,Hello world, " +
-                               "how are you?\n" + "5,3,I am fine.\n" + 
"6,3,Luke Skywalker\n" + "7,4," +
-                               "Comment#1\n" + "8,4,Comment#2\n" + 
"9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-                               "Comment#5\n" + "12,5,Comment#6\n" + 
"13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-                               "Comment#9\n" + "16,6,Comment#10\n" + 
"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-                               "6,Comment#13\n" + "20,6,Comment#14\n" + 
"21,6,Comment#15\n";
-
-       }
-
-       @Test
-       public void testSimpleSelectWithNaming() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> in = 
ExpressionUtil.from(ds);
-
-               ExpressionOperation<JavaBatchTranslator> result = in
-                               .select("f0 as a, f1 as b")
-                               .select("a, b");
-
-               DataSet<Row> resultSet = ExpressionUtil.toSet(result, 
Row.class);
-               resultSet.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + 
"6,3\n" + "7,4\n" +
-                               "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + 
"12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-                               "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + 
"20,6\n" + "21,6\n";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithToFewFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> in = 
ExpressionUtil.from(ds, "a, b");
-
-               DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class);
-               resultSet.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = " sorry dude ";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithToManyFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> in = 
ExpressionUtil.from(ds, "a, b, c, d");
-
-               DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class);
-               resultSet.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = " sorry dude ";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testAsWithAmbiguousFields() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> in = 
ExpressionUtil.from(ds, "a, b, c, b");
-
-               DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class);
-               resultSet.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = " today's not your day ";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testOnlyFieldRefInAs() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple3<Integer, Long, String>> ds = 
CollectionDataSets.get3TupleDataSet(env);
-
-               ExpressionOperation<JavaBatchTranslator> in = 
ExpressionUtil.from(ds, "a, b as c, d");
-
-               DataSet<Row> resultSet = ExpressionUtil.toSet(in, Row.class);
-               resultSet.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "sorry bro";
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java
deleted file mode 100644
index f9f1c6b..0000000
--- 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/java/expressions/test/StringExpressionsITCase.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.expressions.test;
-
-import org.apache.flink.api.expressions.ExpressionException;
-import org.apache.flink.api.expressions.ExpressionOperation;
-import org.apache.flink.api.expressions.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.expressions.ExpressionUtil;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class StringExpressionsITCase extends MultipleProgramsTestBase {
-
-
-       public StringExpressionsITCase(TestExecutionMode mode) {
-               super(mode);
-       }
-
-       private String resultPath;
-       private String expected = "";
-
-       @Rule
-       public TemporaryFolder tempFolder = new TemporaryFolder();
-
-       @Before
-       public void before() throws Exception {
-               resultPath = tempFolder.newFile().toURI().toString();
-       }
-
-       @After
-       public void after() throws Exception {
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Test
-       public void testSubstring() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple2<String, Integer>> ds = env.fromElements(
-                               new Tuple2<String, Integer>("AAAA", 2),
-                               new Tuple2<String, Integer>("BBBB", 1));
-
-               ExpressionOperation<JavaBatchTranslator> in = 
ExpressionUtil.from(ds, "a, b");
-
-               ExpressionOperation<JavaBatchTranslator> result = in
-                               .select("a.substring(0, b)");
-
-               DataSet<Row> resultSet = ExpressionUtil.toSet(result, 
Row.class);
-               resultSet.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "AA\nB";
-       }
-
-       @Test
-       public void testSubstringWithMaxEnd() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple2<String, Integer>> ds = env.fromElements(
-                               new Tuple2<String, Integer>("ABCD", 2),
-                               new Tuple2<String, Integer>("ABCD", 1));
-
-               ExpressionOperation<JavaBatchTranslator> in = 
ExpressionUtil.from(ds, "a, b");
-
-               ExpressionOperation<JavaBatchTranslator> result = in
-                               .select("a.substring(b)");
-
-               DataSet<Row> resultSet = ExpressionUtil.toSet(result, 
Row.class);
-               resultSet.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "CD\nBCD";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testNonWorkingSubstring1() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple2<String, Float>> ds = env.fromElements(
-                               new Tuple2<String, Float>("ABCD", 2.0f),
-                               new Tuple2<String, Float>("ABCD", 1.0f));
-
-               ExpressionOperation<JavaBatchTranslator> in = 
ExpressionUtil.from(ds, "a, b");
-
-               ExpressionOperation<JavaBatchTranslator> result = in
-                               .select("a.substring(0, b)");
-
-               DataSet<Row> resultSet = ExpressionUtil.toSet(result, 
Row.class);
-               resultSet.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "";
-       }
-
-       @Test(expected = ExpressionException.class)
-       public void testNonWorkingSubstring2() throws Exception {
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-               DataSet<Tuple2<String, String>> ds = env.fromElements(
-                               new Tuple2<String, String>("ABCD", "a"),
-                               new Tuple2<String, String>("ABCD", "b"));
-
-               ExpressionOperation<JavaBatchTranslator> in = 
ExpressionUtil.from(ds, "a, b");
-
-               ExpressionOperation<JavaBatchTranslator> result = in
-                               .select("a.substring(b, 15)");
-
-               DataSet<Row> resultSet = ExpressionUtil.toSet(result, 
Row.class);
-               resultSet.writeAsText(resultPath, 
FileSystem.WriteMode.OVERWRITE);
-
-               env.execute();
-
-               expected = "";
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java
 
b/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java
deleted file mode 100644
index b75a2ee..0000000
--- 
a/flink-staging/flink-expressions/src/test/java/org/apache/flink/api/scala/expressions/test/PageRankExpressionITCase.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.flink.api.scala.expressions.test;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.examples.scala.PageRankExpression;
-import org.apache.flink.test.testdata.PageRankData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedList;
-
-@RunWith(Parameterized.class)
-public class PageRankExpressionITCase extends JavaProgramTestBase {
-
-       private static int NUM_PROGRAMS = 2;
-
-       private int curProgId = config.getInteger("ProgramId", -1);
-
-       private String verticesPath;
-       private String edgesPath;
-       private String resultPath;
-       private String expectedResult;
-
-       public PageRankExpressionITCase(Configuration config) {
-               super(config);
-       }
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-               verticesPath = createTempFile("vertices.txt", 
PageRankData.VERTICES);
-               edgesPath = createTempFile("edges.txt", PageRankData.EDGES);
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               expectedResult = runProgram(curProgId);
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareKeyValueParisWithDelta(expectedResult, resultPath, " ", 
0.01);
-       }
-
-       @Parameters
-       public static Collection<Object[]> getConfigurations() throws 
FileNotFoundException, IOException {
-
-               LinkedList<Configuration> tConfigs = new 
LinkedList<Configuration>();
-
-               for(int i=1; i <= NUM_PROGRAMS; i++) {
-                       Configuration config = new Configuration();
-                       config.setInteger("ProgramId", i);
-                       tConfigs.add(config);
-               }
-
-               return toParameterList(tConfigs);
-       }
-
-
-       public String runProgram(int progId) throws Exception {
-
-               switch(progId) {
-               case 1: {
-                       PageRankExpression.main(new String[]{verticesPath, 
edgesPath, resultPath, PageRankData
-                                       .NUM_VERTICES + "", "3"});
-                       return PageRankData.RANKS_AFTER_3_ITERATIONS;
-               }
-               case 2: {
-                       // start with a very high number of iteration such that 
the dynamic convergence criterion must handle termination
-                       PageRankExpression.main(new String[] {verticesPath, 
edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
-                       return 
PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
-               }
-
-               default:
-                       throw new IllegalArgumentException("Invalid program 
id");
-               }
-       }
-}

Reply via email to