http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryResultAssembler.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryResultAssembler.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryResultAssembler.scala new file mode 100644 index 0000000..253cac9 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateBinaryResultAssembler.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.codegen + +import org.apache.flink.api.table.tree.Expression +import org.apache.flink.api.common.typeutils.CompositeType +import org.slf4j.LoggerFactory + +/** + * Code generator for assembling the result of a binary operation. 
+ */
class GenerateBinaryResultAssembler[L, R, O](
    leftTypeInfo: CompositeType[L],
    rightTypeInfo: CompositeType[R],
    resultTypeInfo: CompositeType[O],
    outputFields: Seq[Expression],
    cl: ClassLoader)
  extends GenerateResultAssembler[(L, R, O) => O](
    // The two inputs are registered with the parent generator as "input0" and "input1";
    // the generated lambda below uses exactly these parameter names.
    Seq(("input0", leftTypeInfo), ("input1", rightTypeInfo)),
    cl = cl) {

  import scala.reflect.runtime.universe._

  val LOG = LoggerFactory.getLogger(this.getClass)

  // Builds the tree of a three-argument function (left input, right input, reusable output),
  // logs it at debug level and evaluates it with the tool box into a callable function.
  override protected def generateInternal(): ((L, R, O) => O) = {

    // Type terms used to annotate the parameters of the generated lambda.
    val leftTerm = typeTermForTypeInfo(leftTypeInfo)
    val rightTerm = typeTermForTypeInfo(rightTypeInfo)
    val outTerm = typeTermForTypeInfo(resultTypeInfo)

    // Body of the lambda: the result-assembling tree built by the parent class.
    val assemblerBody = createResult(resultTypeInfo, outputFields)

    val lambda: Tree =
      q"""
        (input0: $leftTerm, input1: $rightTerm, out: $outTerm) => {
          ..$assemblerBody
        }
      """

    LOG.debug(s"Generated binary result-assembler:\n$lambda")
    toolBox.eval(lambda).asInstanceOf[(L, R, O) => O]
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala new file mode 100644 index 0000000..42f256f --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.codegen + +import org.apache.flink.api.table.tree.Expression +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.typeinfo.RowTypeInfo +import org.apache.flink.api.java.typeutils.{TupleTypeInfo, PojoTypeInfo} +import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo + +/** + * Base class for unary and binary result assembler code generators. 
+ */
abstract class GenerateResultAssembler[R](
    inputs: Seq[(String, CompositeType[_])],
    cl: ClassLoader)
  extends ExpressionCodeGenerator[R](inputs, cl = cl) {
  import scala.reflect.runtime.{universe => ru}
  import scala.reflect.runtime.universe._

  /**
   * Builds the tree that assembles one result value from the given output expressions.
   *
   * The generated code refers to a variable named `out`, so it must be spliced into a scope
   * where `out` is defined (the subclasses declare it as a lambda parameter). Depending on
   * the kind of `resultTypeInfo` the tree either
   *  - calls `out.setField(i, value)` for every field and yields `out` (row types),
   *  - assigns `out.<name> = value` for every field, where `<name>` is the name of the
   *    output expression, and yields `out` (POJO and tuple types), or
   *  - constructs a new instance from the field values in order (case class types).
   *
   * @param resultTypeInfo type information of the result to assemble
   * @param outputFields expressions computing the result fields, in field order
   * @return the tree that evaluates to the assembled result
   * @throws IllegalArgumentException if `resultTypeInfo` is none of the supported kinds
   */
  def createResult[T](
      resultTypeInfo: CompositeType[T],
      outputFields: Seq[Expression]): Tree = {

    val resultType = typeTermForTypeInfo(resultTypeInfo)

    val fieldsCode = outputFields.map(generateExpression)

    // The order of the cases is kept from the original implementation: a type information
    // that matches several cases is handled by the first one.
    resultTypeInfo match {
      case _: RowTypeInfo =>
        // Rows are filled positionally.
        val resultSetters: Seq[Tree] = fieldsCode.zipWithIndex map {
          case (fieldCode, i) =>
            q"""
              out.setField($i, { ..${fieldCode.code}; ${fieldCode.resultTerm} })
            """
        }

        q"""
          ..$resultSetters
          out
        """

      case _: PojoTypeInfo[_] | _: TupleTypeInfo[_] =>
        // POJOs and tuples are filled by assigning to the field named like the expression.
        val resultSetters: Seq[Tree] = fieldsCode.zip(outputFields) map {
          case (fieldCode, expr) =>
            val fieldName = newTermName(expr.name)
            q"""
              out.$fieldName = { ..${fieldCode.code}; ${fieldCode.resultTerm} }
            """
        }

        q"""
          ..$resultSetters
          out
        """

      case _: CaseClassTypeInfo[_] =>
        // Case classes are created anew; `out` is not referenced by the generated code.
        val resultFields: Seq[Tree] = fieldsCode map {
          fieldCode =>
            q"{ ..${fieldCode.code}; ${fieldCode.resultTerm}}"
        }
        q"""
          new $resultType(..$resultFields)
        """

      case unsupported =>
        // Previously this ended in a bare MatchError without any context.
        throw new IllegalArgumentException(
          s"Cannot generate a result assembler for unsupported result type: $unsupported")
    }
  }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryPredicate.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryPredicate.scala
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryPredicate.scala new file mode 100644 index 0000000..32af2a9 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryPredicate.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.codegen + +import org.apache.flink.api.table.tree.Expression +import org.apache.flink.api.common.typeutils.CompositeType +import org.slf4j.LoggerFactory + +/** + * Code generator for a unary predicate, i.e. a Filter. 
+ */
class GenerateUnaryPredicate[T](
    inputType: CompositeType[T],
    predicate: Expression,
    cl: ClassLoader) extends ExpressionCodeGenerator[T => Boolean](
    // The single input is registered as "input0", the parameter name of the generated lambda.
    Seq(("input0", inputType)),
    cl = cl) {

  val LOG = LoggerFactory.getLogger(this.getClass)

  import scala.reflect.runtime.{universe => ru}
  import scala.reflect.runtime.universe._

  // Generates the code of the predicate expression, wraps it into a one-argument lambda,
  // logs the tree at debug level and evaluates it with the tool box.
  override protected def generateInternal(): (T => Boolean) = {
    val predicateCode = generateExpression(predicate)

    val inputTerm = typeTermForTypeInfo(inputType)

    // Variant used when null checking is enabled: the generated lambda yields `false`
    // whenever the null term of the predicate is set.
    def nullSafeFilter: Tree =
      q"""
        (input0: $inputTerm) => {
          ..${predicateCode.code}
          if (${predicateCode.nullTerm}) {
            false
          } else {
            ${predicateCode.resultTerm}
          }
        }
      """

    // Variant without null checking: the lambda yields the result term of the predicate.
    def plainFilter: Tree =
      q"""
        (input0: $inputTerm) => {
          ..${predicateCode.code}
          ${predicateCode.resultTerm}
        }
      """

    val code = if (nullCheck) nullSafeFilter else plainFilter

    LOG.debug(s"""Generated unary predicate "$predicate":\n$code""")
    toolBox.eval(code).asInstanceOf[(T) => Boolean]
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryResultAssembler.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryResultAssembler.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryResultAssembler.scala new file mode 100644 index 0000000..38d7109 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateUnaryResultAssembler.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.codegen + +import org.apache.flink.api.table.tree.Expression +import org.apache.flink.api.common.typeutils.CompositeType +import org.slf4j.LoggerFactory

/**
 * Code generator that produces the function assembling the result of a unary operation.
 *
 * The generated function takes the input value and a reusable output value and yields the
 * result built from `outputFields`.
 */
class GenerateUnaryResultAssembler[I, O](
    inputTypeInfo: CompositeType[I],
    resultTypeInfo: CompositeType[O],
    outputFields: Seq[Expression],
    cl: ClassLoader)
  extends GenerateResultAssembler[(I, O) => O](
    // The single input is registered as "input0", the parameter name of the generated lambda.
    Seq(("input0", inputTypeInfo)),
    cl = cl) {

  import scala.reflect.runtime.universe._

  val LOG = LoggerFactory.getLogger(this.getClass)

  // Builds the tree of a two-argument function (input, reusable output), logs it at debug
  // level and evaluates it with the tool box into a callable function.
  override protected def generateInternal(): ((I, O) => O) = {

    // Type terms used to annotate the parameters of the generated lambda.
    val inputTerm = typeTermForTypeInfo(inputTypeInfo)
    val outTerm = typeTermForTypeInfo(resultTypeInfo)

    // Body of the lambda: the result-assembling tree built by the parent class.
    val assemblerBody = createResult(resultTypeInfo, outputFields)

    val lambda: Tree =
      q"""
        (input0: $inputTerm, out: $outTerm) => {
          ..$assemblerBody
        }
      """

    LOG.debug(s"Generated unary result-assembler:\n${show(lambda)}")
    toolBox.eval(lambda).asInstanceOf[(I, O) => O]
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala new file mode 100644 index 0000000..b69ac1c --- /dev/null +++
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/codegen/package.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table

package object codegen {
  /**
   * Lock object for guarding the reflection operations of ExpressionCodeGenerator.
   *
   * Scala 2.10 reflection is not thread safe, and one TaskManager may run several expression
   * operators in parallel, so these operations need to be guarded.
   */
  object ReflectionLock
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala new file mode 100644 index 0000000..894dd22 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/ExpandAggregations.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.operations + +import org.apache.flink.api.table.analysis.SelectionAnalyzer +import org.apache.flink.api.table.tree._ +import org.apache.flink.api.java.aggregation.Aggregations + +import scala.collection.mutable + +/** + * This is used to expand a [[Select]] that contains aggregations. If it is called on a [[Select]] + * without aggregations it is simply returned. + * + * This select: + * {{{ + * in.select('key, 'value.avg) + * }}} + * + * is transformed to this expansion: + * {{{ + * in + * .select('key, 'value, Literal(1) as 'intermediate.1) + * .aggregate('value.sum, 'intermediate.1.sum) + * .select('key, 'value / 'intermediate.1) + * }}} + * + * If the input of the [[Select]] is a [[GroupBy]] this is preserved before the aggregation. 
+ */
object ExpandAggregations {
  def apply(select: Select): Operation = select match {
    case Select(input, selection) =>

      // Name of the intermediate field assigned to each distinct (expression, basic
      // aggregation) pair; equal pairs share one intermediate field.
      val intermediateNames = mutable.HashMap[(Expression, Aggregations), String]()
      // Fields that the select in front of the aggregation has to produce.
      val intermediateFields = mutable.HashSet[Expression]()
      // For every aggregation expression, the references to its intermediate fields.
      val referencesByAggregation = mutable.HashMap[Aggregation, Seq[Expression]]()

      var intermediateCount = 0

      // First pass: walk every selected expression, register the intermediate fields of the
      // aggregations and collect the plain field references. The transformed expressions
      // themselves are discarded, only the collected state is used.
      for (field <- selection) {
        field.transformPre {
          case agg: Aggregation =>
            val references = agg.getIntermediateFields.zip(agg.getAggregations) map {
              case (expr, basicAgg) =>
                val name = intermediateNames.getOrElseUpdate((expr, basicAgg), {
                  intermediateCount = intermediateCount + 1
                  val freshName = s"intermediate.$intermediateCount"
                  intermediateFields += Naming(expr, freshName)
                  freshName
                })
                ResolvedFieldReference(name, expr.typeInfo)
            }

            referencesByAggregation(agg) = references
            // Return a NOP so that we don't add the children of the aggregation
            // to intermediate fields. We already added the necessary fields to the list
            // of intermediate fields.
            NopExpression()

          case fa: ResolvedFieldReference =>
            if (!fa.name.startsWith("intermediate")) {
              intermediateFields += Naming(fa, fa.name)
            }
            fa
        }
      }

      if (intermediateNames.isEmpty) {
        // no aggregations, just hand the select back unchanged
        select
      } else {
        // also add the grouping keys to the set of intermediate fields, because we use a
        // Set, they are only added when not already present
        input match {
          case GroupBy(_, groupingFields) =>
            groupingFields foreach {
              case fa: ResolvedFieldReference =>
                intermediateFields += Naming(fa, fa.name)
            }
          case _ => // Nothing to add
        }

        // (intermediate field name, basic aggregation) pairs for the Aggregate operation.
        val basicAggregations = intermediateNames.map {
          case ((_, basicAgg), fieldName) =>
            (fieldName, basicAgg)
        }.toSeq

        // Second pass: replace every aggregation by its final expression over the
        // intermediate fields.
        val finalFields = selection.map { field =>
          field.transformPre {
            case agg: Aggregation =>
              agg.getFinalField(referencesByAggregation(agg))
          }
        }

        val intermediateAnalyzer = new SelectionAnalyzer(input.outputFields)
        val analyzedIntermediates = intermediateFields.toSeq.map(intermediateAnalyzer.analyze)

        val finalAnalyzer =
          new SelectionAnalyzer(analyzedIntermediates.map(e => (e.name, e.typeInfo)))
        val analyzedFinals = finalFields.map(finalAnalyzer.analyze)

        // The intermediate select is placed below the GroupBy if there is one, so that the
        // grouping is preserved in front of the aggregation.
        val aggregationInput = input match {
          case GroupBy(groupByInput, groupingFields) =>
            GroupBy(
              Select(groupByInput, analyzedIntermediates),
              groupingFields)

          case _ =>
            Select(input, analyzedIntermediates)
        }

        Select(
          Aggregate(aggregationInput, basicAggregations),
          analyzedFinals)
      }

    case _ => select
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala new file mode 100644 index 0000000..194edda --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/TableTranslator.scala @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.operations + +import java.lang.reflect.Modifier + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.parser.ExpressionParser +import org.apache.flink.api.table.tree.{Expression, Naming, ResolvedFieldReference, UnresolvedFieldReference} +import org.apache.flink.api.table.typeinfo.RowTypeInfo +import org.apache.flink.api.table.{ExpressionException, Table} + +import scala.language.reflectiveCalls + +/** + * When an [[org.apache.flink.api.table.Table]] is created a [[TableTranslator]] corresponding to + * the underlying representation (either [[org.apache.flink.api.scala.DataSet]] or + * [[org.apache.flink.streaming.api.scala.DataStream]] is created. 
This way, the Table API can be + * completely agnostic while translation back to the correct API is handled by the API specific + * [[TableTranslator]]. + */
abstract class TableTranslator {

  /**
   * The underlying representation. The only requirement is a `getType()` method returning
   * the [[TypeInformation]] of the elements (structural type, hence the import of
   * `scala.language.reflectiveCalls`).
   */
  type Representation[A] <: { def getType(): TypeInformation[A] }

  /**
   * Translates the given Table API [[Operation]] back to the underlying representation, i.e,
   * a DataSet or a DataStream.
   */
  def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): Representation[A]

  /**
   * Creates a [[Table]] from a DataSet or a DataStream (the underlying representation).
   *
   * @param repr the underlying representation
   * @param inputType composite type information of the elements of `repr`
   * @param expressions one naming expression per input field (old field renamed to new name)
   * @param resultFields name and type of every field of the resulting table
   */
  def createTable[A](
      repr: Representation[A],
      inputType: CompositeType[A],
      expressions: Array[Expression],
      resultFields: Seq[(String, TypeInformation[_])]): Table[this.type]

  /**
   * Creates a [[Table]] from the given DataSet or DataStream, keeping all fields under their
   * original names. The check for a deterministic field order is skipped.
   */
  def createTable[A](repr: Representation[A]): Table[this.type] = {

    val fields = repr.getType() match {
      case c: CompositeType[A] => c.getFieldNames.map(UnresolvedFieldReference)

      case tpe => Array() // createTable will throw an exception for this later
    }
    createTable(
      repr,
      fields.toArray.asInstanceOf[Array[Expression]],
      checkDeterministicFields = false)
  }

  /**
   * Creates a [[Table]] from the given DataSet or DataStream while only taking those
   * fields mentioned in the field expression.
   */
  def createTable[A](repr: Representation[A], expression: String): Table[this.type] = {

    val fields = ExpressionParser.parseExpressionList(expression)

    createTable(repr, fields.toArray, checkDeterministicFields = true)
  }

  /**
   * Creates a [[Table]] from the given DataSet or DataStream while only taking those
   * fields mentioned in the fields parameter.
   *
   * When checkDeterministicFields is true check whether the fields of the underlying
   * [[TypeInformation]] have a deterministic ordering. This is only the case for Tuples
   * and Case classes. For a POJO, the field order is not obvious, this can lead to problems
   * when a user renames fields and assumes a certain ordering.
   *
   * @throws ExpressionException if the element type is not a composite type, if the element
   *                             class is a non-static member class, if the field order is
   *                             not deterministic although the check is requested, if the
   *                             number of fields does not match the input type, if a field
   *                             expression is not a plain field reference, or if the new
   *                             field names are not unique
   */
  def createTable[A](
      repr: Representation[A],
      fields: Array[Expression],
      checkDeterministicFields: Boolean = true): Table[this.type] = {

    // shortcut for DataSet[Row] or DataStream[Row]
    repr.getType() match {
      case rowTypeInfo: RowTypeInfo =>
        // NOTE(review): the Table built here is discarded, because this match is not the
        // last expression of the method; Row inputs continue with the general path below.
        // Confirm whether an early `return` was intended. Returning here would bypass the
        // renaming in `fields`, so the effective behavior is deliberately left unchanged.
        val expressions = rowTypeInfo.getFieldNames map {
          name => (name, rowTypeInfo.getTypeAt(name))
        }
        new Table(
          Root(repr, expressions), this)

      case c: CompositeType[A] => // is ok

      case tpe => throw new ExpressionException("Only DataSets or DataStreams of composite type " +
        "can be transformed to a Table. These would be tuples, case classes and " +
        "POJOs. Type is: " + tpe)

    }

    // Non-static member classes are rejected.
    val clazz = repr.getType().getTypeClass
    if (clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) {
      throw new ExpressionException("Cannot create Table from DataSet or DataStream of type " +
        clazz.getName + ". Only top-level classes or static member classes " +
        "are supported.")
    }

    // Safe: every non-composite type was rejected by the match above.
    val inputType = repr.getType().asInstanceOf[CompositeType[A]]

    if (!inputType.hasDeterministicFieldOrder && checkDeterministicFields) {
      throw new ExpressionException(s"You cannot rename fields upon Table creation: " +
        s"Field order of input type $inputType is not deterministic." )
    }

    if (fields.length != inputType.getFieldNames.length) {
      throw new ExpressionException("Number of selected fields: '" + fields.mkString(",") +
        "' and number of fields in input type " + inputType + " do not match.")
    }

    val newFieldNames = fields map {
      case UnresolvedFieldReference(name) => name
      case e =>
        throw new ExpressionException("Only field references allowed in 'as' operation, " +
          "offending expression: " + e)
    }

    if (newFieldNames.toSet.size != newFieldNames.size) {
      throw new ExpressionException(s"Ambiguous field names in ${fields.mkString(", ")}")
    }

    // New names are assigned by position: the i-th name gets the type of the i-th input field.
    val resultFields: Seq[(String, TypeInformation[_])] = newFieldNames.zipWithIndex map {
      case (name, index) => (name, inputType.getTypeAt(index))
    }

    // One naming expression per field: reference to the old field, renamed to the new name.
    val inputFields = inputType.getFieldNames
    val fieldMappings = inputFields.zip(resultFields)
    val expressions: Array[Expression] = fieldMappings map {
      case (oldName, (newName, tpe)) => Naming(ResolvedFieldReference(oldName, tpe), newName)
    }

    createTable(repr, inputType, expressions, resultFields)
  }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala new file mode 100644 index 0000000..5b80570 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/operations.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.operations + +import org.apache.flink.api.table.tree.Expression +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.aggregation.Aggregations

/**
 * Common supertype of all Table API operations. The hierarchy is sealed.
 */
sealed abstract class Operation {
  /** Name and type information of every field this operation produces. */
  def outputFields: Seq[(String, TypeInformation[_])]
}

/**
 * Leaf operation that turns a [[org.apache.flink.api.scala.DataSet]] or
 * [[org.apache.flink.streaming.api.scala.DataStream]] into a
 * [[org.apache.flink.api.table.Table]]. The output fields are given explicitly.
 */
case class Root[T](input: T, outputFields: Seq[(String, TypeInformation[_])]) extends Operation

/**
 * Join of two [[org.apache.flink.api.table.Table]]s. The output consists of the fields of the
 * left input followed by the fields of the right input. A "filter" and a "select" should be
 * applied after a join operation.
 */
case class Join(left: Operation, right: Operation) extends Operation {
  override def toString: String = "Join(" + left + ", " + right + ")"

  def outputFields: Seq[(String, TypeInformation[_])] =
    left.outputFields ++ right.outputFields
}

/**
 * Filter that keeps the elements matching the predicate expression. The fields of the input
 * are passed through unchanged.
 */
case class Filter(input: Operation, predicate: Expression) extends Operation {
  override def toString: String = "Filter(" + input + ", " + predicate + ")"

  def outputFields: Seq[(String, TypeInformation[_])] = input.outputFields
}

+/** + * Selection expression. Similar to an SQL SELECT statement.
The expressions can select fields + * and perform arithmetic or logic operations. The expressions can also perform aggregates + * on fields. + */
case class Select(input: Operation, selection: Seq[Expression]) extends Operation {
  override def toString: String = "Select(" + input + ", " + selection.mkString(",") + ")"

  // One output field per selected expression, named and typed like the expression.
  def outputFields: Seq[(String, TypeInformation[_])] =
    selection.toSeq.map(e => (e.name, e.typeInfo))
}

/**
 * Renaming operation: gives new names to the fields of the input while keeping their types.
 * Use this to disambiguate fields before a join operation.
 *
 * Names are assigned by position. The output is computed once, when the operation is
 * created, by zipping the input fields with `names`.
 *
 * @param input the operation whose fields are renamed
 * @param names the new field names, in field order
 */
case class As(input: Operation, names: Seq[String]) extends Operation {
  val outputFields: Seq[(String, TypeInformation[_])] =
    input.outputFields.zip(names).map { renamed => (renamed._2, renamed._1._2) }

  override def toString: String = "As(" + input + ", " + names.mkString(",") + ")"
}

/**
 * Grouping operation. Keys are specified using field references. A group by operation is only
 * useful when performing a select with aggregates afterwards. The fields of the input are
 * passed through unchanged.
 *
 * @param input the operation whose elements are grouped
 * @param fields the expressions specifying the grouping keys
 */
case class GroupBy(input: Operation, fields: Seq[Expression]) extends Operation {
  override def toString: String = "GroupBy(" + input + ", " + fields.mkString(",") + ")"

  def outputFields: Seq[(String, TypeInformation[_])] = input.outputFields
}

+/** + * Internal operation. Selection operations containing aggregates are expanded to an [[Aggregate]] + * and a simple [[Select]].
+ */
case class Aggregate(
    input: Operation,
    aggregations: Seq[(String, Aggregations)]) extends Operation {
  // `aggregations` pairs a field name with the basic aggregation to apply to it.
  override def toString: String =
    "Aggregate(" + input + ", " + aggregations.mkString(",") + ")"

  // The fields of the input are reported unchanged.
  def outputFields: Seq[(String, TypeInformation[_])] = input.outputFields
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala new file mode 100644 index 0000000..0f75424 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/operations/package.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table + +/** + * The operations in this package are created by calling methods on [[Table]] they + * should not be manually created by users of the API.
+ */ +package object operations http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala new file mode 100644 index 0000000..bdcb22c --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/package.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api + +/** + * == Table API == + * + * This package contains the generic part of the Table API. It can be used with Flink Streaming + * and Flink Batch. From Scala as well as from Java. + * + * When using the Table API, a user creates a [[org.apache.flink.api.table.Table]] from + * a DataSet or DataStream. On this relational operations can be performed. A table can also + * be converted back to a DataSet or DataStream. + * + * Packages [[org.apache.flink.api.scala.table]] and [[org.apache.flink.api.java.table]] contain + * the language specific part of the API.
Refer to these packages for documentation on how + * the Table API can be used in Java and Scala. + */ +package object table http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala new file mode 100644 index 0000000..a0bc2b9 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/parser/ExpressionParser.scala @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.parser + +import org.apache.flink.api.table.ExpressionException +import org.apache.flink.api.table.operations.As +import org.apache.flink.api.table.tree._ + +import scala.util.parsing.combinator.{PackratParsers, JavaTokenParsers} + +/** + * Parser for expressions inside a String. This parses exactly the same expressions that + * would be accepted by the Scala Expression DSL. 
+ * + * See [[org.apache.flink.api.scala.expressions.ImplicitExpressionConversions]] and + * [[org.apache.flink.api.scala.expressions.ImplicitExpressionOperations]] for the constructs + * available in the Scala Expression DSL. This parser must be kept in sync with the Scala DSL + * defined in the above files. + */
object ExpressionParser extends JavaTokenParsers with PackratParsers {

  // Literals

  // Numeric literal. An integer followed by "L"/"l" is a Long literal; because `<~` drops the
  // suffix from the parse result, that alternative carries its own conversion instead of
  // re-inspecting the matched text. The remaining forms are told apart by their shape:
  // digits only => Int, trailing "f"/"F" => Float, anything else => Double.
  lazy val numberLiteral: PackratParser[Expression] =
    (wholeNumber <~ ("L" | "l")) ^^ { digits => Literal(digits.toLong) } |
    (floatingPointNumber | decimalNumber | wholeNumber) ^^ { str =>
      if (str.matches("""-?\d+""")) {
        Literal(str.toInt)
      } else if (str.endsWith("f") || str.endsWith("F")) {
        Literal(str.toFloat)
      } else {
        Literal(str.toDouble)
      }
    }

  // String literal in single quotes; the surrounding quotes are stripped from the value.
  lazy val singleQuoteStringLiteral: Parser[Expression] =
    ("'" + """([^'\p{Cntrl}\\]|\\[\\'"bfnrt]|\\u[a-fA-F0-9]{4})*""" + "'").r ^^ {
      str => Literal(str.substring(1, str.length - 1))
    }

  // String literal in double quotes (as defined by JavaTokenParsers); quotes are stripped.
  lazy val stringLiteralFlink: PackratParser[Expression] = super.stringLiteral ^^ {
    str => Literal(str.substring(1, str.length - 1))
  }

  lazy val boolLiteral: PackratParser[Expression] = ("true" | "false") ^^ {
    str => Literal(str.toBoolean)
  }

  lazy val literalExpr: PackratParser[Expression] =
    numberLiteral |
      stringLiteralFlink | singleQuoteStringLiteral |
      boolLiteral

  // A bare identifier refers to a field that still has to be resolved.
  lazy val fieldReference: PackratParser[Expression] = ident ^^ {
    sym => UnresolvedFieldReference(sym)
  }

  lazy val atom: PackratParser[Expression] =
    ( "(" ~> expression <~ ")" ) | literalExpr | fieldReference

  // suffix ops
  lazy val isNull: PackratParser[Expression] = atom <~ ".isNull" ^^ { e => IsNull(e) }
  lazy val isNotNull: PackratParser[Expression] = atom <~ ".isNotNull" ^^ { e => IsNotNull(e) }

  lazy val abs: PackratParser[Expression] = atom <~ ".abs" ^^ { e => Abs(e) }

  lazy val sum: PackratParser[Expression] = atom <~ ".sum" ^^ { e => Sum(e) }
  lazy val min: PackratParser[Expression] = atom <~ ".min" ^^ { e => Min(e) }
  lazy val max: PackratParser[Expression] = atom <~ ".max" ^^ { e => Max(e) }
  lazy val count: PackratParser[Expression] = atom <~ ".count" ^^ { e => Count(e) }
  lazy val avg: PackratParser[Expression] = atom <~ ".avg" ^^ { e => Avg(e) }

  // `expr.as(name)`: names the expression after the given field identifier.
  lazy val as: PackratParser[Expression] = atom ~ ".as(" ~ fieldReference ~ ")" ^^ {
    case e ~ _ ~ as ~ _ => Naming(e, as.name)
  }

  lazy val substring: PackratParser[Expression] =
    atom ~ ".substring(" ~ expression ~ "," ~ expression ~ ")" ^^ {
      case e ~ _ ~ from ~ _ ~ to ~ _ => Substring(e, from, to)
    }

  // Without an explicit end the substring extends to Integer.MAX_VALUE.
  lazy val substringWithoutEnd: PackratParser[Expression] =
    atom ~ ".substring(" ~ expression ~ ")" ^^ {
      case e ~ _ ~ from ~ _ => Substring(e, from, Literal(Integer.MAX_VALUE))
    }

  // `as` is part of the alternatives so that the parser defined above is actually reachable;
  // `atom` stays last because every suffix form starts with an atom.
  lazy val suffix =
    isNull | isNotNull |
      abs | sum | min | max | count | avg |
      as | substring | substringWithoutEnd | atom


  // unary ops

  lazy val unaryNot: PackratParser[Expression] = "!" ~> suffix ^^ { e => Not(e) }

  lazy val unaryMinus: PackratParser[Expression] = "-" ~> suffix ^^ { e => UnaryMinus(e) }

  lazy val unaryBitwiseNot: PackratParser[Expression] = "~" ~> suffix ^^ { e => BitwiseNot(e) }

  lazy val unary = unaryNot | unaryMinus | unaryBitwiseNot | suffix

  // binary bitwise opts
  // NOTE: as written, bitwise operators bind tighter than `*`, `/` and `%` because `product`
  // is built on top of `binaryBitwise`.

  lazy val binaryBitwise = unary * (
    "&" ^^^ { (a:Expression, b:Expression) => BitwiseAnd(a,b) } |
      "|" ^^^ { (a:Expression, b:Expression) => BitwiseOr(a,b) } |
      "^" ^^^ { (a:Expression, b:Expression) => BitwiseXor(a,b) } )

  // arithmetic

  lazy val product = binaryBitwise * (
    "*" ^^^ { (a:Expression, b:Expression) => Mul(a,b) } |
      "/" ^^^ { (a:Expression, b:Expression) => Div(a,b) } |
      "%" ^^^ { (a:Expression, b:Expression) => Mod(a,b) } )

  lazy val term = product * (
    "+" ^^^ { (a:Expression, b:Expression) => Plus(a,b) } |
      "-" ^^^ { (a:Expression, b:Expression) => Minus(a,b) } )

  // Comparison

  lazy val equalTo: PackratParser[Expression] = term ~ "===" ~ term ^^ {
    case l ~ _ ~ r => EqualTo(l, r)
  }

  // A single "=" is accepted as an alternative spelling of "===".
  lazy val equalToAlt: PackratParser[Expression] = term ~ "=" ~ term ^^ {
    case l ~ _ ~ r => EqualTo(l, r)
  }

  lazy val notEqualTo: PackratParser[Expression] = term ~ "!==" ~ term ^^ {
    case l ~ _ ~ r => NotEqualTo(l, r)
  }

  lazy val greaterThan: PackratParser[Expression] = term ~ ">" ~ term ^^ {
    case l ~ _ ~ r => GreaterThan(l, r)
  }

  lazy val greaterThanOrEqual: PackratParser[Expression] = term ~ ">=" ~ term ^^ {
    case l ~ _ ~ r => GreaterThanOrEqual(l, r)
  }

  lazy val lessThan: PackratParser[Expression] = term ~ "<" ~ term ^^ {
    case l ~ _ ~ r => LessThan(l, r)
  }

  lazy val lessThanOrEqual: PackratParser[Expression] = term ~ "<=" ~ term ^^ {
    case l ~ _ ~ r => LessThanOrEqual(l, r)
  }

  lazy val comparison: PackratParser[Expression] =
    equalTo | equalToAlt | notEqualTo |
      greaterThan | greaterThanOrEqual |
      lessThan | lessThanOrEqual | term

  // logic

  lazy val logic = comparison * (
    "&&" ^^^ { (a:Expression, b:Expression) => And(a,b) } |
      "||" ^^^ { (a:Expression, b:Expression) => Or(a,b) } )

  // alias

  lazy val alias: PackratParser[Expression] = logic ~ "as" ~ fieldReference ^^ {
    case e ~ _ ~ name => Naming(e, name.name)
  } | logic

  lazy val expression: PackratParser[Expression] = alias

  lazy val expressionList: Parser[List[Expression]] = rep1sep(expression, ",")

  /**
   * Parses a comma-separated list of expressions; the whole input must be consumed.
   *
   * @param expression the text to parse
   * @return the parsed expressions, in input order
   * @throws ExpressionException if the text cannot be parsed
   */
  def parseExpressionList(expression: String): List[Expression] = {
    parseAll(expressionList, expression) match {
      case Success(lst, _) => lst

      case Failure(msg, _) => throw new ExpressionException("Could not parse expression: " + msg)

      case Error(msg, _) => throw new ExpressionException("Could not parse expression: " + msg)
    }
  }

  /**
   * Parses a single expression; the whole input must be consumed.
   *
   * @param exprString the text to parse
   * @return the parsed expression
   * @throws ExpressionException if the text cannot be parsed
   */
  def parseExpression(exprString: String): Expression = {
    parseAll(expression, exprString) match {
      case Success(expr, _) => expr

      case fail =>
        throw new ExpressionException("Could not parse expression: " + fail.toString)
    }
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala new file mode 100644 index 0000000..7e9bc0d --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.table.Row +import org.apache.flink.api.common.functions.RichGroupReduceFunction +import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable +import org.apache.flink.api.java.aggregation.AggregationFunction +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector +
/**
 * Group-reduce function that feeds one field of every incoming [[Row]] into a matching
 * aggregation function and emits a single Row carrying the aggregated values.
 *
 * `functions(i)` aggregates the field at `fieldPositions(i)`.
 */
@Combinable
class ExpressionAggregateFunction(
    private val fieldPositions: Seq[Int],
    private val functions: Seq[AggregationFunction[Any]])
  extends RichGroupReduceFunction[Row, Row] {

  /** Puts every aggregation function into its initial state. */
  override def open(conf: Configuration): Unit = {
    functions.foreach(_.initializeAggregate())
  }

  /**
   * Aggregates all rows of `in`, writes the results into the last row that was read and
   * collects that row. Each function is re-initialized right after its result was taken.
   */
  override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = {

    // local copies of the members, as in the original hot loop
    val positions = this.fieldPositions
    val aggregators = this.functions
    val aggregatorCount = aggregators.length

    var last: Row = null

    val rows = in.iterator()
    while (rows.hasNext) {
      last = rows.next()

      var col = 0
      while (col < aggregatorCount) {
        aggregators(col).aggregate(last.productElement(positions(col)))
        col += 1
      }
    }

    // the last input row is reused as the output record
    var idx = 0
    while (idx < aggregatorCount) {
      val aggregator = aggregators(idx)
      last.setField(positions(idx), aggregator.getAggregate)
      aggregator.initializeAggregate()
      idx += 1
    }

    out.collect(last)
  }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala new file mode 100644 index 0000000..b0e2d05 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.table.codegen.GenerateUnaryPredicate +import org.apache.flink.api.table.tree.{NopExpression, Expression} +import org.apache.flink.api.common.functions.RichFilterFunction +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.configuration.Configuration +
/**
 * Filter function that evaluates a predicate [[Expression]] against every input element.
 *
 * The predicate is compiled in `open()`. A [[NopExpression]] is not compiled at all; in that
 * case `compiledPredicate` stays `null` and every element passes the filter.
 *
 * @param predicate the filter condition
 * @param inputType type information of the input elements, used by the code generator
 */
class ExpressionFilterFunction[T](
    predicate: Expression,
    inputType: CompositeType[T]) extends RichFilterFunction[T] {

  var compiledPredicate: (T) => Boolean = null

  override def open(config: Configuration): Unit = {
    if (compiledPredicate == null) {
      compiledPredicate = predicate match {
        // nothing to evaluate: leave the predicate unset, filter() accepts everything
        case n: NopExpression => null
        case _ =>
          val codegen = new GenerateUnaryPredicate[T](
            inputType,
            predicate,
            getRuntimeContext.getUserCodeClassLoader)
          codegen.generate()
      }
    }
  }

  // A missing predicate means "no condition". Previously this dereferenced the null
  // function and failed with a NullPointerException for NopExpression predicates.
  override def filter(in: T): Boolean = compiledPredicate == null || compiledPredicate(in)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala new file mode 100644 index 0000000..f0f5636 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.table.tree.{NopExpression, Expression} +import org.apache.flink.api.common.functions.RichFlatJoinFunction +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.codegen.{GenerateBinaryResultAssembler, +GenerateBinaryPredicate} +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector +
/**
 * Flat-join function that checks an optional join predicate on every pair of elements and,
 * for accepted pairs, assembles the output record from `outputFields`.
 *
 * Predicate and result assembler are compiled in `open()`. For a [[NopExpression]] no
 * predicate is compiled and every pair is accepted.
 */
class ExpressionJoinFunction[L, R, O](
    predicate: Expression,
    leftType: CompositeType[L],
    rightType: CompositeType[R],
    resultType: CompositeType[O],
    outputFields: Seq[Expression]) extends RichFlatJoinFunction[L, R, O] {

  var compiledPredicate: (L, R) => Boolean = null
  var resultAssembler: (L, R, O) => O = null
  var result: O = null.asInstanceOf[O]

  override def open(config: Configuration): Unit = {
    // record instance that is handed to the result assembler on every call
    result = resultType.createSerializer(getRuntimeContext.getExecutionConfig).createInstance()

    if (compiledPredicate == null) {
      predicate match {
        case _: NopExpression =>
          // no condition to evaluate: the predicate stays null
          compiledPredicate = null
        case _ =>
          compiledPredicate = new GenerateBinaryPredicate[L, R](
            leftType,
            rightType,
            predicate,
            getRuntimeContext.getUserCodeClassLoader).generate()
      }
    }

    if (resultAssembler == null) {
      resultAssembler = new GenerateBinaryResultAssembler[L, R, O](
        leftType,
        rightType,
        resultType,
        outputFields,
        getRuntimeContext.getUserCodeClassLoader).generate()
    }
  }

  def join(left: L, right: R, out: Collector[O]) = {
    // a missing predicate accepts every pair
    if (compiledPredicate == null || compiledPredicate(left, right)) {
      result = resultAssembler(left, right, result)
      out.collect(result)
    }
  }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala new file mode 100644 index 0000000..0a2830b --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.table.tree.Expression +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.table.codegen.GenerateUnaryResultAssembler +import org.apache.flink.configuration.Configuration +
/**
 * Map function that assembles an output record from `outputFields` for every input element.
 * The assembler is compiled in `open()` unless one is already set.
 */
class ExpressionSelectFunction[I, O](
    inputType: CompositeType[I],
    resultType: CompositeType[O],
    outputFields: Seq[Expression]) extends RichMapFunction[I, O] {

  var resultAssembler: (I, O) => O = null
  var result: O = null.asInstanceOf[O]

  override def open(config: Configuration): Unit = {
    val context = getRuntimeContext

    // record instance that is handed to the result assembler on every call
    val serializer = resultType.createSerializer(context.getExecutionConfig)
    result = serializer.createInstance()

    if (resultAssembler == null) {
      resultAssembler = new GenerateUnaryResultAssembler[I, O](
        inputType,
        resultType,
        outputFields,
        context.getUserCodeClassLoader).generate()
    }
  }

  def map(in: I): O = resultAssembler(in, result)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala new file mode 100644 index 0000000..a1bc4b7 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table + +/** + * The functions in this package are used for transforming Table API operations to Java API operations. + */ +package object runtime http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala new file mode 100644 index 0000000..6302572 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/Expression.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.tree + +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, TypeInformation} + +import scala.language.postfixOps + + +
/**
 * Base class of all nodes of an expression tree.
 *
 * Subclasses are expected to be `Product`s (case classes): the tree transformations walk the
 * product elements of a node and rebuild changed nodes reflectively through `makeCopy`.
 */
abstract class Expression extends Product {
  def children: Seq[Expression]

  // The default implementation generates a new name on every call.
  def name: String = Expression.freshName("expression")
  def typeInfo: TypeInformation[_]

  /**
   * Tests for equality by first testing for reference equality.
   */
  def fastEquals(other: Expression): Boolean = this.eq(other) || this == other

  /**
   * Applies `rule` to this node first and then to the children of the resulting node.
   * Nodes for which `rule` is not defined are kept as they are.
   */
  def transformPre(rule: PartialFunction[Expression, Expression]): Expression = {
    val afterTransform = rule.applyOrElse(this, identity[Expression])

    if (afterTransform fastEquals this) {
      this.transformChildrenPre(rule)
    } else {
      afterTransform.transformChildrenPre(rule)
    }
  }

  /** Applies `transformPre` to every child and copies this node if any child changed. */
  def transformChildrenPre(rule: PartialFunction[Expression, Expression]): Expression =
    transformChildren(_.transformPre(rule))

  /**
   * Transforms the children first and then applies `rule` to the resulting node.
   * Nodes for which `rule` is not defined are kept as they are.
   */
  def transformPost(rule: PartialFunction[Expression, Expression]): Expression = {
    val afterChildren = transformChildrenPost(rule)
    if (afterChildren fastEquals this) {
      rule.applyOrElse(this, identity[Expression])
    } else {
      rule.applyOrElse(afterChildren, identity[Expression])
    }
  }

  /** Applies `transformPost` to every child and copies this node if any child changed. */
  def transformChildrenPost(rule: PartialFunction[Expression, Expression]): Expression =
    transformChildren(_.transformPost(rule))

  /**
   * Shared implementation of the two child transformations: runs `transformChild` on every
   * product element that is one of `children`, passes all other elements through untouched
   * and creates a copy of this node only if at least one child was replaced.
   */
  private def transformChildren(transformChild: Expression => Expression): Expression = {
    var changed = false
    val newArgs = productIterator.map {
      case child: Expression if children.contains(child) =>
        val newChild = transformChild(child)
        if (newChild fastEquals child) {
          child
        } else {
          changed = true
          newChild
        }
      case other: AnyRef => other
      case null => null
    }.toArray
    // toArray forces evaluation: the iterator is lazy and `changed` is only set while the
    // elements are being mapped, so it must not be read before that happened.

    if (changed) makeCopy(newArgs) else this
  }

  /**
   * Returns whether `predicate` holds for this node or any node reached by `transformPre`.
   * The predicate is evaluated on all visited nodes, there is no early exit.
   */
  def exists(predicate: Expression => Boolean): Boolean = {
    var exists = false
    this.transformPre {
      case e: Expression => if (predicate(e)) {
        exists = true
      }
      e
    }
    exists
  }

  /**
   * Creates a new copy of this expression with new children. This is used during transformation
   * if children change. This must be overridden by Expressions that don't have the Constructor
   * arguments in the same order as the `children`.
   *
   * @param newArgs constructor arguments for the copy
   * @throws NoSuchElementException if the class has no constructor that takes arguments
   * @throws RuntimeException if the arguments do not fit the constructor
   */
  def makeCopy(newArgs: Seq[AnyRef]): this.type = {
    // first public constructor that takes arguments
    val defaultCtor = this.getClass.getConstructors
      .find { _.getParameterTypes.nonEmpty }
      .getOrElse(throw new NoSuchElementException(
        s"Cannot copy $this: ${this.getClass.getName} has no constructor with arguments."))
    try {
      defaultCtor.newInstance(newArgs.toArray: _*).asInstanceOf[this.type]
    } catch {
      case iae: IllegalArgumentException =>
        // keep the exception type, but report what failed and preserve the cause
        throw new RuntimeException(
          s"Could not copy $this with arguments (${newArgs.mkString(", ")}).", iae)
    }
  }
}

/** Expression with exactly two children, `left` and `right`. */
abstract class BinaryExpression() extends Expression {
  def left: Expression
  def right: Expression
  def children = Seq(left, right)
}

/** Expression with exactly one child. */
abstract class UnaryExpression() extends Expression {
  def child: Expression
  def children = Seq(child)
}

/** Expression without children. */
abstract class LeafExpression() extends Expression {
  val children = Nil
}

/** Placeholder expression; its name is generated once per instance. */
case class NopExpression() extends LeafExpression {
  val typeInfo = new NothingTypeInfo()
  override val name = Expression.freshName("nop")

}

object Expression {
  /** Returns `prefix` followed by "-" and the next value of a global counter. */
  def freshName(prefix: String): String = {
    s"$prefix-${freshNameCounter.getAndIncrement}"
  }

  val freshNameCounter = new AtomicInteger
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala new file mode 100644 index 0000000..e5cdac5 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/aggregations.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.table.tree + +import org.apache.flink.api.table.ExpressionException +import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.java.aggregation.Aggregations + + +
/**
 * Base class of the aggregation expressions. The result type is the type of the aggregated
 * child, which has to be one of the numeric basic types.
 */
abstract sealed class Aggregation extends UnaryExpression {
  def typeInfo = {
    child.typeInfo match {
      case BasicTypeInfo.LONG_TYPE_INFO |
           BasicTypeInfo.INT_TYPE_INFO |
           BasicTypeInfo.DOUBLE_TYPE_INFO |
           BasicTypeInfo.FLOAT_TYPE_INFO |
           BasicTypeInfo.BYTE_TYPE_INFO |
           BasicTypeInfo.SHORT_TYPE_INFO =>
        // supported, nothing to do
      case _ =>
        throw new ExpressionException(s"Unsupported type ${child.typeInfo} for " +
          s"aggregation $this. Only numeric data types supported.")
    }
    child.typeInfo
  }

  override def toString = s"Aggregate($child)"

  /** Expressions whose values are fed into the aggregations, one per aggregation. */
  def getIntermediateFields: Seq[Expression]

  /** Builds the final result expression from the aggregated intermediate fields. */
  def getFinalField(inputs: Seq[Expression]): Expression

  /** Aggregations to apply, in the order of the intermediate fields. */
  def getAggregations: Seq[Aggregations]
}

/** Sum of the child expression. */
case class Sum(child: Expression) extends Aggregation {
  override def toString = s"($child).sum"

  override def getIntermediateFields: Seq[Expression] = Seq(child)

  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)

  override def getAggregations = Seq(Aggregations.SUM)
}

/** Minimum of the child expression. */
case class Min(child: Expression) extends Aggregation {
  override def toString = s"($child).min"

  override def getIntermediateFields: Seq[Expression] = Seq(child)

  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)

  override def getAggregations = Seq(Aggregations.MIN)

}

/** Maximum of the child expression. */
case class Max(child: Expression) extends Aggregation {
  override def toString = s"($child).max"

  override def getIntermediateFields: Seq[Expression] = Seq(child)

  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)

  override def getAggregations = Seq(Aggregations.MAX)
}

/** Count, expressed as the sum over a constant 1 per element; the result type is Int. */
case class Count(child: Expression) extends Aggregation {
  override def typeInfo = {
    // any child type can be counted; the child's type is still evaluated here
    child.typeInfo match {
      case _ => // we can count anything... :D
    }
    BasicTypeInfo.INT_TYPE_INFO
  }

  override def toString = s"($child).count"

  override def getIntermediateFields: Seq[Expression] = Seq(Literal(Integer.valueOf(1)))

  override def getFinalField(inputs: Seq[Expression]): Expression = inputs(0)

  override def getAggregations = Seq(Aggregations.SUM)

}

/** Average, expressed as the sum of the child divided by the sum over a constant 1. */
case class Avg(child: Expression) extends Aggregation {
  override def toString = s"($child).avg"

  override def getIntermediateFields: Seq[Expression] = Seq(child, Literal(1))

  // This is just sweet. Use our own AST representation and let the code generator do
  // our dirty work.
  override def getFinalField(inputs: Seq[Expression]): Expression =
    Div(inputs(0), inputs(1))

  override def getAggregations = Seq(Aggregations.SUM, Aggregations.SUM)

}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala new file mode 100644 index 0000000..84f9b18 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/arithmetic.scala @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table.tree + +import org.apache.flink.api.table.ExpressionException +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo, NumericTypeInfo, TypeInformation} + +
/**
 * Base class of the binary arithmetic operators. Both operands must be numeric and of the
 * same type; that type is also the result type.
 */
abstract class BinaryArithmetic extends BinaryExpression {
  def typeInfo = {
    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
      throw new ExpressionException(
        s"""Non-numeric operand ${left} of type ${left.typeInfo} in $this""")
    }
    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
      throw new ExpressionException(
        s"""Non-numeric operand "${right}" of type ${right.typeInfo} in $this""")
    }
    if (left.typeInfo != right.typeInfo) {
      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
        s"${right.typeInfo} in $this")
    }
    left.typeInfo
  }
}

/** Addition. In addition to numeric operands, String operands are accepted as well. */
case class Plus(left: Expression, right: Expression) extends BinaryArithmetic {
  override def typeInfo = {
    if (!left.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
      !(left.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
      throw new ExpressionException(s"Non-numeric operand type ${left.typeInfo} in $this")
    }
    if (!right.typeInfo.isInstanceOf[NumericTypeInfo[_]] &&
      !(right.typeInfo == BasicTypeInfo.STRING_TYPE_INFO)) {
      throw new ExpressionException(s"Non-numeric operand type ${right.typeInfo} in $this")
    }
    if (left.typeInfo != right.typeInfo) {
      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
        s"${right.typeInfo} in $this")
    }
    left.typeInfo
  }

  override def toString = s"($left + $right)"
}

/** Arithmetic negation of a numeric operand. */
case class UnaryMinus(child: Expression) extends UnaryExpression {
  def typeInfo = {
    if (!child.typeInfo.isInstanceOf[NumericTypeInfo[_]]) {
      throw new ExpressionException(
        s"""Non-numeric operand ${child} of type ${child.typeInfo} in $this""")
    }
    child.typeInfo
  }

  override def toString = s"-($child)"
}

/** Subtraction. */
case class Minus(left: Expression, right: Expression) extends BinaryArithmetic {
  override def toString = s"($left - $right)"
}

/** Division. */
case class Div(left: Expression, right: Expression) extends BinaryArithmetic {
  override def toString = s"($left / $right)"
}

/** Multiplication. */
case class Mul(left: Expression, right: Expression) extends BinaryArithmetic {
  override def toString = s"($left * $right)"
}

/** Modulo. */
case class Mod(left: Expression, right: Expression) extends BinaryArithmetic {
  // was rendered with "*", which made Mod indistinguishable from Mul in printed expressions
  override def toString = s"($left % $right)"
}

/** Absolute value; the result has the type of the operand (the operand type is not checked). */
case class Abs(child: Expression) extends UnaryExpression {
  def typeInfo = child.typeInfo

  override def toString = s"abs($child)"
}

/**
 * Base class of the binary bitwise operators. Both operands must be of the same integer
 * type. The result is Long for Long operands and Int for all other integer types.
 */
abstract class BitwiseBinaryArithmetic extends BinaryExpression {
  def typeInfo: TypeInformation[_] = {
    if (!left.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
      throw new ExpressionException(
        s"""Non-integer operand ${left} of type ${left.typeInfo} in $this""")
    }
    if (!right.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
      throw new ExpressionException(
        s"""Non-integer operand "${right}" of type ${right.typeInfo} in $this""")
    }
    if (left.typeInfo != right.typeInfo) {
      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
        s"${right.typeInfo} in $this")
    }
    if (left.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
      left.typeInfo
    } else {
      BasicTypeInfo.INT_TYPE_INFO
    }
  }
}

/** Bitwise and. */
case class BitwiseAnd(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
  override def toString = s"($left & $right)"
}

/** Bitwise or. */
case class BitwiseOr(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
  override def toString = s"($left | $right)"
}


/** Bitwise exclusive or. */
case class BitwiseXor(left: Expression, right: Expression) extends BitwiseBinaryArithmetic {
  override def toString = s"($left ^ $right)"
}

/**
 * Bitwise complement of an integer operand. The result is Long for a Long operand and Int
 * for all other integer types.
 */
case class BitwiseNot(child: Expression) extends UnaryExpression {
  def typeInfo: TypeInformation[_] = {
    if (!child.typeInfo.isInstanceOf[IntegerTypeInfo[_]]) {
      throw new ExpressionException(
        s"""Non-integer operand ${child} of type ${child.typeInfo} in $this""")
    }
    if (child.typeInfo == BasicTypeInfo.LONG_TYPE_INFO) {
      child.typeInfo
    } else {
      BasicTypeInfo.INT_TYPE_INFO
    }
  }

  override def toString = s"~($child)"
}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala new file mode 100644 index 0000000..a3acc35 --- /dev/null +++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/cast.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.flink.api.table.tree
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+ * Wraps `child` and reports `tpe` as the type of the resulting expression.
+ * No check of the child's own type is performed in this class.
+ */
+case class Cast(child: Expression, tpe: TypeInformation[_]) extends UnaryExpression {
+  /** The type handed to the constructor. */
+  def typeInfo = this.tpe
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala
new file mode 100644
index 0000000..e0a34a9
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/comparison.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.tree
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, NumericTypeInfo}
+
+/**
+ * Base class for comparisons of two operands.
+ *
+ * By default both operand types must be instances of `NumericTypeInfo` and must be equal to each
+ * other; otherwise an [[ExpressionException]] is raised. The result type is always
+ * `BasicTypeInfo.BOOLEAN_TYPE_INFO`.
+ */
+abstract class BinaryComparison extends BinaryExpression {
+  def typeInfo = {
+    // The left operand is validated before the right one is looked at.
+    requireNumeric(left)
+    requireNumeric(right)
+    if (left.typeInfo == right.typeInfo) {
+      BasicTypeInfo.BOOLEAN_TYPE_INFO
+    } else {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+  }
+
+  /** Raises an [[ExpressionException]] unless the type of `operand` is a `NumericTypeInfo`. */
+  private def requireNumeric(operand: Expression): Unit = operand.typeInfo match {
+    case _: NumericTypeInfo[_] => ()
+    case _ => throw new ExpressionException(s"Non-numeric operand ${operand} in $this")
+  }
+}
+
+/**
+ * Equality test. Operands may be of any type as long as both have the same type.
+ */
+case class EqualTo(left: Expression, right: Expression) extends BinaryComparison {
+  override def typeInfo =
+    if (left.typeInfo == right.typeInfo) {
+      BasicTypeInfo.BOOLEAN_TYPE_INFO
+    } else {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+
+  override def toString = s"${left} === ${right}"
+}
+
+/**
+ * Inequality test. Operands may be of any type as long as both have the same type.
+ */
+case class NotEqualTo(left: Expression, right: Expression) extends BinaryComparison {
+  override def typeInfo =
+    if (left.typeInfo == right.typeInfo) {
+      BasicTypeInfo.BOOLEAN_TYPE_INFO
+    } else {
+      throw new ExpressionException(s"Differing operand data types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+
+  override def toString = s"${left} !== ${right}"
+}
+
+/** Greater-than test on numeric operands of equal type. */
+case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"${left} > ${right}"
+}
+
+/** Greater-than-or-equal test on numeric operands of equal type. */
+case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"${left} >= ${right}"
+}
+
+/** Less-than test on numeric operands of equal type. */
+case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"${left} < ${right}"
+}
+
+/** Less-than-or-equal test on numeric operands of equal type. */
+case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
+  override def toString = s"${left} <= ${right}"
+}
+
+/** Null test; the result type is boolean regardless of the type of `child`. */
+case class IsNull(child: Expression) extends UnaryExpression {
+  def typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override def toString = s"(${child}).isNull"
+}
+
+/** Non-null test; the result type is boolean regardless of the type of `child`. */
+case class IsNotNull(child: Expression) extends UnaryExpression {
+  def typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO
+
+  override def toString = s"(${child}).isNotNull"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala
new file mode 100644
index 0000000..cc42148
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/fieldExpression.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.tree
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+ * Reference to a field by name only. No type is known for it, so asking for `typeInfo` always
+ * raises an [[ExpressionException]].
+ */
+case class UnresolvedFieldReference(override val name: String) extends LeafExpression {
+  def typeInfo = {
+    throw new ExpressionException(s"Unresolved field reference: ${this}")
+  }
+
+  // Rendered as a double quote followed by the name; no closing quote is emitted.
+  override def toString = "\"" + name
+}
+
+/**
+ * Reference to a field whose type `tpe` is known.
+ */
+case class ResolvedFieldReference(
+    override val name: String,
+    tpe: TypeInformation[_]) extends LeafExpression {
+  def typeInfo = this.tpe
+
+  // Rendered as a single quote followed by the name.
+  override def toString = "'" + name
+}
+
+/**
+ * Gives the name `name` to the expression `child`; the type is the type of `child`.
+ */
+case class Naming(child: Expression, override val name: String) extends UnaryExpression {
+  def typeInfo = child.typeInfo
+
+  override def toString = s"${child} as '${name}"
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala
new file mode 100644
index 0000000..852d5a1
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/literals.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.tree
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.scala.table.ImplicitExpressionOperations
+
+object Literal {
+  /**
+   * Creates a literal whose type is derived from the runtime class of the given value.
+   *
+   * Supported values are `Int`, `Long`, `Double`, `Float`, `String` and `Boolean`. Any other
+   * value (including `null`) is not matched and results in a `scala.MatchError`.
+   */
+  def apply(l: Any): Literal = {
+    val literalType: TypeInformation[_] = l match {
+      case _: Int => BasicTypeInfo.INT_TYPE_INFO
+      case _: Long => BasicTypeInfo.LONG_TYPE_INFO
+      case _: Double => BasicTypeInfo.DOUBLE_TYPE_INFO
+      case _: Float => BasicTypeInfo.FLOAT_TYPE_INFO
+      case _: String => BasicTypeInfo.STRING_TYPE_INFO
+      case _: Boolean => BasicTypeInfo.BOOLEAN_TYPE_INFO
+    }
+    new Literal(l, literalType)
+  }
+}
+
+/**
+ * A constant `value` together with its type `tpe`. Mixes in `ImplicitExpressionOperations`,
+ * for which the literal itself serves as the expression (`expr`).
+ */
+case class Literal(value: Any, tpe: TypeInformation[_])
+  extends LeafExpression with ImplicitExpressionOperations {
+  def expr = this
+  def typeInfo = this.tpe
+
+  override def toString = "" + value
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala
new file mode 100644
index 0000000..8ab838d
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/logic.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.tree
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+
+/**
+ * Base class for logical connectives of two operands. Both operands must be of type
+ * `BasicTypeInfo.BOOLEAN_TYPE_INFO`, otherwise an [[ExpressionException]] is raised. The result
+ * type is boolean.
+ */
+abstract class BinaryPredicate extends BinaryExpression {
+  def typeInfo = {
+    val bothBoolean =
+      left.typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO &&
+        right.typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO
+    if (bothBoolean) {
+      BasicTypeInfo.BOOLEAN_TYPE_INFO
+    } else {
+      throw new ExpressionException(s"Non-boolean operand types ${left.typeInfo} and " +
+        s"${right.typeInfo} in $this")
+    }
+  }
+}
+
+/**
+ * Logical negation of a boolean operand. The name is obtained from `Expression.freshName`,
+ * based on the name of the operand.
+ */
+case class Not(child: Expression) extends UnaryExpression {
+  override val name = Expression.freshName(s"not-${child.name}")
+
+  def typeInfo =
+    if (child.typeInfo == BasicTypeInfo.BOOLEAN_TYPE_INFO) {
+      BasicTypeInfo.BOOLEAN_TYPE_INFO
+    } else {
+      throw new ExpressionException(s"Non-boolean operand type ${child.typeInfo} in $this")
+    }
+
+  override def toString = s"!(${child})"
+}
+
+/**
+ * Logical conjunction. The name is obtained from `Expression.freshName`, based on the names of
+ * both operands.
+ */
+case class And(left: Expression, right: Expression) extends BinaryPredicate {
+  override val name = Expression.freshName(s"${left.name}-and-${right.name}")
+
+  override def toString = s"${left} && ${right}"
+}
+
+/**
+ * Logical disjunction. The name is obtained from `Expression.freshName`, based on the names of
+ * both operands.
+ */
+case class Or(left: Expression, right: Expression) extends BinaryPredicate {
+  override val name = Expression.freshName(s"${left.name}-or-${right.name}")
+
+  override def toString = s"${left} || ${right}"
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala
new file mode 100644
index 0000000..caac402
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/package.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table
+
+/**
+ * This package contains the base class of AST nodes and all the expression language AST classes.
+ * Expression trees should not be constructed manually by users. They are constructed implicitly
+ * by the implicit DSL conversions in
+ * [[org.apache.flink.api.scala.table.ImplicitExpressionConversions]] and
+ * [[org.apache.flink.api.scala.table.ImplicitExpressionOperations]]. For the Java API,
+ * expression trees should be generated by a string parser that parses expressions and creates
+ * AST nodes.
+ */
+package object tree
http://git-wip-us.apache.org/repos/asf/flink/blob/c9519c8d/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala
new file mode 100644
index 0000000..e14374f
--- /dev/null
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/tree/stringExpressions.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.tree
+
+import org.apache.flink.api.table.ExpressionException
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, IntegerTypeInfo}
+
+/**
+ * Substring of a string expression, delimited by a begin and an end index expression.
+ *
+ * `typeInfo` requires `str` to be of type `BasicTypeInfo.STRING_TYPE_INFO` and both indices to
+ * be of an `IntegerTypeInfo` type; otherwise an [[ExpressionException]] is raised. The result
+ * type is string.
+ */
+case class Substring(
+    str: Expression,
+    beginIndex: Expression,
+    endIndex: Expression) extends Expression {
+  def typeInfo = {
+    // Checks run in operand order: string first, then begin index, then end index.
+    if (str.typeInfo != BasicTypeInfo.STRING_TYPE_INFO) {
+      throw new ExpressionException(
+        s"""Operand must be of type String in $this, is ${str.typeInfo}.""")
+    }
+    requireIntegerIndex("Begin", beginIndex)
+    requireIntegerIndex("End", endIndex)
+
+    BasicTypeInfo.STRING_TYPE_INFO
+  }
+
+  /**
+   * Raises an [[ExpressionException]] unless the type of `index` is an `IntegerTypeInfo`;
+   * `label` is the leading word of the error message.
+   */
+  private def requireIntegerIndex(label: String, index: Expression): Unit =
+    index.typeInfo match {
+      case _: IntegerTypeInfo[_] => ()
+      case _ =>
+        throw new ExpressionException(
+          s"""$label index must be an integer type in $this, is ${index.typeInfo}.""")
+    }
+
+  override def children: Seq[Expression] = Seq(str, beginIndex, endIndex)
+  override def toString = s"(${str}).substring(${beginIndex}, ${endIndex})"
+}