http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala deleted file mode 100644 index e3f62f8..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/arithmetic.scala +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.tree - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, NumericTypeInfo, TypeInformation} - -abstract class BinaryArithmetic extends BinaryExpression { - def typeInfo = { - if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-numeric operand ${left} of type ${left.typeInfo} in $this""") - } - if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-numeric operand "${right}" of type ${right.typeInfo} in $this""") - } - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - left.typeInfo - } -} - -case class Plus(left: Expression, right: Expression) extends BinaryArithmetic { - override def typeInfo = { - if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]] && - !(left.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) { - throw new ExpressionException(s"Non-numeric operand type ${left.typeInfo} in $this") - } - if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]] && - !(right.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) { - throw new ExpressionException(s"Non-numeric operand type ${right.typeInfo} in $this") - } - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - left.typeInfo - } - - override def toString = s"($left + $right)" -} - -case class UnaryMinus(child: Expression) extends UnaryExpression { - def typeInfo = { - if (!child.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""") - } - child.typeInfo - } - - override def toString = s"-($child)" -} - -case class Minus(left: Expression, right: Expression) extends BinaryArithmetic { - override def toString = s"($left - $right)" -} - -case class Div(left: Expression, right: Expression) extends BinaryArithmetic { - override def toString = s"($left / $right)" -} - -case class Mul(left: Expression, right: Expression) extends BinaryArithmetic { - override def toString = s"($left * $right)" -} - -case class Mod(left: Expression, right: Expression) extends BinaryArithmetic { - override def toString = s"($left * $right)" -} - -case class Abs(child: Expression) extends UnaryExpression { - def typeInfo = child.typeInfo - - override def toString = s"abs($child)" -} - -abstract class BitwiseBinaryArithmetic extends BinaryExpression { - def typeInfo: TypeInformation[_] = { - if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""") - } - if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-integer operand "${right}" of type ${right.typeInfo} in $this""") - } - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) { - left.typeInfo - } else { - BasicTypeInfo.INT_TYPE_INFO - } - } -} - -case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArithmetic { - override def toString = s"($left & $right)" -} - -case class BitwiseOr(left: Expression, right: Expression) extends BitwiseBinaryArithmetic { - override def toString = s"($left | $right)" -} - - -case class BitwiseXor(left: Expression, right: Expression) extends BitwiseBinaryArithmetic { - override def toString = s"($left ^ $right)" -} - -case class BitwiseNot(child: Expression) extends UnaryExpression { - def typeInfo: TypeInformation[_] = { - if (!child.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - s"""Non-integer operand ${child} of type ${child.typeInfo} in $this""") - } - if (child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) { - child.typeInfo - } else { - BasicTypeInfo.INT_TYPE_INFO - } - } - - override def toString = s"~($child)" -} -
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala deleted file mode 100644 index f83de5b..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/cast.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.tree - -import org.apache.flink.api.common.typeinfo.TypeInformation - -case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression { - def typeInfo = tpe -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala deleted file mode 100644 index fdb5fd0..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/comparison.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.tree - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo} - -abstract class BinaryComparison extends BinaryExpression { - def typeInfo = { - if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { - throw new ExpressionException(s"Non-numeric operand ${left} in $this") - } - if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) { - throw new ExpressionException(s"Non-numeric operand ${right} in $this") - } - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - BasicTypeInfo.BOOLEAN_TYPE_INFO - } -} - -case class EqualTo(left: Expression, right: Expression) extends BinaryComparison { - override def typeInfo = { - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - BasicTypeInfo.BOOLEAN_TYPE_INFO - } - - override def toString = s"$left === $right" -} - -case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison { - override def typeInfo = { - if (left.typeInfo != right.typeInfo) { - throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - BasicTypeInfo.BOOLEAN_TYPE_INFO - } - - override def toString = s"$left !== $right" -} - -case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison { - override def toString = s"$left > $right" -} - -case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison { - override def toString = s"$left >= $right" -} - -case class LessThan(left: Expression, right: Expression) extends BinaryComparison { - override def toString = s"$left < $right" -} - -case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison { - override def toString = s"$left <= $right" -} - -case class IsNull(child: Expression) extends UnaryExpression { - def typeInfo = { - BasicTypeInfo.BOOLEAN_TYPE_INFO - } - - override def toString = s"($child).isNull" -} - -case class IsNotNull(child: Expression) extends UnaryExpression { - def typeInfo = { - BasicTypeInfo.BOOLEAN_TYPE_INFO - } - - override def toString = s"($child).isNotNull" -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala deleted file mode 100644 index a1d8589..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/fieldExpression.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.tree - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.common.typeinfo.TypeInformation - -case class UnresolvedFieldReference(override val name: String) extends LeafExpression { - def typeInfo = throw new ExpressionException(s"Unresolved field reference: $this") - - override def toString = "\"" + name -} - -case class ResolvedFieldReference( - override val name: String, - tpe: TypeInformation[_]) extends LeafExpression { - def typeInfo = tpe - - override def toString = s"'$name" -} - -case class Naming(child: Expression, override val name: String) extends UnaryExpression { - def typeInfo = child.typeInfo - - override def toString = s"$child as '$name" -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala deleted file mode 100644 index 03949ee..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/literals.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.tree - -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.scala.expressions.ImplicitExpressionOperations - -object Literal { - def apply(l: Any): Literal = l match { - case i:Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO) - case l:Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO) - case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO) - case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO) - case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO) - case bool: Boolean => Literal(bool, BasicTypeInfo.BOOLEAN_TYPE_INFO) - } -} - -case class Literal(value: Any, tpe: TypeInformation[_]) - extends LeafExpression with ImplicitExpressionOperations { - def expr = this - def typeInfo = tpe - - override def toString = s"$value" -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala deleted file mode 100644 index 8f0a068..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/logic.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.tree - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.common.typeinfo.BasicTypeInfo - -abstract class BinaryPredicate extends BinaryExpression { - def typeInfo = { - if (left.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO || - right.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) { - throw new ExpressionException(s"Non-boolean operand types ${left.typeInfo} and " + - s"${right.typeInfo} in $this") - } - BasicTypeInfo.BOOLEAN_TYPE_INFO - } -} - -case class Not(child: Expression) extends UnaryExpression { - def typeInfo = { - if (child.typeInfo != BasicTypeInfo.BOOLEAN_TYPE_INFO) { - throw new ExpressionException(s"Non-boolean operand type ${child.typeInfo} in $this") - } - BasicTypeInfo.BOOLEAN_TYPE_INFO - } - - override val name = Expression.freshName("not-" + child.name) - - override def toString = s"!($child)" -} - -case class And(left: Expression, right: Expression) extends BinaryPredicate { - override def toString = s"$left && $right" - - override val name = Expression.freshName(left.name + "-and-" + right.name) -} - -case class Or(left: Expression, right: Expression) extends BinaryPredicate { - override def toString = s"$left || $right" - - override val name = Expression.freshName(left.name + "-or-" + right.name) - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala deleted file mode 100644 index 04c29f7..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/package.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions - -/** - * This package contains the base class of AST nodes and all the expression language AST classes. - * Expression trees should not be manually constructed by users. They are implicitly constructed - * from the implicit DSL conversions in - * [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and - * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]]. For the Java API, - * expression trees should be generated from a string parser that parses expressions and creates - * AST nodes. - */ -package object tree http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala deleted file mode 100644 index 175d445..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/tree/stringExpressions.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.tree - -import org.apache.flink.api.expressions.ExpressionException -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo} - -case class Substring( - str: Expression, - beginIndex: Expression, - endIndex: Expression) extends Expression { - def typeInfo = { - if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) { - throw new ExpressionException( - s"""Operand must be of type String in $this, is ${str.typeInfo}.""") - } - if (!beginIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - s"""Begin index must be an integer type in $this, is ${beginIndex.typeInfo}.""") - } - if (!endIndex.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) { - throw new ExpressionException( - s"""End index must be an integer type in $this, is ${endIndex.typeInfo}.""") - } - - BasicTypeInfo.STRING_TYPE_INFO - } - - override def children: Seq[Expression] = Seq(str, beginIndex, endIndex) - override def toString = s"($str).substring($beginIndex, $endIndex)" -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala deleted file mode 100644 index 38c908d..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenameOperator.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.typeinfo - -import org.apache.flink.api.common.operators.Operator -import org.apache.flink.api.java.operators.SingleInputOperator -import org.apache.flink.api.java.{DataSet => JavaDataSet} - -/** - * This is a logical operator that can hold a [[RenamingProxyTypeInfo]] for renaming some - * fields of a [[org.apache.flink.api.common.typeutils.CompositeType]]. At runtime this - * disappears since the translation methods simply returns the input. - */ -class RenameOperator[T]( - input: JavaDataSet[T], - renamingTypeInformation: RenamingProxyTypeInfo[T]) - extends SingleInputOperator[T, T, RenameOperator[T]](input, renamingTypeInformation) { - - override protected def translateToDataFlow( - input: Operator[T]): Operator[T] = input -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala deleted file mode 100644 index 0263f8a..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RenamingProxyTypeInfo.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.typeinfo - -import java.util - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor -import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, TypeSerializer} - -/** - * A TypeInformation that is used to rename fields of an underlying CompositeType. This - * allows the system to translate "as" expression operations to a [[RenameOperator]] - * that does not get translated to a runtime operator. - */ -class RenamingProxyTypeInfo[T]( - tpe: CompositeType[T], - fieldNames: Array[String]) extends CompositeType[T](tpe.getTypeClass) { - - def getUnderlyingType: CompositeType[T] = tpe - - if (tpe.getArity != fieldNames.length) { - throw new IllegalArgumentException(s"Number of field names '${fieldNames.mkString(",")}' and " + - s"number of fields in underlying type $tpe do not match.") - } - - if (fieldNames.toSet.size != fieldNames.length) { - throw new IllegalArgumentException(s"New field names must be unique. " + - s"Names: ${fieldNames.mkString(",")}.") - } - - override def getFieldIndex(fieldName: String): Int = { - val result = fieldNames.indexOf(fieldName) - if (result != fieldNames.lastIndexOf(fieldName)) { - -2 - } else { - result - } - } - override def getFieldNames: Array[String] = fieldNames - - override def isBasicType: Boolean = tpe.isBasicType - - override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[T] = - tpe.createSerializer(executionConfig) - - override def getArity: Int = tpe.getArity - - override def isKeyType: Boolean = tpe.isKeyType - - override def getTypeClass: Class[T] = tpe.getTypeClass - - override def getGenericParameters: java.util.List[TypeInformation[_]] = tpe.getGenericParameters - - override def isTupleType: Boolean = tpe.isTupleType - - override def toString = { - s"RenamingType(type: ${tpe.getTypeClass.getSimpleName}; " + - s"fields: ${fieldNames.mkString(", ")})" - } - - override def getTypeAt[X](pos: Int): TypeInformation[X] = tpe.getTypeAt(pos) - - override def getTotalFields: Int = tpe.getTotalFields - - override def createComparator( - logicalKeyFields: Array[Int], - orders: Array[Boolean], - logicalFieldOffset: Int, - executionConfig: ExecutionConfig) = - tpe.createComparator(logicalKeyFields, orders, logicalFieldOffset, executionConfig) - - // These are never called since we override create comparator - override protected def initializeNewComparator(localKeyCount: Int): Unit = - throw new RuntimeException("Cannot happen.") - - override protected def getNewComparator(executionConfig: ExecutionConfig): TypeComparator[T] = - throw new RuntimeException("Cannot happen.") - - override protected def addCompareField(fieldId: Int, comparator: TypeComparator[_]): Unit = - throw new RuntimeException("Cannot happen.") - - override def getFlatFields( - fieldExpression: String, - offset: Int, - result: util.List[FlatFieldDescriptor]): Unit = { - tpe.getFlatFields(fieldExpression, offset, result) - } - - override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = { - tpe.getTypeAt(fieldExpression) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala deleted file mode 100644 index 006c0c9..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowSerializer.scala +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.typeinfo - -import org.apache.flink.api.expressions.Row -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.core.memory.{DataOutputView, DataInputView} -; - -/** - * Serializer for [[Row]]. - */ -class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]]) - extends TypeSerializer[Row] { - - override def isImmutableType: Boolean = false - - override def getLength: Int = -1 - - override def duplicate = this - - override def createInstance: Row = { - new Row(fieldSerializers.length) - } - - override def copy(from: Row, reuse: Row): Row = { - val len = fieldSerializers.length - - if (from.productArity != len) { - throw new RuntimeException("Row arity of reuse and from do not match.") - } - var i = 0 - while (i < len) { - val reuseField = reuse.productElement(i) - val fromField = from.productElement(i).asInstanceOf[AnyRef] - val copy = fieldSerializers(i).copy(fromField, reuseField) - reuse.setField(i, copy) - i += 1 - } - reuse - } - - override def copy(from: Row): Row = { - val len = fieldSerializers.length - - if (from.productArity != len) { - throw new RuntimeException("Row arity of reuse and from do not match.") - } - val result = new Row(len) - var i = 0 - while (i < len) { - val fromField = from.productElement(i).asInstanceOf[AnyRef] - val copy = fieldSerializers(i).copy(fromField) - result.setField(i, copy) - i += 1 - } - result - } - - override def serialize(value: Row, target: DataOutputView) { - val len = fieldSerializers.length - var i = 0 - while (i < len) { - val serializer = fieldSerializers(i) - serializer.serialize(value.productElement(i), target) - i += 1 - } - } - - override def deserialize(reuse: Row, source: DataInputView): Row = { - val len = fieldSerializers.length - - if (reuse.productArity != len) { - throw new RuntimeException("Row arity of reuse and fields do not match.") - } - - var i = 0 - while (i < len) { - val field = reuse.productElement(i).asInstanceOf[AnyRef] - reuse.setField(i, fieldSerializers(i).deserialize(field, source)) - i += 1 - } - reuse - } - - override def deserialize(source: DataInputView): Row = { - val len = fieldSerializers.length - - val result = new Row(len) - var i = 0 - while (i < len) { - result.setField(i, fieldSerializers(i).deserialize(source)) - i += 1 - } - result - } - - override def copy(source: DataInputView, target: DataOutputView): Unit = { - val len = fieldSerializers.length - var i = 0 - while (i < len) { - fieldSerializers(i).copy(source, target) - i += 1 - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala deleted file mode 100644 index 92e9bc8..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/expressions/typeinfo/RowTypeInfo.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.expressions.typeinfo - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.api.expressions.Row -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.TypeSerializer -import org.apache.flink.api.expressions.tree.Expression -import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo} - -/** - * TypeInformation for [[Row]]. - */ -class RowTypeInfo( - fieldTypes: Seq[TypeInformation[_]], - fieldNames: Seq[String]) - extends CaseClassTypeInfo[Row](classOf[Row], Array(), fieldTypes, fieldNames) { - - def this(fields: Seq[Expression]) = this(fields.map(_.typeInfo), fields.map(_.name)) - - if (fieldNames.toSet.size != fieldNames.size) { - throw new IllegalArgumentException("Field names must be unique.") - } - - override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Row] = { - val fieldSerializers: Array[TypeSerializer[Any]] = new Array[TypeSerializer[Any]](getArity) - for (i <- 0 until getArity) { - fieldSerializers(i) = this.types(i).createSerializer(executionConfig) - .asInstanceOf[TypeSerializer[Any]] - } - - new RowSerializer(fieldSerializers) - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala deleted file mode 100644 index ad7cfe4..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/java/expressions/ExpressionUtil.scala +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.expressions - -import org.apache.flink.api.expressions.ExpressionOperation -import org.apache.flink.api.java.DataSet -import org.apache.flink.api.java.typeutils.TypeExtractor -import org.apache.flink.api.scala.expressions.JavaBatchTranslator -import org.apache.flink.api.scala.expressions.JavaStreamingTranslator -import org.apache.flink.streaming.api.datastream.DataStream - -/** - * Convencience methods for creating an [[org.apache.flink.api.expressions.ExpressionOperation]] - * and for converting an [[org.apache.flink.api.expressions.ExpressionOperation]] back - * to a [[org.apache.flink.api.java.DataSet]] or - * [[org.apache.flink.streaming.api.datastream.DataStream]]. - */ -object ExpressionUtil { - - /** - * Transforms the given DataSet to a [[org.apache.flink.api.expressions.ExpressionOperation]]. - * The fields of the DataSet type are renamed to the given set of fields: - * - * Example: - * - * {{{ - * ExpressionUtil.from(set, "a, b") - * }}} - * - * This will transform the set containing elements of two fields to a table where the fields - * are named a and b. - */ - def from[T](set: DataSet[T], fields: String): ExpressionOperation[JavaBatchTranslator] = { - new JavaBatchTranslator().createExpressionOperation(set, fields) - } - - /** - * Transforms the given DataSet to a [[org.apache.flink.api.expressions.ExpressionOperation]]. - * The fields of the DataSet type are used to name the - * [[org.apache.flink.api.expressions.ExpressionOperation]] fields. - */ - def from[T](set: DataSet[T]): ExpressionOperation[JavaBatchTranslator] = { - new JavaBatchTranslator().createExpressionOperation(set) - } - - /** - * Transforms the given DataStream to a [[org.apache.flink.api.expressions.ExpressionOperation]]. - * The fields of the DataSet type are renamed to the given set of fields: - * - * Example: - * - * {{{ - * ExpressionUtil.from(set, "a, b") - * }}} - * - * This will transform the set containing elements of two fields to a table where the fields - * are named a and b. - */ - def from[T](set: DataStream[T], fields: String): ExpressionOperation[JavaStreamingTranslator] = { - new JavaStreamingTranslator().createExpressionOperation(set, fields) - } - - /** - * Transforms the given DataSet to a [[org.apache.flink.api.expressions.ExpressionOperation]]. - * The fields of the DataSet type are used to name the - * [[org.apache.flink.api.expressions.ExpressionOperation]] fields. - */ - def from[T](set: DataStream[T]): ExpressionOperation[JavaStreamingTranslator] = { - new JavaStreamingTranslator().createExpressionOperation(set) - } - - /** - * Converts the given [[org.apache.flink.api.expressions.ExpressionOperation]] to - * a DataSet. The given type must have exactly the same fields as the - * [[org.apache.flink.api.expressions.ExpressionOperation]]. That is, the names of the - * fields and the types must match. - */ - @SuppressWarnings(Array("unchecked")) - def toSet[T]( - op: ExpressionOperation[JavaBatchTranslator], - clazz: Class[T]): DataSet[T] = { - op.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataSet[T]] - } - - /** - * Converts the given [[org.apache.flink.api.expressions.ExpressionOperation]] to - * a DataStream. The given type must have exactly the same fields as the - * [[org.apache.flink.api.expressions.ExpressionOperation]]. That is, the names of the - * fields and the types must match. - */ - @SuppressWarnings(Array("unchecked")) - def toStream[T]( - op: ExpressionOperation[JavaStreamingTranslator], clazz: Class[T]): DataStream[T] = { - op.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataStream[T]] - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala deleted file mode 100644 index 567d19c..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataSetConversions.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.scala.expressions - -import org.apache.flink.api.expressions._ -import org.apache.flink.api.expressions.tree.{UnresolvedFieldReference, Expression} -import org.apache.flink.api.common.typeutils.CompositeType - -import org.apache.flink.api.scala._ - -/** - * Methods for converting a [[DataSet]] to an [[ExpressionOperation]]. A [[DataSet]] is - * wrapped in this by the implicit conversions in [[org.apache.flink.api.scala.expressions]]. - */ -class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) { - - /** - * Converts the [[DataSet]] to an [[ExpressionOperation]]. The field names of the resulting - * expression operation can be specified like this: - * - * {{{ - * val in: DataSet[(String, Int)] = ... - * val expr = in.as('a, 'b) - * }}} - * - * This results in an expression operation that has field `a` of type `String` and field `b` - * of type `Int`. - */ - def as(fields: Expression*): ExpressionOperation[ScalaBatchTranslator] = { - new ScalaBatchTranslator().createExpressionOperation(set, fields.toArray) - } - - /** - * Converts the [[DataSet]] to an [[ExpressionOperation]]. The field names of the resulting - * expression operation will be taken from the field names of the input type: - * - * {{{ - * val in: DataSet[(String, Int)] = ... - * val expr = in.toExpression - * }}} - * - * This results in an expression operation that has field `_1` of type `String` and field `_2` - * of type `Int`. - */ - def toExpression: ExpressionOperation[ScalaBatchTranslator] = { - val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference) - as(resultFields: _*) - } - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala deleted file mode 100644 index 49dbce7..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/DataStreamConversions.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions - -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.expressions._ -import org.apache.flink.api.expressions.tree.{Expression, UnresolvedFieldReference} -import org.apache.flink.api.scala._ -import org.apache.flink.streaming.api.scala.DataStream - -class DataStreamConversions[T](set: DataStream[T], inputType: CompositeType[T]) { - - /** - * Converts the [[DataStream]] to an [[ExpressionOperation]]. The field names of the resulting - * expression operation can be specified like this: - * - * {{{ - * val in: DataSet[(String, Int)] = ... - * val expr = in.as('a, 'b) - * }}} - * - * This results in an expression operation that has field `a` of type `String` and field `b` - * of type `Int`. - */ - - def as(fields: Expression*): ExpressionOperation[ScalaStreamingTranslator] = { - new ScalaStreamingTranslator().createExpressionOperation(set, fields.toArray) - } - - /** - * Converts the [[DataStream]] to an [[ExpressionOperation]]. The field names of the resulting - * expression operation will be taken from the field names of the input type: - * - * {{{ - * val in: DataSet[(String, Int)] = ... - * val expr = in.toExpression - * }}} - * - * This results in an expression operation that has field `_1` of type `String` and field `_2` - * of type `Int`. - */ - - def toExpression: ExpressionOperation[ScalaStreamingTranslator] = { - val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference) - as(resultFields: _*) - } - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala deleted file mode 100644 index ae41ceb..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaBatchTranslator.scala +++ /dev/null @@ -1,392 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions - -import java.lang.reflect.Modifier - -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.expressions.analysis.ExtractEquiJoinFields -import org.apache.flink.api.expressions.operations._ -import org.apache.flink.api.expressions.parser.ExpressionParser -import org.apache.flink.api.expressions.runtime.{ExpressionAggregateFunction, ExpressionFilterFunction, ExpressionJoinFunction, ExpressionSelectFunction} -import org.apache.flink.api.expressions.tree._ -import org.apache.flink.api.expressions.typeinfo.{RenameOperator, RenamingProxyTypeInfo, RowTypeInfo} -import org.apache.flink.api.expressions.{ExpressionException, ExpressionOperation, Row} -import org.apache.flink.api.java.aggregation.AggregationFunction -import org.apache.flink.api.java.operators.JoinOperator.EquiJoin -import org.apache.flink.api.java.operators.Keys.ExpressionKeys -import org.apache.flink.api.java.operators.{GroupReduceOperator, Keys, MapOperator, UnsortedGrouping} -import org.apache.flink.api.java.{DataSet => JavaDataSet} - -/** - * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Java [[JavaDataSet]]s and - * translating them back to Scala [[JavaDataSet]]s. - */ -class JavaBatchTranslator extends OperationTranslator { - - type Representation[A] = JavaDataSet[A] - - - def createExpressionOperation[A]( - repr: JavaDataSet[A]): ExpressionOperation[JavaBatchTranslator] = { - val fields = - repr.getType.asInstanceOf[CompositeType[A]].getFieldNames.map(UnresolvedFieldReference) - - createExpressionOperation(repr, fields.toArray.asInstanceOf[Array[Expression]], false) - } - - def createExpressionOperation[A]( - repr: JavaDataSet[A], - expression: String): ExpressionOperation[JavaBatchTranslator] = { - val fields = ExpressionParser.parseExpressionList(expression) - - createExpressionOperation(repr, fields.toArray) - } - - def createExpressionOperation[A]( - repr: JavaDataSet[A], - fields: Array[Expression], - checkDeterministicFields: Boolean = true): ExpressionOperation[JavaBatchTranslator] = { - - // shortcut for DataSet[Row] - repr.getType match { - case rowTypeInfo: RowTypeInfo => - val expressions = rowTypeInfo.getFieldNames map { - name => (name, rowTypeInfo.getTypeAt(name)) - } - new ExpressionOperation( - Root(repr.asInstanceOf[JavaDataSet[Row]], expressions), this) - case _ => - } - - val clazz = repr.getType.getTypeClass - if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) { - throw new ExpressionException("Cannot create expression Operation from DataSet of type " + - clazz.getName + ". Only top-level classes or static members classes " + - " are supported.") - } - - if (!repr.getType.isInstanceOf[CompositeType[_]]) { - throw new ExpressionException("Only DataSets of composite type can be transformed to an" + - " Expression Operation. These would be tuples, case classes and POJOs.") - } - - val inputType = repr.getType.asInstanceOf[CompositeType[A]] - - if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) { - throw new ExpressionException(s"You cannot rename fields upon Table creaton: " + - s"Field order of input type $inputType is not deterministic." ) - } - - if (fields.length != inputType.getFieldNames.length) { - throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") + - "' and number of fields in input type " + inputType + " do not match.") - } - - val newFieldNames = fields map { - case UnresolvedFieldReference(name) => name - case e => - throw new ExpressionException("Only field references allowed in 'as' operation, " + - " offending expression: " + e) - } - - if (newFieldNames.toSet.size != newFieldNames.size) { - throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}") - } - - val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map { - case (name, index) => (name, inputType.getTypeAt(index)) - } - - val inputFields = inputType.getFieldNames - val fieldMappings = inputFields.zip(resultFields) - val expressions: Array[Expression] = fieldMappings map { - case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName) - } - - val rowDataSet = createSelect(expressions, repr, inputType) - - new ExpressionOperation(Root(rowDataSet, resultFields), new JavaBatchTranslator) - } - - override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = { - - if (tpe.getTypeClass == classOf[Row]) { - // shortcut for DataSet[Row] - return translateInternal(op).asInstanceOf[JavaDataSet[A]] - } - - val clazz = tpe.getTypeClass - if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) { - throw new ExpressionException("Cannot create DataSet of type " + - clazz.getName + ". Only top-level classes or static member classes are supported.") - } - - - if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) { - throw new ExpressionException( - "Expression operations can only be converted to composite types, type is: " + - implicitly[TypeInformation[A]] + - ". Composite types would be tuples, case classes and POJOs.") - } - - val resultSet = translateInternal(op) - - val resultType = resultSet.getType.asInstanceOf[RowTypeInfo] - - val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]] - - val resultNames = resultType.getFieldNames - val outputNames = outputType.getFieldNames.toSeq - - if (resultNames.toSet != outputNames.toSet) { - throw new ExpressionException(s"Expression result type $resultType does not have the same" + - s"fields as output type $outputType") - } - - for (f <- outputNames) { - val in = resultType.getTypeAt(resultType.getFieldIndex(f)) - val out = outputType.getTypeAt(outputType.getFieldIndex(f)) - if (!in.equals(out)) { - throw new ExpressionException(s"Types for field $f differ on input $resultType and " + - s"output $outputType.") - } - } - - val outputFields = outputNames map { - f => ResolvedFieldReference(f, resultType.getTypeAt(f)) - } - - val function = new ExpressionSelectFunction( - resultSet.getType.asInstanceOf[RowTypeInfo], - outputType, - outputFields) - - val opName = s"select(${outputFields.mkString(",")})" - val operator = new MapOperator(resultSet, outputType, function, opName) - - operator - } - - private def translateInternal(op: Operation): JavaDataSet[Row] = { - op match { - case Root(dataSet: JavaDataSet[Row], resultFields) => - dataSet - - case Root(_, _) => - throw new ExpressionException("Invalid Root for JavaBatchTranslator: " + op) - - case GroupBy(_, fields) => - throw new ExpressionException("Dangling GroupBy operation. Did you forget a " + - "SELECT statement?") - - case As(input, newNames) => - val translatedInput = translateInternal(input) - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - val proxyType = new RenamingProxyTypeInfo[Row](inType, newNames.toArray) - new RenameOperator(translatedInput, proxyType) - - case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) => - - val expandedInput = ExpandAggregations(sel) - - if (expandedInput.eq(sel)) { - val translatedLeftInput = translateInternal(leftInput) - val translatedRightInput = translateInternal(rightInput) - val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] - val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] - - createJoin( - predicate, - selection, - translatedLeftInput, - translatedRightInput, - leftInType, - rightInType, - JoinHint.OPTIMIZER_CHOOSES) - } else { - translateInternal(expandedInput) - } - - case Filter(Join(leftInput, rightInput), predicate) => - val translatedLeftInput = translateInternal(leftInput) - val translatedRightInput = translateInternal(rightInput) - val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] - val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] - - createJoin( - predicate, - leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++ - rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)), - translatedLeftInput, - translatedRightInput, - leftInType, - rightInType, - JoinHint.OPTIMIZER_CHOOSES) - - case Join(leftInput, rightInput) => - throw new ExpressionException("Join without filter condition encountered. " + - "Did you forget to add .where(...) ?") - - case sel@Select(input, selection) => - - val expandedInput = ExpandAggregations(sel) - - if (expandedInput.eq(sel)) { - // no expansions took place - val translatedInput = translateInternal(input) - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - val inputFields = inType.getFieldNames - createSelect( - selection, - translatedInput, - inType) - } else { - translateInternal(expandedInput) - } - - case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) => - val translatedInput = translateInternal(input) - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - - val keyIndices = groupExpressions map { - case fe: ResolvedFieldReference => inType.getFieldIndex(fe.name) - case e => throw new ExpressionException(s"Expression $e is not a valid key expression.") - } - - val keys = new Keys.ExpressionKeys(keyIndices.toArray, inType, false) - - val grouping = new UnsortedGrouping(translatedInput, keys) - - val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map { - case (fieldName, fun) => - fun.getFactory.createAggregationFunction[Any]( - inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass) - } - - val aggIndices = aggregations map { - case (fieldName, _) => - inType.getFieldIndex(fieldName) - } - - val result = new GroupReduceOperator( - grouping, - inType, - new ExpressionAggregateFunction(aggIndices, aggFunctions), - "Expression Aggregation: " + agg) - - result - - case agg@Aggregate(input, aggregations) => - val translatedInput = translateInternal(input) - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - - val aggFunctions: Seq[AggregationFunction[Any]] = aggregations map { - case (fieldName, fun) => - fun.getFactory.createAggregationFunction[Any]( - inType.getTypeAt[Any](inType.getFieldIndex(fieldName)).getTypeClass) - } - - val aggIndices = aggregations map { - case (fieldName, _) => - inType.getFieldIndex(fieldName) - } - - val result = new GroupReduceOperator( - translatedInput, - inType, - new ExpressionAggregateFunction(aggIndices, aggFunctions), - "Expression Aggregation: " + agg) - - result - - - case Filter(input, predicate) => - val translatedInput = translateInternal(input) - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - val filter = new ExpressionFilterFunction[Row](predicate, inType) - translatedInput.filter(filter) - } - } - - private def createSelect[I]( - fields: Seq[Expression], - input: JavaDataSet[I], - inputType: CompositeType[I]): JavaDataSet[Row] = { - - fields foreach { - f => - if (f.exists(_.isInstanceOf[Aggregation])) { - throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".") - } - - } - - val resultType = new RowTypeInfo(fields) - - val function = new ExpressionSelectFunction(inputType, resultType, fields) - - val opName = s"select(${fields.mkString(",")})" - val operator = new MapOperator(input, resultType, function, opName) - - operator - } - - private def createJoin[L, R]( - predicate: Expression, - fields: Seq[Expression], - leftInput: JavaDataSet[L], - rightInput: JavaDataSet[R], - leftType: CompositeType[L], - rightType: CompositeType[R], - joinHint: JoinHint): JavaDataSet[Row] = { - - val resultType = new RowTypeInfo(fields) - - val (reducedPredicate, leftFields, rightFields) = - ExtractEquiJoinFields(leftType, rightType, predicate) - - if (leftFields.isEmpty || rightFields.isEmpty) { - throw new ExpressionException("Could not derive equi-join predicates " + - "for predicate " + predicate + ".") - } - - val leftKey = new ExpressionKeys[L](leftFields, leftType) - val rightKey = new ExpressionKeys[R](rightFields, rightType) - - val joiner = new ExpressionJoinFunction[L, R, Row]( - reducedPredicate, - leftType, - rightType, - resultType, - fields) - - new EquiJoin[L, R, Row]( - leftInput, - rightInput, - leftKey, - rightKey, - joiner, - resultType, - joinHint, - predicate.toString) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala deleted file mode 100644 index 56c38af..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/JavaStreamingTranslator.scala +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions - -import java.lang.reflect.Modifier - -import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.expressions.operations._ -import org.apache.flink.api.expressions.parser.ExpressionParser -import org.apache.flink.api.expressions.runtime.{ExpressionFilterFunction, ExpressionSelectFunction} -import org.apache.flink.api.expressions.tree._ -import org.apache.flink.api.expressions.typeinfo.RowTypeInfo -import org.apache.flink.api.expressions.{ExpressionException, ExpressionOperation, Row} -import org.apache.flink.streaming.api.datastream.DataStream -import org.apache.flink.streaming.api.invokable.operator.MapInvokable - -/** - * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Java [[DataStream]]s and - * translating them back to Java [[DataStream]]s. - * - * This is very limited right now. Only select and filter are implemented. Also, the expression - * operations must be extended to allow windowing operations. - */ - -class JavaStreamingTranslator extends OperationTranslator { - - type Representation[A] = DataStream[A] - - - def createExpressionOperation[A]( - repr: DataStream[A]): ExpressionOperation[JavaStreamingTranslator] = { - val fields = - repr.getType.asInstanceOf[CompositeType[A]].getFieldNames.map(UnresolvedFieldReference) - - createExpressionOperation(repr, fields.toArray.asInstanceOf[Array[Expression]]) - } - - def createExpressionOperation[A]( - repr: DataStream[A], - expression: String): ExpressionOperation[JavaStreamingTranslator] = { - val fields = ExpressionParser.parseExpressionList(expression) - - createExpressionOperation(repr, fields.toArray) - } - - def createExpressionOperation[A]( - repr: DataStream[A], - fields: Array[Expression]): ExpressionOperation[JavaStreamingTranslator] = { - - // shortcut for DataSet[Row] - repr.getType match { - case rowTypeInfo: RowTypeInfo => - val expressions = rowTypeInfo.getFieldNames map { - name => (name, rowTypeInfo.getTypeAt(name)) - } - new ExpressionOperation( - Root(repr.asInstanceOf[DataStream[Row]], expressions), this) - case _ => - } - - val clazz = repr.getType.getTypeClass - if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) { - throw new ExpressionException("Cannot create expression Operation from DataSet of type " + - clazz.getName + ". Only top-level classes or static members classes " + - " are supported.") - } - - if (!repr.getType.isInstanceOf[CompositeType[_]]) { - throw new ExpressionException("Only DataSets of composite type can be transformed to an" + - " Expression Operation. These would be tuples, case classes and POJOs.") - } - - val inputType = repr.getType.asInstanceOf[CompositeType[A]] - - if (fields.length != inputType.getFieldNames.length) { - throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") + - "' and number of fields in input type " + inputType + " do not match.") - } - - val newFieldNames = fields map { - case UnresolvedFieldReference(name) => name - case e => - throw new ExpressionException("Only field references allowed in 'as' operation, " + - " offending expression: " + e) - } - - if (newFieldNames.toSet.size != newFieldNames.size) { - throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}") - } - - val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map { - case (name, index) => (name, inputType.getTypeAt(index)) - } - - val inputFields = inputType.getFieldNames - val fieldMappings = inputFields.zip(resultFields) - val expressions: Array[Expression] = fieldMappings map { - case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName) - } - - val rowDataSet = createSelect(expressions, repr, inputType) - - new ExpressionOperation(Root(rowDataSet, resultFields), new JavaStreamingTranslator) - } - - override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): DataStream[A] = { - - if (tpe.getTypeClass == classOf[Row]) { - // shortcut for DataSet[Row] - return translateInternal(op).asInstanceOf[DataStream[A]] - } - - val clazz = tpe.getTypeClass - if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) { - throw new ExpressionException("Cannot create DataStream of type " + - clazz.getName + ". Only top-level classes or static member classes are supported.") - } - - if (!implicitly[TypeInformation[A]].isInstanceOf[CompositeType[A]]) { - throw new ExpressionException( - "Expression operations can only be converted to composite types, type is: " + - implicitly[TypeInformation[A]] + - ". Composite types would be tuples, case classes and POJOs.") - - } - - val resultSet = translateInternal(op) - - val resultType = resultSet.getType.asInstanceOf[RowTypeInfo] - - val outputType = implicitly[TypeInformation[A]].asInstanceOf[CompositeType[A]] - - val resultNames = resultType.getFieldNames - val outputNames = outputType.getFieldNames.toSeq - - if (resultNames.toSet != outputNames.toSet) { - throw new ExpressionException(s"Expression result type $resultType does not have the same" + - s"fields as output type $outputType") - } - - for (f <- outputNames) { - val in = resultType.getTypeAt(resultType.getFieldIndex(f)) - val out = outputType.getTypeAt(outputType.getFieldIndex(f)) - if (!in.equals(out)) { - throw new ExpressionException(s"Types for field $f differ on input $resultType and " + - s"output $outputType.") - } - } - - val outputFields = outputNames map { - f => ResolvedFieldReference(f, resultType.getTypeAt(f)) - } - - val function = new ExpressionSelectFunction( - resultSet.getType.asInstanceOf[RowTypeInfo], - outputType, - outputFields) - - val opName = s"select(${outputFields.mkString(",")})" - - resultSet.transform(opName, outputType, new MapInvokable[Row, A](function)) - } - - private def translateInternal(op: Operation): DataStream[Row] = { - op match { - case Root(dataSet: DataStream[Row], resultFields) => - dataSet - - case Root(_, _) => - throw new ExpressionException("Invalid Root for JavaStreamingTranslator: " + op) - - case GroupBy(_, fields) => - throw new ExpressionException("Dangling GroupBy operation. Did you forget a " + - "SELECT statement?") - - case As(input, newNames) => - throw new ExpressionException("As operation for Streams not yet implemented.") - - case sel@Select(Filter(Join(leftInput, rightInput), predicate), selection) => - - val expandedInput = ExpandAggregations(sel) - - if (expandedInput.eq(sel)) { - val translatedLeftInput = translateInternal(leftInput) - val translatedRightInput = translateInternal(rightInput) - val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] - val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] - - createJoin( - predicate, - selection, - translatedLeftInput, - translatedRightInput, - leftInType, - rightInType, - JoinHint.OPTIMIZER_CHOOSES) - } else { - translateInternal(expandedInput) - } - - case Filter(Join(leftInput, rightInput), predicate) => - val translatedLeftInput = translateInternal(leftInput) - val translatedRightInput = translateInternal(rightInput) - val leftInType = translatedLeftInput.getType.asInstanceOf[CompositeType[Row]] - val rightInType = translatedRightInput.getType.asInstanceOf[CompositeType[Row]] - - createJoin( - predicate, - leftInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)) ++ - rightInput.outputFields.map( f => ResolvedFieldReference(f._1, f._2)), - translatedLeftInput, - translatedRightInput, - leftInType, - rightInType, - JoinHint.OPTIMIZER_CHOOSES) - - case Join(leftInput, rightInput) => - throw new ExpressionException("Join without filter condition encountered. " + - "Did you forget to add .where(...) ?") - - case sel@Select(input, selection) => - - val expandedInput = ExpandAggregations(sel) - - if (expandedInput.eq(sel)) { - // no expansions took place - val translatedInput = translateInternal(input) - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - val inputFields = inType.getFieldNames - createSelect( - selection, - translatedInput, - inType) - } else { - translateInternal(expandedInput) - } - - case agg@Aggregate(GroupBy(input, groupExpressions), aggregations) => - throw new ExpressionException("Aggregate operation for Streams not yet implemented.") - - case agg@Aggregate(input, aggregations) => - throw new ExpressionException("Aggregate operation for Streams not yet implemented.") - - case Filter(input, predicate) => - val translatedInput = translateInternal(input) - val inType = translatedInput.getType.asInstanceOf[CompositeType[Row]] - val filter = new ExpressionFilterFunction[Row](predicate, inType) - translatedInput.filter(filter) - } - } - - private def createSelect[I]( - fields: Seq[Expression], - input: DataStream[I], - inputType: CompositeType[I]): DataStream[Row] = { - - fields foreach { - f => - if (f.exists(_.isInstanceOf[Aggregation])) { - throw new ExpressionException("Found aggregate in " + fields.mkString(", ") + ".") - } - - } - - val resultType = new RowTypeInfo(fields) - - val function = new ExpressionSelectFunction(inputType, resultType, fields) - - val opName = s"select(${fields.mkString(",")})" - - input.transform(opName, resultType, new MapInvokable[I, Row](function)) - } - - private def createJoin[L, R]( - predicate: Expression, - fields: Seq[Expression], - leftInput: DataStream[L], - rightInput: DataStream[R], - leftType: CompositeType[L], - rightType: CompositeType[R], - joinHint: JoinHint): DataStream[Row] = { - - throw new ExpressionException("Join operation for Streams not yet implemented.") - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala deleted file mode 100644 index 724c8a7..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaBatchTranslator.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions - - -import org.apache.flink.api.expressions.tree.Expression -import org.apache.flink.api.scala.wrap -import org.apache.flink.api.expressions.operations._ -import org.apache.flink.api.expressions.ExpressionOperation -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala.DataSet - -import scala.reflect.ClassTag - - -/** - * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Scala [[DataSet]]s and - * translating them back to Scala [[DataSet]]s. - */ -class ScalaBatchTranslator extends OperationTranslator { - - private val javaTranslator = new JavaBatchTranslator - - override type Representation[A] = DataSet[A] - - def createExpressionOperation[A]( - repr: DataSet[A], - fields: Array[Expression]): ExpressionOperation[ScalaBatchTranslator] = { - - val result = javaTranslator.createExpressionOperation(repr.javaSet, fields) - - new ExpressionOperation[ScalaBatchTranslator](result.operation, this) - } - - override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): DataSet[O] = { - // fake it till you make it ... - wrap(javaTranslator.translate(op))(ClassTag.AnyRef.asInstanceOf[ClassTag[O]]) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala deleted file mode 100644 index 7db483f..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/ScalaStreamingTranslator.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.scala.expressions - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.expressions.operations._ -import org.apache.flink.api.expressions.tree.Expression -import org.apache.flink.api.expressions.{ExpressionOperation, Row} -import org.apache.flink.api.scala.DataSet -import org.apache.flink.streaming.api.scala.DataStream - -import org.apache.flink.streaming.api.scala.javaToScalaStream - -/** - * [[OperationTranslator]] for creating [[ExpressionOperation]]s from Scala [[DataStream]]s and - * translating them back to Scala [[DataStream]]s. - * - * This is very limited right now. Only select and filter are implemented. Also, the expression - * operations must be extended to allow windowing operations. - */ -class ScalaStreamingTranslator extends OperationTranslator { - - private val javaTranslator = new JavaStreamingTranslator - - override type Representation[A] = DataStream[A] - - def createExpressionOperation[A]( - repr: DataStream[A], - fields: Array[Expression]): ExpressionOperation[ScalaStreamingTranslator] = { - - val result = javaTranslator.createExpressionOperation(repr.getJavaStream, fields) - - new ExpressionOperation[ScalaStreamingTranslator](result.operation, this) - } - - override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): DataStream[O] = { - // fake it till you make it ... - javaToScalaStream(javaTranslator.translate(op)) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala b/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala deleted file mode 100644 index 1f6c397..0000000 --- a/flink-staging/flink-expressions/src/main/scala/org/apache/flink/api/scala/expressions/expressionDsl.scala +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.scala.expressions - -import org.apache.flink.api.expressions.tree._ -import org.apache.flink.api.common.typeinfo.TypeInformation - -import scala.language.implicitConversions - -/** - * These are all the operations that can be used to construct an [[Expression]] AST for expression - * operations. - * - * These operations must be kept in sync with the parser in - * [[org.apache.flink.api.expressions.parser.ExpressionParser]]. - */ -trait ImplicitExpressionOperations { - def expr: Expression - - def && (other: Expression) = And(expr, other) - def || (other: Expression) = Or(expr, other) - - def > (other: Expression) = GreaterThan(expr, other) - def >= (other: Expression) = GreaterThanOrEqual(expr, other) - def < (other: Expression) = LessThan(expr, other) - def <= (other: Expression) = LessThanOrEqual(expr, other) - - def === (other: Expression) = EqualTo(expr, other) - def !== (other: Expression) = NotEqualTo(expr, other) - - def unary_! = Not(expr) - def unary_- = UnaryMinus(expr) - - def isNull = IsNull(expr) - def isNotNull = IsNotNull(expr) - - def + (other: Expression) = Plus(expr, other) - def - (other: Expression) = Minus(expr, other) - def / (other: Expression) = Div(expr, other) - def * (other: Expression) = Mul(expr, other) - def % (other: Expression) = Mod(expr, other) - - def & (other: Expression) = BitwiseAnd(expr, other) - def | (other: Expression) = BitwiseOr(expr, other) - def ^ (other: Expression) = BitwiseXor(expr, other) - def unary_~ = BitwiseNot(expr) - - def abs = Abs(expr) - - def sum = Sum(expr) - def min = Min(expr) - def max = Max(expr) - def count = Count(expr) - def avg = Avg(expr) - - def substring(beginIndex: Expression, endIndex: Expression = Literal(Int.MaxValue)) = { - Substring(expr, beginIndex, endIndex) - } - - def cast(toType: TypeInformation[_]) = Cast(expr, toType) - - def as(name: Symbol) = Naming(expr, name.name) -} - -/** - * Implicit conversions from Scala Literals to Expression [[Literal]] and from [[Expression]] - * to [[ImplicitExpressionOperations]]. - */ -trait ImplicitExpressionConversions { - implicit class WithOperations(e: Expression) extends ImplicitExpressionOperations { - def expr = e - } - - implicit class SymbolExpression(s: Symbol) extends ImplicitExpressionOperations { - def expr = UnresolvedFieldReference(s.name) - } - - implicit class LiteralLongExpression(l: Long) extends ImplicitExpressionOperations { - def expr = Literal(l) - } - - implicit class LiteralIntExpression(i: Int) extends ImplicitExpressionOperations { - def expr = Literal(i) - } - - implicit class LiteralFloatExpression(f: Float) extends ImplicitExpressionOperations { - def expr = Literal(f) - } - - implicit class LiteralDoubleExpression(d: Double) extends ImplicitExpressionOperations { - def expr = Literal(d) - } - - implicit class LiteralStringExpression(str: String) extends ImplicitExpressionOperations { - def expr = Literal(str) - } - - implicit class LiteralBooleanExpression(bool: Boolean) extends ImplicitExpressionOperations { - def expr = Literal(bool) - } - - implicit def symbol2FieldExpression(sym: Symbol): Expression = UnresolvedFieldReference(sym.name) - implicit def int2Literal(i: Int): Expression = Literal(i) - implicit def long2Literal(l: Long): Expression = Literal(l) - implicit def double2Literal(d: Double): Expression = Literal(d) - implicit def float2Literal(d: Float): Expression = Literal(d) - implicit def string2Literal(str: String): Expression = Literal(str) - implicit def boolean2Literal(bool: Boolean): Expression = Literal(bool) -}