http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
deleted file mode 100644
index 2cbd8fa..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala
+++ /dev/null
@@ -1,238 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.parser
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.table.ExpressionException
-import org.apache.flink.api.table.plan.As
-import org.apache.flink.api.table.expressions._
-
-import scala.util.parsing.combinator.{PackratParsers, JavaTokenParsers}
-
-/**
- * Parser for expressions inside a String. This parses exactly the same expressions that
- * would be accepted by the Scala Expression DSL.
- *
- * See [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and
- * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]] for the constructs
- * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL
- * defined in the above files.
- */
-object ExpressionParser extends JavaTokenParsers with PackratParsers {
-  case class Keyword(key: String)
-
-  // Convert the keyword into a case-insensitive Parser
-  implicit def keyword2Parser(kw: Keyword): Parser[String] = {
-    ("""(?i)\Q""" + kw.key + """\E""").r
-  }
-
-  // KeyWord
-
-  lazy val AS: Keyword = Keyword("as")
-  lazy val COUNT: Keyword = Keyword("count")
-  lazy val AVG: Keyword = Keyword("avg")
-  lazy val MIN: Keyword = Keyword("min")
-  lazy val MAX: Keyword = Keyword("max")
-  lazy val SUM: Keyword = Keyword("sum")
-
-  // Literals
-
-  lazy val numberLiteral: PackratParser[Expression] =
-    ((wholeNumber <~ ("L" | "l")) | floatingPointNumber | decimalNumber | 
wholeNumber) ^^ {
-      str =>
-        if (str.endsWith("L") || str.endsWith("l")) {
-          Literal(str.toLong)
-        } else if (str.matches("""-?\d+""")) {
-          Literal(str.toInt)
-        } else if (str.endsWith("f") | str.endsWith("F")) {
-          Literal(str.toFloat)
-        } else {
-          Literal(str.toDouble)
-        }
-    }
-
-  lazy val singleQuoteStringLiteral: Parser[Expression] =
-    ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r 
^^ {
-      str => Literal(str.substring(1, str.length - 1))
-    }
-
-  lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ {
-    str => Literal(str.substring(1, str.length - 1))
-  }
-
-  lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ {
-    str => Literal(str.toBoolean)
-  }
-
-  lazy val literalExpr: PackratParser[Expression] =
-    numberLiteral |
-      stringLiteralFlink | singleQuoteStringLiteral |
-      boolLiteral
-
-  lazy val fieldReference: PackratParser[Expression] = ident ^^ {
-    case sym => UnresolvedFieldReference(sym)
-  }
-
-  lazy val atom: PackratParser[Expression] =
-    ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference
-
-  // suffix ops
-  lazy val isNull: PackratParser[Expression] = atom <~ ".isNull" ^^ { e => IsNull(e) }
-  lazy val isNotNull: PackratParser[Expression] = atom <~ ".isNotNull" ^^ { e => IsNotNull(e) }
-
-  lazy val abs: PackratParser[Expression] = atom <~ ".abs" ^^ { e => Abs(e) }
-
-  lazy val sum: PackratParser[Expression] =
-    (atom <~ ".sum" ^^ { e => Sum(e) }) | (SUM ~ "(" ~> atom <~ ")" ^^ { e => 
Sum(e) })
-  lazy val min: PackratParser[Expression] =
-    (atom <~ ".min" ^^ { e => Min(e) }) | (MIN ~ "(" ~> atom <~ ")" ^^ { e => 
Min(e) })
-  lazy val max: PackratParser[Expression] =
-    (atom <~ ".max" ^^ { e => Max(e) }) | (MAX ~ "(" ~> atom <~ ")" ^^ { e => 
Max(e) })
-  lazy val count: PackratParser[Expression] =
-    (atom <~ ".count" ^^ { e => Count(e) }) | (COUNT ~ "(" ~> atom <~ ")" ^^ { 
e => Count(e) })
-  lazy val avg: PackratParser[Expression] =
-    (atom <~ ".avg" ^^ { e => Avg(e) }) | (AVG ~ "(" ~> atom <~ ")" ^^ { e => 
Avg(e) })
-
-  lazy val cast: PackratParser[Expression] =
-    atom <~ ".cast(BYTE)" ^^ { e => Cast(e, BasicTypeInfo.BYTE_TYPE_INFO) } |
-    atom <~ ".cast(SHORT)" ^^ { e => Cast(e, BasicTypeInfo.SHORT_TYPE_INFO) } |
-    atom <~ ".cast(INT)" ^^ { e => Cast(e, BasicTypeInfo.INT_TYPE_INFO) } |
-    atom <~ ".cast(LONG)" ^^ { e => Cast(e, BasicTypeInfo.LONG_TYPE_INFO) } |
-    atom <~ ".cast(FLOAT)" ^^ { e => Cast(e, BasicTypeInfo.FLOAT_TYPE_INFO) } |
-    atom <~ ".cast(DOUBLE)" ^^ { e => Cast(e, BasicTypeInfo.DOUBLE_TYPE_INFO) 
} |
-    atom <~ ".cast(BOOL)" ^^ { e => Cast(e, BasicTypeInfo.BOOLEAN_TYPE_INFO) } 
|
-    atom <~ ".cast(BOOLEAN)" ^^ { e => Cast(e, 
BasicTypeInfo.BOOLEAN_TYPE_INFO) } |
-    atom <~ ".cast(STRING)" ^^ { e => Cast(e, BasicTypeInfo.STRING_TYPE_INFO) 
} |
-    atom <~ ".cast(DATE)" ^^ { e => Cast(e, BasicTypeInfo.DATE_TYPE_INFO) }
-
-  lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ {
-    case e ~ _ ~ as ~ _ => Naming(e, as.name)
-  }
-
-  lazy val substring: PackratParser[Expression] =
-    atom ~ ".substring(" ~ expression ~ "," ~ expression ~ ")" ^^ {
-      case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, to)
-
-    }
-
-  lazy val substringWithoutEnd: PackratParser[Expression] =
-    atom ~ ".substring(" ~ expression ~ ")" ^^ {
-      case e ~ _ ~ from ~ _ => Substring(e, from, Literal(Integer.MAX_VALUE))
-
-    }
-
-  lazy val suffix =
-    isNull | isNotNull |
-      abs | sum | min | max | count | avg | cast |
-      substring | substringWithoutEnd | atom
-
-
-  // unary ops
-
-  lazy val unaryNot: PackratParser[Expression] = "!" ~> suffix ^^ { e => Not(e) }
-
-  lazy val unaryMinus: PackratParser[Expression] = "-" ~> suffix ^^ { e => UnaryMinus(e) }
-
-  lazy val unaryBitwiseNot: PackratParser[Expression] = "~" ~> suffix ^^ { e => BitwiseNot(e) }
-
-  lazy val unary = unaryNot | unaryMinus | unaryBitwiseNot | suffix
-
-  // binary bitwise opts
-
-  lazy val binaryBitwise = unary * (
-    "&" ^^^ { (a:Expression, b:Expression) => BitwiseAnd(a,b) } |
-      "|" ^^^ { (a:Expression, b:Expression) => BitwiseOr(a,b) } |
-      "^" ^^^ { (a:Expression, b:Expression) => BitwiseXor(a,b) } )
-
-  // arithmetic
-
-  lazy val product = binaryBitwise * (
-    "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } |
-      "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } |
-      "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } )
-
-  lazy val term = product * (
-    "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } |
-     "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } )
-
-  // Comparison
-
-  lazy val equalTo: PackratParser[Expression] = term ~ ("===" | "=") ~ term ^^ {
-    case l ~ _ ~ r => EqualTo(l, r)
-  }
-
-  lazy val notEqualTo: PackratParser[Expression] = term ~ ("!==" | "!=" | "<>") ~ term ^^ {
-    case l ~ _ ~ r => NotEqualTo(l, r)
-  }
-
-  lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ {
-    case l ~ _ ~ r => GreaterThan(l, r)
-  }
-
-  lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term ^^ {
-    case l ~ _ ~ r => GreaterThanOrEqual(l, r)
-  }
-
-  lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ {
-    case l ~ _ ~ r => LessThan(l, r)
-  }
-
-  lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ {
-    case l ~ _ ~ r => LessThanOrEqual(l, r)
-  }
-
-  lazy val comparison: PackratParser[Expression] =
-      equalTo | notEqualTo |
-      greaterThan | greaterThanOrEqual |
-      lessThan | lessThanOrEqual | term
-
-  // logic
-
-  lazy val logic = comparison * (
-    "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } |
-      "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } )
-
-  // alias
-
-  lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
-    case e ~ _ ~ name => Naming(e, name.name)
-  } | logic
-
-  lazy val expression: PackratParser[Expression] = alias
-
-  lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")
-
-  def parseExpressionList(expression: String): List[Expression] = {
-    parseAll(expressionList, expression) match {
-      case Success(lst, _) => lst
-
-      case Failure(msg, _) => throw new ExpressionException("Could not parse expression: " + msg)
-
-      case Error(msg, _) => throw new ExpressionException("Could not parse expression: " + msg)
-    }
-  }
-
-  def parseExpression(exprString: String): Expression = {
-    parseAll(expression, exprString) match {
-      case Success(lst, _) => lst
-
-      case fail =>
-        throw new ExpressionException("Could not parse expression: " + 
fail.toString)
-    }
-  }
-}
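
For context, a minimal usage sketch of the removed parser (the values here are
illustrative and not part of this commit):

    import org.apache.flink.api.table.parser.ExpressionParser

    // "a + 5 as result" yields the same tree the Scala DSL would build:
    // Naming(Plus(UnresolvedFieldReference("a"), Literal(5)), "result")
    val expr = ExpressionParser.parseExpression("a + 5 as result")

    // Comma-separated lists yield one Expression per entry, e.g. for a select:
    val exprs = ExpressionParser.parseExpressionList("key, value.avg as mean")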

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala
deleted file mode 100644
index 2e09f39..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/ExpandAggregations.scala
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan
-
-import org.apache.flink.api.table.expressions.analysis.SelectionAnalyzer
-import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.java.aggregation.Aggregations
-
-import scala.collection.mutable
-
-/**
- * This is used to expand a [[Select]] that contains aggregations. If it is called on a [[Select]]
- * without aggregations it is simply returned.
- *
- * This select:
- * {{{
- *   in.select('key, 'value.avg)
- * }}}
- *
- * is transformed to this expansion:
- * {{{
- *   in
- *     .select('key, 'value, Literal(1) as 'intermediate.1)
- *     .aggregate('value.sum, 'intermediate.1.sum)
- *     .select('key, 'value / 'intermediate.1)
- * }}}
- *
- * If the input of the [[Select]] is a [[GroupBy]] this is preserved before the aggregation.
- */
-object ExpandAggregations {
-  def apply(select: Select): PlanNode = select match {
-    case Select(input, selection) =>
-
-      val aggregations = mutable.HashMap[(Expression, Aggregations), String]()
-      val intermediateFields = mutable.HashSet[Expression]()
-      val aggregationIntermediates = mutable.HashMap[Aggregation, Seq[Expression]]()
-
-      var intermediateCount = 0
-      var resultCount = 0
-      selection foreach {  f =>
-        f.transformPre {
-          case agg: Aggregation =>
-            val intermediateReferences = agg.getIntermediateFields.zip(agg.getAggregations) map {
-              case (expr, basicAgg) =>
-                resultCount += 1
-                val resultName = s"result.$resultCount"
-                aggregations.get((expr, basicAgg)) match {
-                  case Some(intermediateName) =>
-                    Naming(ResolvedFieldReference(intermediateName, expr.typeInfo), resultName)
-                  case None =>
-                    intermediateCount = intermediateCount + 1
-                    val intermediateName = s"intermediate.$intermediateCount"
-                    intermediateFields += Naming(expr, intermediateName)
-                    aggregations((expr, basicAgg)) = intermediateName
-                    Naming(ResolvedFieldReference(intermediateName, expr.typeInfo), resultName)
-                }
-            }
-
-            aggregationIntermediates(agg) = intermediateReferences
-            // Return a NOP so that we don't add the children of the aggregation
-            // to intermediate fields. We already added the necessary fields to the list
-            // of intermediate fields.
-            NopExpression()
-
-          case fa: ResolvedFieldReference =>
-            if (!fa.name.startsWith("intermediate")) {
-              intermediateFields += Naming(fa, fa.name)
-            }
-            fa
-        }
-      }
-
-      if (aggregations.isEmpty) {
-        // no aggregations, just return
-        return select
-      }
-
-      // also add the grouping keys to the set of intermediate fields; because we use a Set,
-      // they are only added when not already present
-      input match {
-        case GroupBy(_, groupingFields) =>
-          groupingFields foreach {
-            case fa: ResolvedFieldReference =>
-              intermediateFields += Naming(fa, fa.name)
-          }
-        case _ => // Nothing to add
-      }
-
-      val basicAggregations = aggregations.map {
-        case ((expr, basicAgg), fieldName) =>
-          (fieldName, basicAgg)
-      }
-
-      val finalFields = selection.map {  f =>
-        f.transformPre {
-          case agg: Aggregation =>
-            val intermediates = aggregationIntermediates(agg)
-            agg.getFinalField(intermediates)
-        }
-      }
-
-      val intermediateAnalyzer = new SelectionAnalyzer(input.outputFields)
-      val analyzedIntermediates = intermediateFields.toSeq.map(intermediateAnalyzer.analyze)
-
-      val finalAnalyzer =
-        new SelectionAnalyzer(analyzedIntermediates.map(e => (e.name, e.typeInfo)))
-      val analyzedFinals = finalFields.map(finalAnalyzer.analyze)
-
-      val result = input match {
-        case GroupBy(groupByInput, groupingFields) =>
-          Select(
-            Aggregate(
-              GroupBy(
-                Select(groupByInput, analyzedIntermediates),
-                groupingFields),
-              basicAggregations.toSeq),
-            analyzedFinals)
-
-        case _ =>
-          Select(
-            Aggregate(
-              Select(input, analyzedIntermediates),
-              basicAggregations.toSeq),
-            analyzedFinals)
-
-      }
-
-      result
-
-    case _ => select
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
deleted file mode 100644
index ba8aba4..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/PlanTranslator.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan
-
-import java.lang.reflect.Modifier
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.parser.ExpressionParser
-import org.apache.flink.api.table.expressions.{Expression, Naming, ResolvedFieldReference, UnresolvedFieldReference}
-import org.apache.flink.api.table.typeinfo.RowTypeInfo
-import org.apache.flink.api.table.{ExpressionException, Table}
-
-import scala.language.reflectiveCalls
-
-/**
- * Base class for translators that transform the logical plan in a [[Table]] to an executable
- * Flink plan and also for creating a [[Table]] from a DataSet or DataStream.
- */
-abstract class PlanTranslator {
-
-  type Representation[A] <: { def getType(): TypeInformation[A] }
-
-  /**
-   * Translates the given Table API [[PlanNode]] back to the underlying representation, i.e.,
-   * a DataSet or a DataStream.
-   */
-  def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): Representation[A]
-
-  /**
-   * Creates a [[Table]] from a DataSet or a DataStream (the underlying representation).
-   */
-  def createTable[A](
-      repr: Representation[A],
-      inputType: CompositeType[A],
-      expressions: Array[Expression],
-      resultFields: Seq[(String, TypeInformation[_])]): Table
-
-  /**
-   * Creates a [[Table]] from the given DataSet or DataStream.
-   */
-  def createTable[A](repr: Representation[A]): Table = {
-
-    val fields = repr.getType() match {
-      case c: CompositeType[A] => c.getFieldNames.map(UnresolvedFieldReference)
-
-      case tpe => Array() // createTable will throw an exception for this later
-    }
-    createTable(
-      repr,
-      fields.toArray.asInstanceOf[Array[Expression]],
-      checkDeterministicFields = false)
-  }
-
-  /**
-   * Creates a [[Table]] from the given DataSet or DataStream while only taking those
-   * fields mentioned in the field expression.
-   */
-  def createTable[A](repr: Representation[A], expression: String): Table = {
-
-    val fields = ExpressionParser.parseExpressionList(expression)
-
-    createTable(repr, fields.toArray, checkDeterministicFields = true)
-  }
-
-  /**
-   * Creates a [[Table]] from the given DataSet or DataStream while only taking those
-   * fields mentioned in the fields parameter.
-   *
-   * When checkDeterministicFields is true, this checks whether the fields of the underlying
-   * [[TypeInformation]] have a deterministic ordering. This is only the case for Tuples
-   * and Case classes. For a POJO, the field order is not obvious; this can lead to problems
-   * when a user renames fields and assumes a certain ordering.
-   */
-  def createTable[A](
-      repr: Representation[A],
-      fields: Array[Expression],
-      checkDeterministicFields: Boolean = true): Table = {
-
-    // shortcut for DataSet[Row] or DataStream[Row]
-    repr.getType() match {
-      case rowTypeInfo: RowTypeInfo =>
-        val expressions = rowTypeInfo.getFieldNames map {
-          name => (name, rowTypeInfo.getTypeAt(name))
-        }
-        new Table(
-          Root(repr, expressions))
-
-      case c: CompositeType[A] => // this is ok
-
-      case tpe => throw new ExpressionException("Only DataSets or DataStreams 
of composite type" +
-        "can be transformed to a Table. These would be tuples, case classes 
and " +
-        "POJOs. Type is: " + tpe)
-
-    }
-
-    val clazz = repr.getType().getTypeClass
-    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers))
-        || clazz.getCanonicalName() == null) {
-      throw new ExpressionException("Cannot create Table from DataSet or 
DataStream of type " +
-        clazz.getName + ". Only top-level classes or static members classes " +
-        " are supported.")
-    }
-
-    val inputType = repr.getType().asInstanceOf[CompositeType[A]]
-
-    if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) {
-      throw new ExpressionException(s"You cannot rename fields upon Table 
creation: " +
-        s"Field order of input type $inputType is not deterministic." )
-    }
-
-    if (fields.length != inputType.getFieldNames.length) {
-      throw new ExpressionException("Number of selected fields: '" + 
fields.mkString(",") +
-        "' and number of fields in input type " + inputType + " do not match.")
-    }
-
-    val newFieldNames = fields map {
-      case UnresolvedFieldReference(name) => name
-      case e =>
-        throw new ExpressionException("Only field references allowed in 'as' 
operation, " +
-          " offending expression: " + e)
-    }
-
-    if (newFieldNames.toSet.size != newFieldNames.size) {
-      throw new ExpressionException(s"Ambiguous field names in 
${fields.mkString(", ")}")
-    }
-
-    val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map {
-      case (name, index) => (name, inputType.getTypeAt(index))
-    }
-
-    val inputFields = inputType.getFieldNames
-    val fieldMappings = inputFields.zip(resultFields)
-    val expressions: Array[Expression] = fieldMappings map {
-      case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName)
-    }
-
-    createTable(repr, inputType, expressions, resultFields)
-  }
-
-}
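
For context, a minimal sketch of the field-renaming path that createTable guarded
(assuming the Scala Table API entry points of this release; names are illustrative):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._

    case class WordCount(word: String, frequency: Long)

    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(WordCount("hello", 1L), WordCount("ciao", 2L))

    // Case classes have a deterministic field order, so renaming during Table
    // creation is allowed; the expressions must be plain field references,
    // one per input field, and the new names must be unique.
    val table = input.as('w, 'count)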

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
deleted file mode 100644
index 7ec34d7..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/operations.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.plan
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.aggregation.Aggregations
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.table.trees.TreeNode
-
-/**
- * Base class for all Table API operations.
- */
-sealed abstract class PlanNode extends TreeNode[PlanNode] { self: Product =>
-  def outputFields: Seq[(String, TypeInformation[_])]
-}
-
-/**
- * Operation that transforms a [[org.apache.flink.api.scala.DataSet]] or
- * [[org.apache.flink.streaming.api.scala.DataStream]] into a [[org.apache.flink.api.table.Table]].
- */
-case class Root[T](input: T, outputFields: Seq[(String, TypeInformation[_])]) extends PlanNode {
-  val children = Nil
-  override def toString = s"Root($outputFields)"
-}
-
-/**
- * Operation that joins two [[org.apache.flink.api.table.Table]]s. A "filter" and a "select"
- * should be applied after a join operation.
- */
-case class Join(left: PlanNode, right: PlanNode) extends PlanNode {
-
-  val children = Seq(left, right)
-
-  def outputFields = left.outputFields ++ right.outputFields
-
-  override def toString = s"Join($left, $right)"
-}
-
-/**
- * Operation that filters out elements that do not match the predicate expression.
- */
-case class Filter(input: PlanNode, predicate: Expression) extends PlanNode {
-
-  val children = Seq(input)
-
-  def outputFields = input.outputFields
-
-  override def toString = s"Filter($input, $predicate)"
-}
-
-/**
- * Selection expression. Similar to an SQL SELECT statement. The expressions can select fields
- * and perform arithmetic or logic operations. The expressions can also perform aggregates
- * on fields.
- */
-case class Select(input: PlanNode, selection: Seq[Expression]) extends PlanNode {
-
-  val children = Seq(input)
-
-  def outputFields = selection.toSeq map { e => (e.name, e.typeInfo) }
-
-  override def toString = s"Select($input, ${selection.mkString(",")})"
-}
-
-/**
- * Operation that gives new names to fields. Use this to disambiguate fields before a join
- * operation.
- */
-case class As(input: PlanNode, names: Seq[String]) extends PlanNode {
-
-  val children = Seq(input)
-
-  val outputFields = input.outputFields.zip(names) map {
-    case ((_, tpe), newName) => (newName, tpe)
-  }
-
-  override def toString = s"As($input, ${names.mkString(",")})"
-}
-
-/**
- * Grouping operation. Keys are specified using field references. A group by operation is only
- * useful when performing a select with aggregates afterwards.
- * @param input
- * @param fields
- */
-case class GroupBy(input: PlanNode, fields: Seq[Expression]) extends PlanNode {
-
-  val children = Seq(input)
-
-  def outputFields = input.outputFields
-
-  override def toString = s"GroupBy($input, ${fields.mkString(",")})"
-}
-
-/**
- * Internal operation. Selection operations containing aggregates are expanded to an [[Aggregate]]
- * and a simple [[Select]].
- */
-case class Aggregate(
-    input: PlanNode,
-    aggregations: Seq[(String, Aggregations)]) extends PlanNode {
-
-  val children = Seq(input)
-
-  def outputFields = input.outputFields
-
-  override def toString = s"Aggregate($input, ${aggregations.mkString(",")})"
-}
-
-/**
- * UnionAll operation, union all elements from left and right.
- */
-case class UnionAll(left: PlanNode, right: PlanNode) extends PlanNode{
-  val children = Seq(left, right)
-
-  def outputFields = left.outputFields
-
-  override def toString = s"Union($left, $right)"
-}
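
For context, a hand-built sketch of how these nodes nest into a logical plan
(normally assembled by Table methods; `input` is a placeholder for any DataSet
or DataStream and is not part of this commit):

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.table.expressions._
    import org.apache.flink.api.table.plan._

    // Root wraps the input and its field names/types.
    val root = Root(input, Seq(
      "name" -> BasicTypeInfo.STRING_TYPE_INFO,
      "age"  -> BasicTypeInfo.INT_TYPE_INFO))

    // Keep rows with age > 18, then project the two fields.
    val plan: PlanNode =
      Select(
        Filter(root, GreaterThan(UnresolvedFieldReference("age"), Literal(18))),
        Seq(UnresolvedFieldReference("name"), UnresolvedFieldReference("age")))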

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
deleted file mode 100644
index a598483..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/plan/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * The operations in this package are created by calling methods on [[Table]]; they
- * should not be manually created by users of the API.
- */
-package object plan

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
deleted file mode 100644
index 932f9df..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.common.functions.RichGroupReduceFunction
-import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable
-import org.apache.flink.api.java.aggregation.AggregationFunction
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Collector
-
-@Combinable
-class ExpressionAggregateFunction(
-    private val fieldPositions: Seq[Int],
-    private val functions: Seq[AggregationFunction[Any]])
-  extends RichGroupReduceFunction[Row, Row] {
-
-  override def open(conf: Configuration): Unit = {
-    var i = 0
-    val len = functions.length
-    while (i < len) {
-      functions(i).initializeAggregate()
-      i += 1
-    }
-  }
-
-  override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = {
-
-    val fieldPositions = this.fieldPositions
-    val functions = this.functions
-
-    var current: Row = null
-
-    val values = in.iterator()
-    while (values.hasNext) {
-      current = values.next()
-
-      var i = 0
-      val len = functions.length
-      while (i < len) {
-        functions(i).aggregate(current.productElement(fieldPositions(i)))
-        i += 1
-      }
-    }
-
-    var i = 0
-    val len = functions.length
-    while (i < len) {
-      current.setField(fieldPositions(i), functions(i).getAggregate)
-      functions(i).initializeAggregate()
-      i += 1
-    }
-
-    out.collect(current)
-  }
-
-}
-
-@Combinable
-class NoExpressionAggregateFunction() extends RichGroupReduceFunction[Row, Row] {
-
-  override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = {
-
-    var first: Row = null
-
-    val values = in.iterator()
-    if (values.hasNext) {
-      first = values.next()
-    }
-
-    out.collect(first)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
deleted file mode 100644
index 4e50272..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.GenerateFilter
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.configuration.Configuration
-
-/**
- * Proxy function that takes an expression predicate. This is compiled
- * at runtime, and calls to [[filter()]] are forwarded to the compiled code.
- */
-class ExpressionFilterFunction[T](
-    predicate: Expression,
-    inputType: CompositeType[T],
-    config: TableConfig = TableConfig.DEFAULT) extends RichFilterFunction[T] {
-
-  var compiledFilter: FilterFunction[T] = null
-
-  override def open(c: Configuration): Unit = {
-    if (compiledFilter == null) {
-      val codegen = new GenerateFilter[T](
-        inputType,
-        predicate,
-        getRuntimeContext.getUserCodeClassLoader,
-        config)
-      compiledFilter = codegen.generate()
-    }
-  }
-
-  override def filter(in: T) = compiledFilter.filter(in)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
deleted file mode 100644
index cf2c90f..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.codegen.GenerateJoin
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.util.Collector
-
-/**
- * Proxy function that takes an expression predicate and output fields. These are compiled
- * at runtime, and calls to [[join()]] are forwarded to the compiled code.
- */
-class ExpressionJoinFunction[L, R, O](
-    predicate: Expression,
-    leftType: CompositeType[L],
-    rightType: CompositeType[R],
-    resultType: CompositeType[O],
-    outputFields: Seq[Expression],
-    config: TableConfig = TableConfig.DEFAULT) extends RichFlatJoinFunction[L, R, O] {
-
-  var compiledJoin: FlatJoinFunction[L, R, O] = null
-
-  override def open(c: Configuration): Unit = {
-    val codegen = new GenerateJoin[L, R, O](
-      leftType,
-      rightType,
-      resultType,
-      predicate,
-      outputFields,
-      getRuntimeContext.getUserCodeClassLoader,
-      config)
-    compiledJoin = codegen.generate()
-  }
-
-  def join(left: L, right: R, out: Collector[O]) = {
-    compiledJoin.join(left, right, out)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
deleted file mode 100644
index ab7adb1..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.runtime
-
-import org.apache.flink.api.table.TableConfig
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
-import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.codegen.GenerateSelect
-import org.apache.flink.configuration.Configuration
-
-/**
- * Proxy function that takes expressions. These are compiled
- * at runtime, and calls to [[map()]] are forwarded to the compiled code.
- */
-class ExpressionSelectFunction[I, O](
-     inputType: CompositeType[I],
-     resultType: CompositeType[O],
-     outputFields: Seq[Expression],
-     config: TableConfig = TableConfig.DEFAULT) extends RichMapFunction[I, O] {
-
-  var compiledSelect: MapFunction[I, O] = null
-
-  override def open(c: Configuration): Unit = {
-
-    if (compiledSelect == null) {
-      val resultCodegen = new GenerateSelect[I, O](
-        inputType,
-        resultType,
-        outputFields,
-        getRuntimeContext.getUserCodeClassLoader,
-        config)
-
-      compiledSelect = resultCodegen.generate()
-    }
-  }
-
-  def map(in: I): O = {
-    compiledSelect.map(in)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
deleted file mode 100644
index a1bc4b7..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table
-
-/**
- * The functions in this package are used for transforming Table API operations to
- * Java API operations.
- */
-package object runtime

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala
deleted file mode 100644
index 87051cf..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Analyzer.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.trees
-
-/**
- * Base class for tree analyzers/transformers. Analyzers must implement method `rules` to
- * provide the chain of rules that are invoked one after another. The tree resulting
- * from one rule is fed into the next rule and the final result is returned from method `analyze`.
- */
-abstract class Analyzer[A <: TreeNode[A]] {
-
-  def rules: Seq[Rule[A]]
-
-  final def analyze(expr: A): A = {
-    var currentTree = expr
-    for (rule <- rules) {
-      var running = true
-      while (running) {
-        val newTree = rule(currentTree)
-        if (newTree fastEquals currentTree) {
-          running = false
-        }
-        currentTree = newTree
-      }
-    }
-    currentTree
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala
deleted file mode 100644
index b8a27cb..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/Rule.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.trees
-
-/**
- * Base class for a rule that is part of an [[Analyzer]] rule chain. Method `apply` gets a tree
- * and must return a tree. The returned tree can also be the input tree. In an [[Analyzer]]
- * rule chain the result tree of one [[Rule]] is fed into the next [[Rule]] in the chain.
- *
- * A [[Rule]] is repeatedly applied to a tree until the tree does not change between
- * rule applications.
- */
-abstract class Rule[A <: TreeNode[A]] {
-  def apply(expr: A): A
-}
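
For context, a minimal sketch of a rule chain over the Expression trees referenced
above (the rule and analyzer below are hypothetical, not part of the removed code):

    import org.apache.flink.api.table.expressions._
    import org.apache.flink.api.table.trees.{Analyzer, Rule}

    // Hypothetical rule: collapse a double negation !(!e) into e.
    class RemoveDoubleNot extends Rule[Expression] {
      def apply(expr: Expression): Expression = expr.transformPost {
        case Not(Not(inner)) => inner
      }
    }

    // Hypothetical analyzer: analyze() applies each rule until the tree no
    // longer changes, then moves on to the next rule in the chain.
    class Simplifier extends Analyzer[Expression] {
      def rules: Seq[Rule[Expression]] = Seq(new RemoveDoubleNot)
    }

    // new Simplifier().analyze(Not(Not(UnresolvedFieldReference("flag"))))
    // returns UnresolvedFieldReference("flag")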

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
deleted file mode 100644
index 84f1d7e..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/trees/TreeNode.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.trees
-
-/**
- * Generic base class for trees that can be transformed and traversed.
- */
-abstract class TreeNode[A <: TreeNode[A]] { self: A with Product =>
-
-  /**
-   * List of child nodes that should be considered when doing transformations. Other values
-   * in the Product will not be transformed, only handed through.
-   */
-  def children: Seq[A]
-
-  /**
-   * Tests for equality by first testing for reference equality.
-   */
-  def fastEquals(other: TreeNode[_]): Boolean = this.eq(other) || this == other
-
-  def transformPre(rule: PartialFunction[A, A]): A = {
-    val afterTransform = rule.applyOrElse(this, identity[A])
-
-    if (afterTransform fastEquals this) {
-      this.transformChildrenPre(rule)
-    } else {
-      afterTransform.transformChildrenPre(rule)
-    }
-  }
-
-  def transformChildrenPre(rule: PartialFunction[A, A]): A = {
-    var changed = false
-    val newArgs = productIterator map {
-      case child: A if children.contains(child) =>
-        val newChild = child.transformPre(rule)
-        if (newChild fastEquals child) {
-          child
-        } else {
-          changed = true
-          newChild
-        }
-      case other: AnyRef => other
-      case null => null
-    } toArray
-
-    if (changed) makeCopy(newArgs) else this
-  }
-
-  def transformPost(rule: PartialFunction[A, A]): A = {
-    val afterChildren = transformChildrenPost(rule)
-    if (afterChildren fastEquals this) {
-      rule.applyOrElse(this, identity[A])
-    } else {
-      rule.applyOrElse(afterChildren, identity[A])
-    }
-  }
-
-  def transformChildrenPost(rule: PartialFunction[A, A]): A = {
-    var changed = false
-    val newArgs = productIterator map {
-      case child: A if children.contains(child) =>
-        val newChild = child.transformPost(rule)
-        if (newChild fastEquals child) {
-          child
-        } else {
-          changed = true
-          newChild
-        }
-      case other: AnyRef => other
-      case null => null
-    } toArray
-    // toArray forces evaluation, toSeq does not seem to work here
-
-    if (changed) makeCopy(newArgs) else this
-  }
-
-  def exists(predicate: A => Boolean): Boolean = {
-    var exists = false
-    this.transformPre {
-      case e: A => if (predicate(e)) {
-        exists = true
-      }
-        e
-    }
-    exists
-  }
-
-  /**
-   * Creates a new copy of this expression with new children. This is used during transformation
-   * if children change. This must be overridden by tree nodes that don't have the Constructor
-   * arguments in the same order as the `children`.
-   */
-  def makeCopy(newArgs: Seq[AnyRef]): this.type = {
-    val defaultCtor =
-      this.getClass.getConstructors.find { _.getParameterTypes.size > 0}.head
-    try {
-      defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type]
-    } catch {
-      case iae: IllegalArgumentException =>
-        println("IAE " + this)
-        throw new RuntimeException("Should never happen.")
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
deleted file mode 100644
index 3b5459b..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeinfo
-
-import org.apache.flink.api.common.operators.Operator
-import org.apache.flink.api.java.operators.SingleInputOperator
-import org.apache.flink.api.java.{DataSet => JavaDataSet}
-
-/**
- * This is a logical operator that can hold a [[RenamingProxyTypeInfo]] for renaming some
- * fields of a [[org.apache.flink.api.common.typeutils.CompositeType]]. At runtime this
- * disappears since the translation method simply returns the input.
- */
-class RenameOperator[T](
-    input: JavaDataSet[T],
-    renamingTypeInformation: RenamingProxyTypeInfo[T])
-  extends SingleInputOperator[T, T, RenameOperator[T]](input, renamingTypeInformation) {
-
-  override protected def translateToDataFlow(
-      input: Operator[T]): Operator[T] = input
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
 
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
deleted file mode 100644
index dd598ab..0000000
--- 
a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenamingProxyTypeInfo.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeinfo
-
-import java.util
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.CompositeType.{TypeComparatorBuilder,
-FlatFieldDescriptor}
-import org.apache.flink.api.common.typeutils.{CompositeType, TypeSerializer}
-
-/**
- * A TypeInformation that is used to rename fields of an underlying CompositeType. This
- * allows the system to translate "as" Table API operations to a [[RenameOperator]]
- * that does not get translated to a runtime operator.
- */
-class RenamingProxyTypeInfo[T](
-    val tpe: CompositeType[T],
-    val fieldNames: Array[String])
-  extends CompositeType[T](tpe.getTypeClass) {
-
-  def getUnderlyingType: CompositeType[T] = tpe
-
-  if (tpe.getArity != fieldNames.length) {
-    throw new IllegalArgumentException(s"Number of field names 
'${fieldNames.mkString(",")}' and " +
-      s"number of fields in underlying type $tpe do not match.")
-  }
-
-  if (fieldNames.toSet.size != fieldNames.length) {
-    throw new IllegalArgumentException(s"New field names must be unique. " +
-      s"Names: ${fieldNames.mkString(",")}.")
-  }
-
-  override def getFieldIndex(fieldName: String): Int = {
-    val result = fieldNames.indexOf(fieldName)
-    if (result != fieldNames.lastIndexOf(fieldName)) {
-      -2
-    } else {
-      result
-    }
-  }
-  override def getFieldNames: Array[String] = fieldNames
-
-  override def isBasicType: Boolean = tpe.isBasicType
-
-  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] =
-    tpe.createSerializer(executionConfig)
-
-  override def getArity: Int = tpe.getArity
-
-  override def isKeyType: Boolean = tpe.isKeyType
-
-  override def getTypeClass: Class[T] = tpe.getTypeClass
-
-  override def getGenericParameters: java.util.List[TypeInformation[_]] = tpe.getGenericParameters
-
-  override def isTupleType: Boolean = tpe.isTupleType
-
-  override def toString = {
-    s"RenamingType(type: ${tpe.getTypeClass.getSimpleName}; " +
-      s"fields: ${fieldNames.mkString(", ")})"
-  }
-
-  override def getTypeAt[X](pos: Int): TypeInformation[X] = tpe.getTypeAt(pos)
-
-  override def getTotalFields: Int = tpe.getTotalFields
-
-  override def createComparator(
-        logicalKeyFields: Array[Int],
-        orders: Array[Boolean],
-        logicalFieldOffset: Int,
-        executionConfig: ExecutionConfig) =
-    tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, executionConfig)
-
-  override def getFlatFields(
-      fieldExpression: String,
-      offset: Int,
-      result: util.List[FlatFieldDescriptor]): Unit = {
-    tpe.getFlatFields(fieldExpression, offset, result)
-  }
-
-  override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = {
-    tpe.getTypeAt(fieldExpression)
-  }
-
-  override protected def createTypeComparatorBuilder(): TypeComparatorBuilder[T] = {
-    throw new RuntimeException("This method should never be called because createComparator is " +
-      "overwritten.")
-  }
-
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case renamingProxy: RenamingProxyTypeInfo[_] =>
-        renamingProxy.canEqual(this) &&
-        tpe.equals(renamingProxy.tpe) &&
-        fieldNames.sameElements(renamingProxy.fieldNames)
-      case _ => false
-    }
-  }
-
-  override def hashCode(): Int = {
-    31 * tpe.hashCode() + util.Arrays.hashCode(fieldNames.asInstanceOf[Array[AnyRef]])
-  }
-
-  override def canEqual(obj: Any): Boolean = {
-    obj.isInstanceOf[RenamingProxyTypeInfo[_]]
-  }
-}
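For context on the class removed above, here is a minimal usage sketch. It is not part of the commit: it assumes the pre-removal flink-table classes are still on the classpath, and the Person case class and the renamed field names ("id", "years") are purely illustrative. It shows how the proxy wraps an existing CompositeType so its fields appear under new names, which is how the Table API's "as" could be applied without a runtime operator.

import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.scala._
import org.apache.flink.api.table.typeinfo.RenamingProxyTypeInfo

object RenamingProxyExample {
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    // The Scala API derives a CaseClassTypeInfo for Person, which is a CompositeType.
    val personType = createTypeInformation[Person].asInstanceOf[CompositeType[Person]]

    // Expose the same physical type under the renamed fields "id" and "years".
    val renamed = new RenamingProxyTypeInfo[Person](personType, Array("id", "years"))

    // Field lookups now resolve against the new names; the underlying type is unchanged.
    println(renamed)                         // RenamingType(type: Person; fields: id, years)
    println(renamed.getFieldIndex("years"))  // 1
    println(renamed.getUnderlyingType)       // the original Person type information
  }
}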

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
deleted file mode 100644
index 5e9613d..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeinfo
-
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.core.memory.{DataOutputView, DataInputView}
-
-/**
- * Serializer for [[Row]].
- */
-class RowSerializer(val fieldSerializers: Array[TypeSerializer[Any]])
-  extends TypeSerializer[Row] {
-
-  override def isImmutableType: Boolean = false
-
-  override def getLength: Int = -1
-
-  override def duplicate = this
-
-  override def createInstance: Row = {
-    new Row(fieldSerializers.length)
-  }
-
-  override def copy(from: Row, reuse: Row): Row = {
-    val len = fieldSerializers.length
-
-    if (from.productArity != len) {
-      throw new RuntimeException("Row arity of reuse and from do not match.")
-    }
-    var i = 0
-    while (i < len) {
-      val reuseField = reuse.productElement(i)
-      val fromField = from.productElement(i).asInstanceOf[AnyRef]
-      val copy = fieldSerializers(i).copy(fromField, reuseField)
-      reuse.setField(i, copy)
-      i += 1
-    }
-    reuse
-  }
-
-  override def copy(from: Row): Row = {
-    val len = fieldSerializers.length
-
-    if (from.productArity != len) {
-      throw new RuntimeException("Row arity of reuse and from do not match.")
-    }
-    val result = new Row(len)
-    var i = 0
-    while (i < len) {
-      val fromField = from.productElement(i).asInstanceOf[AnyRef]
-      val copy = fieldSerializers(i).copy(fromField)
-      result.setField(i, copy)
-      i += 1
-    }
-    result
-  }
-
-  override def serialize(value: Row, target: DataOutputView) {
-    val len = fieldSerializers.length
-    var i = 0
-    while (i < len) {
-      val serializer = fieldSerializers(i)
-      serializer.serialize(value.productElement(i), target)
-      i += 1
-    }
-  }
-
-  override def deserialize(reuse: Row, source: DataInputView): Row = {
-    val len = fieldSerializers.length
-
-    if (reuse.productArity != len) {
-      throw new RuntimeException("Row arity of reuse and fields do not match.")
-    }
-
-    var i = 0
-    while (i < len) {
-      val field = reuse.productElement(i).asInstanceOf[AnyRef]
-      reuse.setField(i, fieldSerializers(i).deserialize(field, source))
-      i += 1
-    }
-    reuse
-  }
-
-  override def deserialize(source: DataInputView): Row = {
-    val len = fieldSerializers.length
-
-    val result = new Row(len)
-    var i = 0
-    while (i < len) {
-      result.setField(i, fieldSerializers(i).deserialize(source))
-      i += 1
-    }
-    result
-  }
-
-  override def copy(source: DataInputView, target: DataOutputView): Unit = {
-    val len = fieldSerializers.length
-    var i = 0
-    while (i < len) {
-      fieldSerializers(i).copy(source, target)
-      i += 1
-    }
-  }
-
-  override def equals(any: scala.Any): Boolean = {
-    any match {
-      case otherRS: RowSerializer =>
-        otherRS.canEqual(this) &&
-          fieldSerializers.sameElements(otherRS.fieldSerializers)
-      case _ => false
-    }
-  }
-
-  override def canEqual(obj: scala.Any): Boolean = {
-    obj.isInstanceOf[RowSerializer]
-  }
-
-  override def hashCode(): Int = {
-    java.util.Arrays.hashCode(fieldSerializers.asInstanceOf[Array[AnyRef]])
-  }
-}
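A small sketch of how the removed RowSerializer was wired together, again assuming the pre-removal flink-table Row and RowSerializer classes are available; the field types and values are illustrative. It mirrors what RowTypeInfo#createSerializer does: one TypeSerializer per field, with the row serializer delegating field by field.

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.table.Row
import org.apache.flink.api.table.typeinfo.RowSerializer

object RowSerializerExample {
  def main(args: Array[String]): Unit = {
    val config = new ExecutionConfig
    // One serializer per row field, erased to TypeSerializer[Any] as the constructor expects.
    val fieldSerializers: Array[TypeSerializer[Any]] = Array(
      BasicTypeInfo.STRING_TYPE_INFO.createSerializer(config).asInstanceOf[TypeSerializer[Any]],
      BasicTypeInfo.INT_TYPE_INFO.createSerializer(config).asInstanceOf[TypeSerializer[Any]])

    val rowSerializer = new RowSerializer(fieldSerializers)

    val row = new Row(2)
    row.setField(0, "hello")
    row.setField(1, 42)

    // copy() delegates to the field serializers, one field at a time.
    val copied = rowSerializer.copy(row)
    println(copied) // prints the copied row's fields
  }
}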

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
deleted file mode 100644
index db3c881..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowTypeInfo.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.table.typeinfo
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.table.Row
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo}
-
-/**
- * TypeInformation for [[Row]].
- */
-class RowTypeInfo(
-    fieldTypes: Seq[TypeInformation[_]],
-    fieldNames: Seq[String])
-  extends CaseClassTypeInfo[Row](classOf[Row], Array(), fieldTypes, fieldNames) {
-
-  def this(fields: Seq[Expression]) = this(fields.map(_.typeInfo), fields.map(_.name))
-
-  if (fieldNames.toSet.size != fieldNames.size) {
-    throw new IllegalArgumentException("Field names must be unique.")
-  }
-
-  override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Row] = {
-    val fieldSerializers: Array[TypeSerializer[Any]] = new Array[TypeSerializer[Any]](getArity)
-    for (i <- 0 until getArity) {
-      fieldSerializers(i) = this.types(i).createSerializer(executionConfig)
-        .asInstanceOf[TypeSerializer[Any]]
-    }
-
-    new RowSerializer(fieldSerializers)
-  }
-}
-
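As a companion to the previous sketch, a RowTypeInfo can be constructed directly from field types and names and then asked for its serializer; it is simply a CaseClassTypeInfo over Row. The field names "word" and "count" below are illustrative, and the pre-removal flink-table classes are again assumed.

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.table.typeinfo.RowTypeInfo

object RowTypeInfoExample {
  def main(args: Array[String]): Unit = {
    val rowType = new RowTypeInfo(
      Seq(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
      Seq("word", "count"))

    println(rowType.getArity)                      // 2
    println(rowType.getFieldNames.mkString(", "))  // word, count

    // Builds a RowSerializer from the per-field serializers, as shown in the deleted file.
    val serializer = rowType.createSerializer(new ExecutionConfig)
    println(serializer.getClass.getSimpleName)     // RowSerializer
  }
}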

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
deleted file mode 100644
index dda6265..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/PageRankTable.scala
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.common.functions.GroupReduceFunction
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.examples.java.graph.util.PageRankData
-import org.apache.flink.util.Collector
-
-import _root_.scala.collection.JavaConverters._
-
-/**
-* A basic implementation of the Page Rank algorithm using a bulk iteration.
-*
-* This implementation requires a set of pages and a set of directed links as input and works as
-* follows.
-*
-* In each iteration, the rank of every page is evenly distributed to all pages it points to. Each
-* page collects the partial ranks of all pages that point to it, sums them up, and applies a
-* dampening factor to the sum. The result is the new rank of the page. A new iteration is started
-* with the new ranks of all pages. This implementation terminates after a fixed number of
-* iterations. This is the Wikipedia entry for the
-* [[http://en.wikipedia.org/wiki/Page_rank Page Rank algorithm]]
-*
-* Input files are plain text files and must be formatted as follows:
-*
-*  - Pages represented as an (long) ID separated by new-line characters.
-*    For example `"1\n2\n12\n42\n63"` gives five pages with IDs 1, 2, 12, 42, and 63.
-*  - Links are represented as pairs of page IDs which are separated by space characters. Links
-*    are separated by new-line characters.
-*    For example `"1 2\n2 12\n1 12\n42 63"` gives four (directed) links (1)->(2), (2)->(12),
-*    (1)->(12), and (42)->(63). For this simple implementation it is required that each page has
-*    at least one incoming and one outgoing link (a page can point to itself).
-*
-* Usage:
-* {{{
-*   PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>
-* }}}
-*
-* If no parameters are provided, the program is run with default data from
-* [[org.apache.flink.examples.java.graph.util.PageRankData]] and 10 iterations.
-*
-* This example shows how to use:
-*
-*  - Bulk Iterations
-*  - Table API expressions
-*/
-object PageRankTable {
-
-  private final val DAMPENING_FACTOR: Double = 0.85
-  private final val EPSILON: Double = 0.0001
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    // read input data
-    val pagesWithRanks = getPagesDataSet(env).map { p => (p, 1.0 / numPages) }
-      .as('pageId, 'rank)
-
-    val links = getLinksDataSet(env)
-
-    // build adjacency list from link input
-    val adjacencyLists = links
-      .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
-
-        override def reduce(
-            values: _root_.java.lang.Iterable[Link],
-            out: Collector[AdjacencyList]): Unit = {
-          var outputId = -1L
-          val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
-          out.collect(new AdjacencyList(outputId, outputList.toArray))
-        }
-
-      }).as('sourceId, 'targetIds)
-
-    // start iteration
-    val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
-      currentRanks =>
-        val newRanks = currentRanks.toTable
-          // distribute ranks to target pages
-          .join(adjacencyLists).where('pageId === 'sourceId)
-          .select('rank, 'targetIds).toDataSet[RankOutput]
-          .flatMap {
-            (in, out: Collector[(Long, Double)]) =>
-              val targets = in.targetIds
-              val len = targets.length
-              targets foreach { t => out.collect((t, in.rank / len )) }
-          }
-          .as('pageId, 'rank)
-          // collect ranks and sum them up
-          .groupBy('pageId).select('pageId, 'rank.sum as 'rank)
-          // apply dampening factor
-          .select(
-            'pageId,
-            ('rank * DAMPENING_FACTOR) + (Literal(1) - DAMPENING_FACTOR) / numPages as 'rank)
-
-
-        val termination = currentRanks.toTable
-          .as('curId, 'curRank).join(newRanks.as('newId, 'newRank))
-          .where('curId === 'newId && ('curRank - 'newRank).abs > EPSILON)
-
-        (newRanks, termination)
-    }
-
-    val result = finalRanks
-
-    // emit result
-    if (fileOutput) {
-      result.writeAsCsv(outputPath, "\n", " ")
-      // execute program
-      env.execute("Expression PageRank Example")
-    } else {
-      // execute program and print result
-      result.print()
-    }
-  }
-
-  // *************************************************************************
-  //     USER TYPES
-  // *************************************************************************
-
-  case class Link(sourceId: Long, targetId: Long)
-
-  case class Page(pageId: Long, rank: Double)
-
-  case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
-
-  case class RankOutput(rank: Double, targetIds: Array[Long])
-
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 5) {
-        pagesInputPath = args(0)
-        linksInputPath = args(1)
-        outputPath = args(2)
-        numPages = args(3).toLong
-        maxIterations = args(4).toInt
-      } else {
-        System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num " +
-          "pages> <num iterations>")
-        false
-      }
-    } else {
-      System.out.println("Executing PageRank Basic example with default parameters and built-in " +
-        "default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("  Usage: PageRankBasic <pages path> <links path> <output path> <num " +
-        "pages> <num iterations>")
-
-      numPages = PageRankData.getNumberOfPages
-    }
-    true
-  }
-
-  private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
-    if (fileOutput) {
-      env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = " ", lineDelimiter = "\n")
-        .map(x => x._1)
-    } else {
-      env.generateSequence(1, 15)
-    }
-  }
-
-  private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
-    if (fileOutput) {
-      env.readCsvFile[Link](linksInputPath, fieldDelimiter = " ",
-        includedFields = Array(0, 1))
-    } else {
-      val edges = PageRankData.EDGES.map { case Array(v1, v2) => Link(v1.asInstanceOf[Long],
-        v2.asInstanceOf[Long])}
-      env.fromCollection(edges)
-    }
-  }
-
-  private var fileOutput: Boolean = false
-  private var pagesInputPath: String = null
-  private var linksInputPath: String = null
-  private var outputPath: String = null
-  private var numPages: Double = 0
-  private var maxIterations: Int = 10
-
-}
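The scaladoc of the deleted example describes the per-iteration rank update in prose: each page spreads its rank evenly over its out-links, incoming contributions are summed, and the dampening factor is applied. A standalone sketch of one such iteration in plain Scala (no Flink, with a tiny hand-made link map purely for illustration) makes that update explicit.

object PageRankStepSketch {
  def main(args: Array[String]): Unit = {
    val damp = 0.85
    val links = Map(1L -> Seq(2L, 3L), 2L -> Seq(3L), 3L -> Seq(1L))
    val numPages = links.size
    val ranks = Map(1L -> 1.0 / numPages, 2L -> 1.0 / numPages, 3L -> 1.0 / numPages)

    // Distribute: each target receives rank / out-degree from every source that points to it.
    val contributions = for {
      (src, targets) <- links.toSeq
      t <- targets
    } yield t -> ranks(src) / targets.size

    // Sum the partial ranks per page and apply the dampening factor,
    // matching the groupBy/select step of the deleted Table API program.
    val newRanks = contributions
      .groupBy(_._1)
      .map { case (page, cs) => page -> (damp * cs.map(_._2).sum + (1 - damp) / numPages) }

    newRanks.toSeq.sortBy(_._1).foreach { case (page, rank) => println(s"$page -> $rank") }
  }
}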

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
deleted file mode 100644
index 63dddc9..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/StreamingTableFilter.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.scala
-
-import org.apache.flink.streaming.api.scala._
-
-import org.apache.flink.api.scala.table._
-
-import scala.Stream._
-import scala.math._
-import scala.language.postfixOps
-import scala.util.Random
-
-/**
- * Simple example for demonstrating the use of the Table API with Flink Streaming.
- */
-object StreamingTableFilter {
-
-  case class CarEvent(carId: Int, speed: Int, distance: Double, time: Long) extends Serializable
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val cars = genCarStream().toTable
-      .filter('carId === 0)
-      .select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
-      .toDataStream[CarEvent]
-
-    cars.print()
-
-    StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing")
-
-  }
-
-  def genCarStream(): DataStream[CarEvent] = {
-
-    def nextSpeed(carEvent : CarEvent) : CarEvent =
-    {
-      val next =
-        if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5)
-      CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis)
-    }
-    def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] =
-    {
-      Thread.sleep(1000)
-      speeds.append(carStream(speeds.map(nextSpeed)))
-    }
-    carStream(range(0, numOfCars).map(CarEvent(_,50,0,System.currentTimeMillis())))
-  }
-
-  def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      if (args.length == 3) {
-        numOfCars = args(0).toInt
-        evictionSec = args(1).toInt
-        triggerMeters = args(2).toDouble
-        true
-      }
-      else {
-        System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>")
-        false
-      }
-    }else{
-      true
-    }
-  }
-
-  var numOfCars = 2
-  var evictionSec = 10
-  var triggerMeters = 50d
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
deleted file mode 100644
index f527a3c..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/TPCHQuery3Table.scala
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-
-/**
- * This program implements a modified version of the TPC-H query 3. The
- * example demonstrates how to assign names to fields by extending the Tuple class.
- * The original query can be found at
- * [http://www.tpc.org/tpch/spec/tpch2.16.0.pdf](http://www.tpc.org/tpch/spec/tpch2.16.0.pdf)
- * (page 29).
- *
- * This program implements the following SQL equivalent:
- *
- * {{{
- * SELECT 
- *      l_orderkey, 
- *      SUM(l_extendedprice*(1-l_discount)) AS revenue,
- *      o_orderdate, 
- *      o_shippriority 
- * FROM customer, 
- *      orders, 
- *      lineitem 
- * WHERE
- *      c_mktsegment = '[SEGMENT]' 
- *      AND c_custkey = o_custkey
- *      AND l_orderkey = o_orderkey
- *      AND o_orderdate < date '[DATE]'
- *      AND l_shipdate > date '[DATE]'
- * GROUP BY
- *      l_orderkey, 
- *      o_orderdate, 
- *      o_shippriority;
- * }}}
- *
- * Compared to the original TPC-H query this version does not sort the result by revenue
- * and orderdate.
- *
- * Input files are plain text CSV files using the pipe character ('|') as field separator
- * as generated by the TPC-H data generator which is available at
- * [http://www.tpc.org/tpch/](http://www.tpc.org/tpch/).
- *
- * Usage: 
- * {{{
- * TPCHQuery3Expression <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>
- * }}}
- *  
- * This example shows how to use:
- *  - Table API expressions
- *
- */
-object TPCHQuery3Table {
-
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    // set filter date
-    val dateFormat = new _root_.java.text.SimpleDateFormat("yyyy-MM-dd")
-    val date = dateFormat.parse("1995-03-12")
-    
-    // get execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val lineitems = getLineitemDataSet(env)
-      .filter( l => dateFormat.parse(l.shipDate).after(date) )
-      .as('id, 'extdPrice, 'discount, 'shipDate)
-
-    val customers = getCustomerDataSet(env)
-      .as('id, 'mktSegment)
-      .filter( 'mktSegment === "AUTOMOBILE" )
-
-    val orders = getOrdersDataSet(env)
-      .filter( o => dateFormat.parse(o.orderDate).before(date) )
-      .as('orderId, 'custId, 'orderDate, 'shipPrio)
-
-    val items =
-      orders.join(customers)
-        .where('custId === 'id)
-        .select('orderId, 'orderDate, 'shipPrio)
-      .join(lineitems)
-        .where('orderId === 'id)
-        .select(
-          'orderId,
-          'extdPrice * (Literal(1.0f) - 'discount) as 'revenue,
-          'orderDate,
-          'shipPrio)
-
-    val result = items
-      .groupBy('orderId, 'orderDate, 'shipPrio)
-      .select('orderId, 'revenue.sum, 'orderDate, 'shipPrio)
-
-    // emit result
-    result.writeAsCsv(outputPath, "\n", "|")
-
-    // execute program
-    env.execute("Scala TPCH Query 3 (Expression) Example")
-  }
-  
-  // *************************************************************************
-  //     USER DATA TYPES
-  // *************************************************************************
-  
-  case class Lineitem(id: Long, extdPrice: Double, discount: Double, shipDate: String)
-  case class Customer(id: Long, mktSegment: String)
-  case class Order(orderId: Long, custId: Long, orderDate: String, shipPrio: Long)
-
-  // *************************************************************************
-  //     UTIL METHODS
-  // *************************************************************************
-  
-  private var lineitemPath: String = null
-  private var customerPath: String = null
-  private var ordersPath: String = null
-  private var outputPath: String = null
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length == 4) {
-      lineitemPath = args(0)
-      customerPath = args(1)
-      ordersPath = args(2)
-      outputPath = args(3)
-      true
-    } else {
-      System.err.println("This program expects data from the TPC-H benchmark as input data.\n" +
-          " Due to legal restrictions, we can not ship generated data.\n" +
-          " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" +
-          " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path>" + 
-                             "<orders-csv path> <result path>");
-      false
-    }
-  }
-  
-  private def getLineitemDataSet(env: ExecutionEnvironment): DataSet[Lineitem] = {
-    env.readCsvFile[Lineitem](
-        lineitemPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 5, 6, 10) )
-  }
-
-  private def getCustomerDataSet(env: ExecutionEnvironment): DataSet[Customer] = {
-    env.readCsvFile[Customer](
-        customerPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 6) )
-  }
-  
-  private def getOrdersDataSet(env: ExecutionEnvironment): DataSet[Order] = {
-    env.readCsvFile[Order](
-        ordersPath,
-        fieldDelimiter = "|",
-        includedFields = Array(0, 1, 4, 7) )
-  }
-  
-}
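The scaladoc of the deleted example states the SQL the Table API program implements: revenue per (l_orderkey, o_orderdate, o_shippriority) is the sum of l_extendedprice * (1 - l_discount) over the joined rows. A plain-Scala sketch of that grouped aggregation (no Flink; the JoinedItem case class and sample values are illustrative) shows the same computation on in-memory data.

object TpchRevenueSketch {
  case class JoinedItem(orderId: Long, orderDate: String, shipPrio: Long,
                        extdPrice: Double, discount: Double)

  def main(args: Array[String]): Unit = {
    val items = Seq(
      JoinedItem(1L, "1995-03-01", 0L, 100.0, 0.1),
      JoinedItem(1L, "1995-03-01", 0L, 50.0, 0.0),
      JoinedItem(2L, "1995-03-05", 1L, 200.0, 0.2))

    // GROUP BY orderId, orderDate, shipPrio and sum extdPrice * (1 - discount) as revenue.
    val result = items
      .groupBy(i => (i.orderId, i.orderDate, i.shipPrio))
      .map { case (key, group) =>
        key -> group.map(i => i.extdPrice * (1.0 - i.discount)).sum
      }

    result.foreach { case ((orderId, date, prio), revenue) =>
      println(s"$orderId|$revenue|$date|$prio")
    }
  }
}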

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
deleted file mode 100644
index cac9590..0000000
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/examples/scala/WordCountTable.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.examples.scala
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-
-/**
- * Simple example for demonstrating the use of the Table API for a Word Count.
- */
-object WordCountTable {
-
-  case class WC(word: String, count: Int)
-
-  def main(args: Array[String]): Unit = {
-
-    // set up execution environment
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
-    val expr = input.toTable
-    val result = expr
-      .groupBy('word)
-      .select('word, 'count.sum as 'count)
-      .toDataSet[WC]
-
-    result.print()
-  }
-}
