[FLINK-5662] [table] Rework internal type handling of Table API

This closes #3271.
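At a high level, after this rework every relational operator produces Flink's internal Row type, and the conversion to the type requested at the sink happens exactly once, in a code-generated MapFunction. A minimal sketch of the user-facing behavior this enables, assuming the Flink 1.2-era Scala Table API (the WC case class and the pipeline are illustrative only, not part of this commit):

    import org.apache.flink.api.scala._
    import org.apache.flink.table.api.TableEnvironment
    import org.apache.flink.table.api.scala._

    // illustrative case class, not part of this commit
    case class WC(word: String, count: Long)

    object SinkConversionSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)

        val table = tEnv
          .fromDataSet(env.fromElements(WC("hello", 1), WC("hello", 2)))
          .groupBy('word)
          .select('word, 'count.sum as 'count)

        // requesting a non-Row type appends a single generated "to: ..."
        // conversion operator; requesting a generic Row skips it entirely
        val result: DataSet[WC] = table.toDataSet[WC]
        result.print()
      }
    }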
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bc6b225 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bc6b225 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bc6b225 Branch: refs/heads/master Commit: 6bc6b225e55095eb8797db2903b0546410e7fdd9 Parents: 1ce10c8 Author: twalthr <twal...@apache.org> Authored: Mon Feb 6 17:18:08 2017 +0100 Committer: twalthr <twal...@apache.org> Committed: Mon Feb 13 17:50:00 2017 +0100 ---------------------------------------------------------------------- .../flink/table/api/BatchTableEnvironment.scala | 34 ++- .../table/api/StreamTableEnvironment.scala | 41 ++-- .../apache/flink/table/api/TableConfig.scala | 24 -- .../flink/table/api/TableEnvironment.scala | 130 +++++++++- .../flink/table/calcite/FlinkTypeFactory.scala | 17 +- .../flink/table/codegen/CodeGenerator.scala | 33 +-- .../flink/table/codegen/ExpressionReducer.scala | 11 +- .../apache/flink/table/codegen/generated.scala | 25 +- .../flink/table/plan/logical/operators.scala | 18 +- .../table/plan/nodes/CommonAggregate.scala | 69 ++++++ .../flink/table/plan/nodes/CommonCalc.scala | 152 ++++++++++++ .../table/plan/nodes/CommonCorrelate.scala | 229 ++++++++++++++++++ .../flink/table/plan/nodes/CommonScan.scala | 82 +++++++ .../flink/table/plan/nodes/FlinkAggregate.scala | 69 ------ .../flink/table/plan/nodes/FlinkCalc.scala | 172 ------------- .../flink/table/plan/nodes/FlinkCorrelate.scala | 233 ------------------ .../flink/table/plan/nodes/FlinkRel.scala | 37 --- .../table/plan/nodes/dataset/BatchScan.scala | 61 ++--- .../nodes/dataset/BatchTableSourceScan.scala | 7 +- .../plan/nodes/dataset/DataSetAggregate.scala | 82 ++----- .../table/plan/nodes/dataset/DataSetCalc.scala | 27 +-- .../plan/nodes/dataset/DataSetCorrelate.scala | 12 +- .../plan/nodes/dataset/DataSetIntersect.scala | 57 +---- .../table/plan/nodes/dataset/DataSetJoin.scala | 32 ++- .../table/plan/nodes/dataset/DataSetMinus.scala | 57 +---- .../table/plan/nodes/dataset/DataSetRel.scala | 17 +- .../table/plan/nodes/dataset/DataSetScan.scala | 8 +- .../nodes/dataset/DataSetSingleRowJoin.scala | 48 ++-- .../table/plan/nodes/dataset/DataSetSort.scala | 49 +--- .../table/plan/nodes/dataset/DataSetUnion.scala | 24 +- .../plan/nodes/dataset/DataSetValues.scala | 18 +- .../nodes/dataset/DataSetWindowAggregate.scala | 68 ++---- .../nodes/datastream/DataStreamAggregate.scala | 239 ++++++++----------- .../plan/nodes/datastream/DataStreamCalc.scala | 27 +-- .../nodes/datastream/DataStreamCorrelate.scala | 14 +- .../plan/nodes/datastream/DataStreamRel.scala | 15 +- .../plan/nodes/datastream/DataStreamScan.scala | 10 +- .../plan/nodes/datastream/DataStreamUnion.scala | 6 +- .../nodes/datastream/DataStreamValues.scala | 23 +- .../plan/nodes/datastream/StreamScan.scala | 84 ++----- .../datastream/StreamTableSourceScan.scala | 12 +- .../table/runtime/aggregate/AggregateUtil.scala | 120 +++++----- .../flink/table/typeutils/TypeConverter.scala | 156 ------------ .../api/java/batch/TableEnvironmentITCase.java | 7 +- .../scala/batch/TableEnvironmentITCase.scala | 4 +- .../batch/utils/TableProgramsTestBase.scala | 11 +- .../expressions/utils/ExpressionTestBase.scala | 16 +- 47 files changed, 1184 insertions(+), 1503 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala index dd0487a..2dec00e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala @@ -23,11 +23,12 @@ import _root_.java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.{Programs, RuleSet} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.io.DiscardingOutputFormat -import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.table.explain.PlanJsonParser import org.apache.flink.table.expressions.Expression @@ -135,7 +136,7 @@ abstract class BatchTableEnvironment( private[flink] def explain(table: Table, extended: Boolean): String = { val ast = table.getRelNode val optimizedPlan = optimize(ast) - val dataSet = translate[Row](optimizedPlan) (TypeExtractor.createTypeInfo(classOf[Row])) + val dataSet = translate[Row](optimizedPlan, ast.getRowType) (new GenericTypeInfo(classOf[Row])) dataSet.output(new DiscardingOutputFormat[Row]) val env = dataSet.getExecutionEnvironment val jasonSqlPlan = env.getExecutionPlan @@ -250,28 +251,39 @@ abstract class BatchTableEnvironment( * @return The [[DataSet]] that corresponds to the translated [[Table]]. */ protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = { - val dataSetPlan = optimize(table.getRelNode) - translate(dataSetPlan) + val relNode = table.getRelNode + val dataSetPlan = optimize(relNode) + translate(dataSetPlan, relNode.getRowType) } /** - * Translates a logical [[RelNode]] into a [[DataSet]]. + * Translates a logical [[RelNode]] into a [[DataSet]]. Converts to target type if necessary. * * @param logicalPlan The root node of the relational expression tree. + * @param logicalType The row type of the result. Since the logicalPlan can lose the + * field naming during optimization we pass the row type separately. * @param tpe The [[TypeInformation]] of the resulting [[DataSet]]. * @tparam A The type of the resulting [[DataSet]]. * @return The [[DataSet]] that corresponds to the translated [[Table]]. */ - protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = { + protected def translate[A]( + logicalPlan: RelNode, + logicalType: RelDataType) + (implicit tpe: TypeInformation[A]): DataSet[A] = { TableEnvironment.validateType(tpe) logicalPlan match { case node: DataSetRel => - node.translateToPlan( - this, - Some(tpe.asInstanceOf[TypeInformation[Any]]) - ).asInstanceOf[DataSet[A]] - case _ => ??? 
+ val plan = node.translateToPlan(this) + val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataSetSinkConversion") + conversion match { + case None => plan.asInstanceOf[DataSet[A]] // no conversion necessary + case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe") + } + + case _ => + throw TableException("Cannot generate DataSet due to an invalid logical plan. " + + "This is a bug and should not happen. Please file an issue.") } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index 81e884d..19c4af1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -23,10 +23,11 @@ import _root_.java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.RelOptUtil import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.{Programs, RuleSet} import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, RowTypeInfo} import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.explain.PlanJsonParser @@ -200,11 +201,11 @@ abstract class StreamTableEnvironment( dataStream: DataStream[T], fields: Array[Expression]): Unit = { - val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields.toArray) + val (fieldNames, fieldIndexes) = getFieldInfo[T](dataStream.getType, fields) val dataStreamTable = new DataStreamTable[T]( dataStream, - fieldIndexes.toArray, - fieldNames.toArray + fieldIndexes, + fieldNames ) registerTableInternal(name, dataStreamTable) } @@ -255,30 +256,40 @@ abstract class StreamTableEnvironment( * @return The [[DataStream]] that corresponds to the translated [[Table]]. */ protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = { - val dataStreamPlan = optimize(table.getRelNode) - translate(dataStreamPlan) + val relNode = table.getRelNode + val dataStreamPlan = optimize(relNode) + translate(dataStreamPlan, relNode.getRowType) } /** * Translates a logical [[RelNode]] into a [[DataStream]]. * * @param logicalPlan The root node of the relational expression tree. + * @param logicalType The row type of the result. Since the logicalPlan can lose the + * field naming during optimization we pass the row type separately. * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. * @tparam A The type of the resulting [[DataStream]]. * @return The [[DataStream]] that corresponds to the translated [[Table]]. 
*/ - protected def translate[A] - (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = { + protected def translate[A]( + logicalPlan: RelNode, + logicalType: RelDataType) + (implicit tpe: TypeInformation[A]): DataStream[A] = { TableEnvironment.validateType(tpe) logicalPlan match { case node: DataStreamRel => - node.translateToPlan( - this, - Some(tpe.asInstanceOf[TypeInformation[Any]]) - ).asInstanceOf[DataStream[A]] - case _ => ??? + val plan = node.translateToPlan(this) + val conversion = sinkConversion(plan.getType, logicalType, tpe, "DataStreamSinkConversion") + conversion match { + case None => plan.asInstanceOf[DataStream[A]] // no conversion necessary + case Some(mapFunction) => plan.map(mapFunction).name(s"to: $tpe") + } + + case _ => + throw TableException("Cannot generate DataStream due to an invalid logical plan. " + + "This is a bug and should not happen. Please file an issue.") } } @@ -291,7 +302,9 @@ abstract class StreamTableEnvironment( def explain(table: Table): String = { val ast = table.getRelNode val optimizedPlan = optimize(ast) - val dataStream = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row])) + val dataStream = translate[Row]( + optimizedPlan, + ast.getRowType)(new GenericTypeInfo(classOf[Row])) val env = dataStream.getExecutionEnvironment val jsonSqlPlan = env.getExecutionPlan http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala index a8876a8..6448657 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableConfig.scala @@ -37,12 +37,6 @@ class TableConfig { private var nullCheck: Boolean = true /** - * Defines if efficient types (such as Tuple types or Atomic types) - * should be used within operators where possible. - */ - private var efficientTypeUsage = false - - /** * Defines the configuration of Calcite for Table API and SQL queries. */ private var calciteConfig = CalciteConfig.DEFAULT @@ -73,24 +67,6 @@ class TableConfig { } /** - * Returns the usage of efficient types. If enabled, efficient types (such as Tuple types - * or Atomic types) are used within operators where possible. - * - * NOTE: Currently, this is an experimental feature. - */ - def getEfficientTypeUsage = efficientTypeUsage - - /** - * Sets the usage of efficient types. If enabled, efficient types (such as Tuple types - * or Atomic types) are used within operators where possible. - * - * NOTE: Currently, this is an experimental feature. - */ - def setEfficientTypeUsage(efficientTypeUsage: Boolean): Unit = { - this.efficientTypeUsage = efficientTypeUsage - } - - /** * Returns the current configuration of Calcite for Table API and SQL queries. 
*/ def getCalciteConfig: CalciteConfig = calciteConfig http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index bcff1fb..b36441a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -31,27 +31,30 @@ import org.apache.calcite.sql.SqlOperatorTable import org.apache.calcite.sql.parser.SqlParser import org.apache.calcite.sql.util.ChainedSqlOperatorTable import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RuleSet, RuleSets} +import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} +import org.apache.flink.api.java.typeutils._ import org.apache.flink.api.java.{ExecutionEnvironment => JavaBatchExecEnv} import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.api.scala.{ExecutionEnvironment => ScalaBatchExecEnv} import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecEnv} import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv} -import java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv} +import org.apache.flink.table.api.java.{BatchTableEnvironment => JavaBatchTableEnv, StreamTableEnvironment => JavaStreamTableEnv} import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv, StreamTableEnvironment => ScalaStreamTableEnv} import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem} -import org.apache.flink.table.codegen.ExpressionReducer +import org.apache.flink.table.codegen.{CodeGenerator, ExpressionReducer} import org.apache.flink.table.expressions.{Alias, Expression, UnresolvedFieldReference} import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils.{checkForInstantiation, checkNotSingleton, createScalarSqlFunction, createTableSqlFunctions} import org.apache.flink.table.functions.{ScalarFunction, TableFunction} import org.apache.flink.table.plan.cost.DataSetCostFactory import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode} -import org.apache.flink.table.plan.schema.{RelTable, TableSourceTable} +import org.apache.flink.table.plan.schema.RelTable +import org.apache.flink.table.runtime.MapRunner import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.sources.{DefinedFieldNames, TableSource} import org.apache.flink.table.validate.FunctionCatalog +import org.apache.flink.types.Row import _root_.scala.collection.JavaConverters._ @@ -410,7 +413,7 @@ abstract class TableEnvironment(val config: TableConfig) { } exprs.map { case UnresolvedFieldReference(name) => (0, name) - case _ => throw new TableException("Field reference expression expected.") + case _ => throw new TableException("Field reference expression requested.") } case t: TupleTypeInfo[A] => exprs.zipWithIndex.map { @@ -466,6 +469,123 @@ 
abstract class TableEnvironment(val config: TableConfig) { (fieldNames.toArray, fieldIndexes.toArray) } + /** + * Creates a final converter that maps the internal row type to the external type. + * + * @param physicalRowTypeInfo the input of the sink + * @param logicalRowType the logical type with correct field names (esp. for POJO field mapping) + * @param requestedTypeInfo the output type of the sink + * @param functionName name of the map function. Does not need to be unique but has to be a + * valid Java class identifier. + */ + protected def sinkConversion[T]( + physicalRowTypeInfo: TypeInformation[Row], + logicalRowType: RelDataType, + requestedTypeInfo: TypeInformation[T], + functionName: String) + : Option[MapFunction[Row, T]] = { + + // validate that at least the field types of physical and logical type match + // we do that here to make sure that plan translation was correct + val logicalRowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(logicalRowType) + if (physicalRowTypeInfo != logicalRowTypeInfo) { + throw TableException("The field types of physical and logical row types do not match. " + + "This is a bug and should not happen. Please file an issue.") + } + + // requested type is a generic Row, no conversion needed + if (requestedTypeInfo.isInstanceOf[GenericTypeInfo[_]] && + requestedTypeInfo.getTypeClass == classOf[Row]) { + return None + } + + // convert to type information + val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType => + FlinkTypeFactory.toTypeInfo(relDataType.getType) + } + // field names + val logicalFieldNames = logicalRowType.getFieldNames.asScala + + // validate requested type + if (requestedTypeInfo.getArity != logicalFieldTypes.length) { + throw new TableException("Arity of result does not match requested type.") + } + requestedTypeInfo match { + + // POJO type requested + case pt: PojoTypeInfo[_] => + logicalFieldNames.zip(logicalFieldTypes) foreach { + case (fName, fType) => + val pojoIdx = pt.getFieldIndex(fName) + if (pojoIdx < 0) { + throw new TableException(s"POJO does not define field name: $fName") + } + val requestedTypeInfo = pt.getTypeAt(pojoIdx) + if (fType != requestedTypeInfo) { + throw new TableException(s"Result field does not match requested type. " + + s"Requested: $requestedTypeInfo; Actual: $fType") + } + } + + // Tuple/Case class/Row type requested + case tt: TupleTypeInfoBase[_] => + logicalFieldTypes.zipWithIndex foreach { + case (fieldTypeInfo, i) => + val requestedTypeInfo = tt.getTypeAt(i) + if (fieldTypeInfo != requestedTypeInfo) { + throw new TableException(s"Result field does not match requested type. " + + s"Requested: $requestedTypeInfo; Actual: $fieldTypeInfo") + } + } + + // Atomic type requested + case at: AtomicType[_] => + if (logicalFieldTypes.size != 1) { + throw new TableException(s"Requested result type is an atomic type but " + + s"result has more or less than a single field.") + } + val fieldTypeInfo = logicalFieldTypes.head + if (fieldTypeInfo != at) { + throw new TableException(s"Result field does not match requested type. 
" + + s"Requested: $at; Actual: $fieldTypeInfo") + } + + case _ => + throw new TableException(s"Unsupported result type: $requestedTypeInfo") + } + + // code generate MapFunction + val generator = new CodeGenerator( + config, + false, + physicalRowTypeInfo, + None, + None) + + val conversion = generator.generateConverterResultExpression( + requestedTypeInfo, + logicalFieldNames) + + val body = + s""" + |${conversion.code} + |return ${conversion.resultTerm}; + |""".stripMargin + + val genFunction = generator.generateFunction( + functionName, + classOf[MapFunction[Row, T]], + body, + requestedTypeInfo) + + val mapFunction = new MapRunner[Row, T]( + genFunction.name, + genFunction.code, + genFunction.returnType) + + Some(mapFunction) + } + } /** http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index f3e2f91..251be14 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -28,7 +28,7 @@ import org.apache.calcite.sql.parser.SqlParserPos import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo +import org.apache.flink.api.java.typeutils.{ObjectArrayTypeInfo, RowTypeInfo} import org.apache.flink.api.java.typeutils.ValueTypeInfo._ import org.apache.flink.table.api.TableException import org.apache.flink.table.plan.schema.{CompositeRelDataType, GenericRelDataType} @@ -36,8 +36,10 @@ import org.apache.flink.table.typeutils.TimeIntervalTypeInfo import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple import org.apache.flink.table.plan.schema.ArrayRelDataType import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName +import org.apache.flink.types.Row import scala.collection.mutable +import scala.collection.JavaConverters._ /** * Flink specific type factory that represents the interface between Flink's [[TypeInformation]] @@ -167,6 +169,19 @@ object FlinkTypeFactory { throw TableException(s"Type is not supported: $t") } + /** + * Converts a Calcite logical record into a Flink type information. 
+ */ + def toInternalRowTypeInfo(logicalRowType: RelDataType): TypeInformation[Row] = { + // convert to type information + val logicalFieldTypes = logicalRowType.getFieldList.asScala map { relDataType => + FlinkTypeFactory.toTypeInfo(relDataType.getType) + } + // field names + val logicalFieldNames = logicalRowType.getFieldNames.asScala + new RowTypeInfo(logicalFieldTypes.toArray, logicalFieldNames.toArray) + } + def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match { case BOOLEAN => BOOLEAN_TYPE_INFO case TINYINT => BYTE_TYPE_INFO http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index d49d7a0..c679bd8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -40,8 +40,8 @@ import org.apache.flink.table.codegen.calls.FunctionGenerator import org.apache.flink.table.codegen.calls.ScalarOperators._ import org.apache.flink.table.functions.UserDefinedFunction import org.apache.flink.table.runtime.TableFunctionCollector -import org.apache.flink.table.typeutils.TypeConverter import org.apache.flink.table.typeutils.TypeCheckUtils._ +import org.apache.flink.types.Row import scala.collection.JavaConversions._ import scala.collection.mutable @@ -62,8 +62,8 @@ import scala.collection.mutable class CodeGenerator( config: TableConfig, nullableInput: Boolean, - input1: TypeInformation[Any], - input2: Option[TypeInformation[Any]] = None, + input1: TypeInformation[_ <: Any], + input2: Option[TypeInformation[_ <: Any]] = None, input1PojoFieldMapping: Option[Array[Int]] = None, input2PojoFieldMapping: Option[Array[Int]] = None) extends RexVisitor[GeneratedExpression] { @@ -112,7 +112,7 @@ class CodeGenerator( * @param config configuration that determines runtime behavior */ def this(config: TableConfig) = - this(config, false, TypeConverter.DEFAULT_ROW_TYPE, None, None) + this(config, false, new RowTypeInfo(), None, None) // set of member statements that will be added only once // we use a LinkedHashSet to keep the insertion order @@ -224,15 +224,16 @@ class CodeGenerator( * @param bodyCode code contents of the SAM (Single Abstract Method). Inputs, collector, or * output record can be accessed via the given term methods. * @param returnType expected return type - * @tparam T Flink Function to be generated. + * @tparam F Flink Function to be generated. + * @tparam T Return type of the Flink Function. * @return instance of GeneratedFunction */ - def generateFunction[T <: Function]( + def generateFunction[F <: Function, T <: Any]( name: String, - clazz: Class[T], + clazz: Class[F], bodyCode: String, - returnType: TypeInformation[Any]) - : GeneratedFunction[T] = { + returnType: TypeInformation[T]) + : GeneratedFunction[F, T] = { val funcName = newName(name) // Janino does not support generics, that's why we need @@ -298,14 +299,14 @@ class CodeGenerator( * valid Java class identifier. * @param records code for creating records * @param returnType expected return type - * @tparam T Flink Function to be generated. 
+ * @tparam T Return type of the Flink Function. * @return instance of GeneratedFunction */ - def generateValuesInputFormat[T]( + def generateValuesInputFormat[T <: Row]( name: String, records: Seq[String], - returnType: TypeInformation[Any]) - : GeneratedFunction[GenericInputFormat[T]] = { + returnType: TypeInformation[T]) + : GeneratedInput[GenericInputFormat[T], T] = { val funcName = newName(name) addReusableOutRecord(returnType) @@ -343,7 +344,7 @@ class CodeGenerator( } """.stripMargin - GeneratedFunction[GenericInputFormat[T]](funcName, returnType, funcCode) + GeneratedInput(funcName, returnType, funcCode) } /** @@ -1094,7 +1095,7 @@ class CodeGenerator( // ---------------------------------------------------------------------------------------------- private def generateInputAccess( - inputType: TypeInformation[Any], + inputType: TypeInformation[_ <: Any], inputTerm: String, index: Int, pojoFieldMapping: Option[Array[Int]]) @@ -1122,7 +1123,7 @@ class CodeGenerator( } private def generateNullableInputFieldAccess( - inputType: TypeInformation[Any], + inputType: TypeInformation[_ <: Any], inputTerm: String, index: Int, pojoFieldMapping: Option[Array[Int]]) http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala index 94007de..0f1de21 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala @@ -24,11 +24,10 @@ import org.apache.calcite.plan.RelOptPlanner import org.apache.calcite.rex.{RexBuilder, RexNode} import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.table.calcite.FlinkTypeFactory -import org.apache.flink.table.typeutils.TypeConverter import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.types.Row import scala.collection.JavaConverters._ @@ -39,7 +38,7 @@ import scala.collection.JavaConverters._ class ExpressionReducer(config: TableConfig) extends RelOptPlanner.Executor with Compiler[MapFunction[Row, Row]] { - private val EMPTY_ROW_INFO = TypeConverter.DEFAULT_ROW_TYPE + private val EMPTY_ROW_INFO = new RowTypeInfo() private val EMPTY_ROW = new Row(0) override def reduce( @@ -82,14 +81,14 @@ class ExpressionReducer(config: TableConfig) resultType.getFieldNames, literals) - val generatedFunction = generator.generateFunction[MapFunction[Row, Row]]( + val generatedFunction = generator.generateFunction[MapFunction[Row, Row], Row]( "ExpressionReducer", classOf[MapFunction[Row, Row]], s""" |${result.code} |return ${result.resultTerm}; |""".stripMargin, - resultType.asInstanceOf[TypeInformation[Any]]) + resultType) val clazz = compile(getClass.getClassLoader, generatedFunction.name, generatedFunction.code) val function = clazz.newInstance() 
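The reducer above compiles the generated source via compile(getClass.getClassLoader, name, code) and instantiates it reflectively; the Compiler trait is backed by Janino. A self-contained sketch of that mechanism, with a hand-written stand-in for the generated source (the real string comes from the CodeGenerator, so class name and body here are placeholders):

    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.types.Row
    import org.codehaus.janino.SimpleCompiler

    object CompileSketch {
      def main(args: Array[String]): Unit = {
        // stand-in for GeneratedFunction.code; raw types because Janino
        // compiles the source without generics
        val code =
          """
            |public class IdentityMapper
            |    implements org.apache.flink.api.common.functions.MapFunction {
            |  public Object map(Object value) throws Exception {
            |    return value;
            |  }
            |}
            |""".stripMargin

        val compiler = new SimpleCompiler()
        compiler.cook(code)
        val clazz = compiler.getClassLoader.loadClass("IdentityMapper")
        val function = clazz.newInstance().asInstanceOf[MapFunction[Row, Row]]

        val row = new Row(1)
        row.setField(0, "hello")
        println(function.map(row))
      }
    }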
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala index b4c293d..271f686 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/generated.scala @@ -18,6 +18,9 @@ package org.apache.flink.table.codegen +import org.apache.flink.api.common.functions +import org.apache.flink.api.common.functions.Function +import org.apache.flink.api.common.io.InputFormat import org.apache.flink.api.common.typeinfo.TypeInformation /** @@ -41,14 +44,32 @@ object GeneratedExpression { } /** - * Describes a generated [[org.apache.flink.api.common.functions.Function]] + * Describes a generated [[functions.Function]] * * @param name class name of the generated Function. * @param returnType the type information of the result type * @param code code of the generated Function. + * @tparam F type of function + * @tparam T return type of the function */ -case class GeneratedFunction[T](name: String, returnType: TypeInformation[Any], code: String) +case class GeneratedFunction[F <: Function, T <: Any]( + name: String, + returnType: TypeInformation[T], + code: String) + +/** + * Describes a generated [[InputFormat]]. + * + * @param name class name of the generated input function. + * @param returnType the type information of the result type + * @param code code of the generated InputFormat. + * @tparam F type of input format + * @tparam T return type of the input format +*/ +case class GeneratedInput[F <: InputFormat[_, _], T <: Any]( + name: String, + returnType: TypeInformation[T], + code: String) /** * Describes a generated [[org.apache.flink.util.Collector]].
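With the new signatures, F names the function interface to generate and T the type of the records it produces, which removes the TypeInformation[Any] casts scattered through the old code. A small sketch of constructing such a descriptor (the name and code strings are placeholder values, not real generator output):

    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.table.codegen.GeneratedFunction
    import org.apache.flink.types.Row

    object DescriptorSketch {
      def main(args: Array[String]): Unit = {
        // the return type now travels with the descriptor as
        // TypeInformation[T] instead of TypeInformation[Any]
        val returnType = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO)

        val genFunction: GeneratedFunction[MapFunction[Row, Row], Row] =
          GeneratedFunction("GeneratedMapper", returnType, "/* generated Java source */")

        // a MapRunner would later compile the code and delegate to it:
        // new MapRunner[Row, Row](genFunction.name, genFunction.code, genFunction.returnType)
        println(genFunction.returnType)
      }
    }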
http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala index 3ba0285..20f810a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala @@ -22,14 +22,13 @@ import java.util import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.CorrelationId -import org.apache.calcite.rel.logical.{LogicalProject, LogicalTableFunctionScan} +import org.apache.calcite.rel.core.{CorrelationId, JoinRelType} +import org.apache.calcite.rel.logical.LogicalTableFunctionScan import org.apache.calcite.rex.{RexInputRef, RexNode} import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.operators.join.JoinType -import org.apache.flink.table._ import org.apache.flink.table.api.{StreamTableEnvironment, TableEnvironment, UnresolvedException} import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} import org.apache.flink.table.expressions._ @@ -37,7 +36,6 @@ import org.apache.flink.table.functions.TableFunction import org.apache.flink.table.functions.utils.TableSqlFunction import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl -import org.apache.flink.table.typeutils.TypeConverter import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess} import scala.collection.JavaConverters._ @@ -426,11 +424,18 @@ case class Join( } relBuilder.join( - TypeConverter.flinkJoinTypeToRelType(joinType), + convertJoinType(joinType), condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)), corSet.asJava) } + private def convertJoinType(joinType: JoinType) = joinType match { + case JoinType.INNER => JoinRelType.INNER + case JoinType.LEFT_OUTER => JoinRelType.LEFT + case JoinType.RIGHT_OUTER => JoinRelType.RIGHT + case JoinType.FULL_OUTER => JoinRelType.FULL + } + private def ambiguousName: Set[String] = left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet) @@ -481,13 +486,12 @@ case class Join( if (checkIfFilterCondition(x)) { localPredicateFound = true } - case x: BinaryComparison => { + case x: BinaryComparison => if (checkIfFilterCondition(x)) { localPredicateFound = true } else { nonEquiJoinPredicateFound = true } - } case x => failValidation( s"Unsupported condition type: ${x.getClass.getSimpleName}. 
Condition: $x") } http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala new file mode 100644 index 0000000..3883b14 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonAggregate.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes + +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.core.AggregateCall +import org.apache.flink.table.calcite.FlinkRelBuilder +import FlinkRelBuilder.NamedWindowProperty +import org.apache.flink.table.runtime.aggregate.AggregateUtil._ + +import scala.collection.JavaConverters._ + +trait CommonAggregate { + + private[flink] def groupingToString(inputType: RelDataType, grouping: Array[Int]): String = { + + val inFields = inputType.getFieldNames.asScala + grouping.map( inFields(_) ).mkString(", ") + } + + private[flink] def aggregationToString( + inputType: RelDataType, + grouping: Array[Int], + rowType: RelDataType, + namedAggregates: Seq[CalcitePair[AggregateCall, String]], + namedProperties: Seq[NamedWindowProperty]) + : String = { + + val inFields = inputType.getFieldNames.asScala + val outFields = rowType.getFieldNames.asScala + + val groupStrings = grouping.map( inFields(_) ) + + val aggs = namedAggregates.map(_.getKey) + val aggStrings = aggs.map( a => s"${a.getAggregation}(${ + if (a.getArgList.size() > 0) { + inFields(a.getArgList.get(0)) + } else { + "*" + } + })") + + val propStrings = namedProperties.map(_.property.toString) + + (groupStrings ++ aggStrings ++ propStrings).zip(outFields).map { + case (f, o) => if (f == o) { + f + } else { + s"$f AS $o" + } + }.mkString(", ") + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala new file mode 100644 index 0000000..3f46258 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes + +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rex.{RexNode, RexProgram} +import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction} +import org.apache.flink.table.runtime.FlatMapRunner +import org.apache.flink.types.Row + +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +trait CommonCalc { + + private[flink] def functionBody( + generator: CodeGenerator, + inputType: TypeInformation[Row], + rowType: RelDataType, + calcProgram: RexProgram, + config: TableConfig) + : String = { + + val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType) + + val condition = calcProgram.getCondition + val expandedExpressions = calcProgram.getProjectList.map( + expr => calcProgram.expandLocalRef(expr)) + val projection = generator.generateResultExpression( + returnType, + rowType.getFieldNames, + expandedExpressions) + + // only projection + if (condition == null) { + s""" + |${projection.code} + |${generator.collectorTerm}.collect(${projection.resultTerm}); + |""".stripMargin + } + else { + val filterCondition = generator.generateExpression( + calcProgram.expandLocalRef(calcProgram.getCondition)) + // only filter + if (projection == null) { + s""" + |${filterCondition.code} + |if (${filterCondition.resultTerm}) { + | ${generator.collectorTerm}.collect(${generator.input1Term}); + |} + |""".stripMargin + } + // both filter and projection + else { + s""" + |${filterCondition.code} + |if (${filterCondition.resultTerm}) { + | ${projection.code} + | ${generator.collectorTerm}.collect(${projection.resultTerm}); + |} + |""".stripMargin + } + } + } + + private[flink] def calcMapFunction( + genFunction: GeneratedFunction[FlatMapFunction[Row, Row], Row]) + : RichFlatMapFunction[Row, Row] = { + + new FlatMapRunner[Row, Row]( + genFunction.name, + genFunction.code, + genFunction.returnType) + } + + private[flink] def conditionToString( + calcProgram: RexProgram, + expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = { + + val cond = calcProgram.getCondition + val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList + val localExprs = calcProgram.getExprList.asScala.toList + + if (cond != null) { + expression(cond, inFields, Some(localExprs)) + } else { + "" + } + } + + private[flink] def selectionToString( + calcProgram: RexProgram, + expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = { + + val proj = calcProgram.getProjectList.asScala.toList + val inFields = 
calcProgram.getInputRowType.getFieldNames.asScala.toList + val localExprs = calcProgram.getExprList.asScala.toList + val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList + + proj + .map(expression(_, inFields, Some(localExprs))) + .zip(outFields).map { case (e, o) => + if (e != o) { + e + " AS " + o + } else { + e + } + }.mkString(", ") + } + + private[flink] def calcOpName( + calcProgram: RexProgram, + expression: (RexNode, List[String], Option[List[RexNode]]) => String) = { + + val conditionStr = conditionToString(calcProgram, expression) + val selectionStr = selectionToString(calcProgram, expression) + + s"${if (calcProgram.getCondition != null) { + s"where: ($conditionStr), " + } else { + "" + }}select: ($selectionStr)" + } + + private[flink] def calcToString( + calcProgram: RexProgram, + expression: (RexNode, List[String], Option[List[RexNode]]) => String) = { + + val name = calcOpName(calcProgram, expression) + s"Calc($name)" + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala new file mode 100644 index 0000000..61b7ffb --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCorrelate.scala @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.nodes + +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rex.{RexCall, RexNode} +import org.apache.calcite.sql.SemiJoinType +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.{TableConfig, TableException} +import org.apache.flink.table.calcite.FlinkTypeFactory +import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue +import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE} +import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction} +import org.apache.flink.table.functions.utils.TableSqlFunction +import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector} +import org.apache.flink.types.Row + +import scala.collection.JavaConverters._ + +/** + * Join a user-defined table function + */ +trait CommonCorrelate { + + /** + * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table + * and user-defined table function. 
+ */ + private[flink] def correlateMapFunction( + config: TableConfig, + inputTypeInfo: TypeInformation[Row], + udtfTypeInfo: TypeInformation[Any], + rowType: RelDataType, + joinType: SemiJoinType, + rexCall: RexCall, + condition: Option[RexNode], + pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping + ruleDescription: String) + : CorrelateFlatMapRunner[Row, Row] = { + + val returnType = FlinkTypeFactory.toInternalRowTypeInfo(rowType) + + val flatMap = generateFunction( + config, + inputTypeInfo, + udtfTypeInfo, + returnType, + rowType, + joinType, + rexCall, + pojoFieldMapping, + ruleDescription) + + val collector = generateCollector( + config, + inputTypeInfo, + udtfTypeInfo, + returnType, + rowType, + condition, + pojoFieldMapping) + + new CorrelateFlatMapRunner[Row, Row]( + flatMap.name, + flatMap.code, + collector.name, + collector.code, + flatMap.returnType) + + } + + /** + * Generates the flat map function to run the user-defined table function. + */ + private def generateFunction( + config: TableConfig, + inputTypeInfo: TypeInformation[Row], + udtfTypeInfo: TypeInformation[Any], + returnType: TypeInformation[Row], + rowType: RelDataType, + joinType: SemiJoinType, + rexCall: RexCall, + pojoFieldMapping: Option[Array[Int]], + ruleDescription: String) + : GeneratedFunction[FlatMapFunction[Row, Row], Row] = { + + val functionGenerator = new CodeGenerator( + config, + false, + inputTypeInfo, + Some(udtfTypeInfo), + None, + pojoFieldMapping) + + val (input1AccessExprs, input2AccessExprs) = functionGenerator.generateCorrelateAccessExprs + + val collectorTerm = functionGenerator + .addReusableConstructor(classOf[TableFunctionCollector[_]]) + .head + + val call = functionGenerator.generateExpression(rexCall) + var body = + s""" + |${call.resultTerm}.setCollector($collectorTerm); + |${call.code} + |""".stripMargin + + if (joinType == SemiJoinType.LEFT) { + // left outer join + + // in case of left outer join and the returned row of table function is empty, + // fill all fields of row with null + val input2NullExprs = input2AccessExprs.map { x => + GeneratedExpression( + primitiveDefaultValue(x.resultType), + ALWAYS_NULL, + NO_CODE, + x.resultType) + } + val outerResultExpr = functionGenerator.generateResultExpression( + input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala) + body += + s""" + |boolean hasOutput = $collectorTerm.isCollected(); + |if (!hasOutput) { + | ${outerResultExpr.code} + | ${functionGenerator.collectorTerm}.collect(${outerResultExpr.resultTerm}); + |} + |""".stripMargin + } else if (joinType != SemiJoinType.INNER) { + throw TableException(s"Unsupported SemiJoinType: $joinType for correlate join.") + } + + functionGenerator.generateFunction( + ruleDescription, + classOf[FlatMapFunction[Row, Row]], + body, + returnType) + } + + /** + * Generates table function collector. 
+ */ + private[flink] def generateCollector( + config: TableConfig, + inputTypeInfo: TypeInformation[Row], + udtfTypeInfo: TypeInformation[Any], + returnType: TypeInformation[Row], + rowType: RelDataType, + condition: Option[RexNode], + pojoFieldMapping: Option[Array[Int]]) + : GeneratedCollector = { + + val generator = new CodeGenerator( + config, + false, + inputTypeInfo, + Some(udtfTypeInfo), + None, + pojoFieldMapping) + + val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs + + val crossResultExpr = generator.generateResultExpression( + input1AccessExprs ++ input2AccessExprs, + returnType, + rowType.getFieldNames.asScala) + + val collectorCode = if (condition.isEmpty) { + s""" + |${crossResultExpr.code} + |getCollector().collect(${crossResultExpr.resultTerm}); + |""".stripMargin + } else { + val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo) + filterGenerator.input1Term = filterGenerator.input2Term + val filterCondition = filterGenerator.generateExpression(condition.get) + s""" + |${filterGenerator.reuseInputUnboxingCode()} + |${filterCondition.code} + |if (${filterCondition.resultTerm}) { + | ${crossResultExpr.code} + | getCollector().collect(${crossResultExpr.resultTerm}); + |} + |""".stripMargin + } + + generator.generateTableFunctionCollector( + "TableFunctionCollector", + collectorCode, + udtfTypeInfo) + } + + private[flink] def selectToString(rowType: RelDataType): String = { + rowType.getFieldNames.asScala.mkString(",") + } + + private[flink] def correlateOpName( + rexCall: RexCall, + sqlFunction: TableSqlFunction, + rowType: RelDataType) + : String = { + + s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: ${selectToString(rowType)}" + } + + private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = { + val udtfName = sqlFunction.getName + val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",") + s"table($udtfName($operands))" + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala new file mode 100644 index 0000000..274b602 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonScan.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.nodes + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.table.api.TableConfig +import org.apache.flink.table.codegen.CodeGenerator +import org.apache.flink.table.runtime.MapRunner +import org.apache.flink.types.Row + +/** + * Common class for batch and stream scans. + */ +trait CommonScan { + + /** + * We check if the input type is exactly the same as the internal row type. + * A conversion is necessary if types differ. + */ + private[flink] def needsConversion( + externalTypeInfo: TypeInformation[Any], + internalTypeInfo: TypeInformation[Row]) + : Boolean = { + + externalTypeInfo != internalTypeInfo + } + + private[flink] def getConversionMapper( + config: TableConfig, + inputType: TypeInformation[Any], + expectedType: TypeInformation[Row], + conversionOperatorName: String, + fieldNames: Seq[String], + inputPojoFieldMapping: Option[Array[Int]] = None) + : MapFunction[Any, Row] = { + + val generator = new CodeGenerator( + config, + false, + inputType, + None, + inputPojoFieldMapping) + val conversion = generator.generateConverterResultExpression(expectedType, fieldNames) + + val body = + s""" + |${conversion.code} + |return ${conversion.resultTerm}; + |""".stripMargin + + val genFunction = generator.generateFunction( + conversionOperatorName, + classOf[MapFunction[Any, Row]], + body, + expectedType) + + new MapRunner[Any, Row]( + genFunction.name, + genFunction.code, + genFunction.returnType) + + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala deleted file mode 100644 index 7290594..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkAggregate.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.plan.nodes - -import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.core.AggregateCall -import org.apache.flink.table.calcite.FlinkRelBuilder -import FlinkRelBuilder.NamedWindowProperty -import org.apache.flink.table.runtime.aggregate.AggregateUtil._ - -import scala.collection.JavaConverters._ - -trait FlinkAggregate { - - private[flink] def groupingToString(inputType: RelDataType, grouping: Array[Int]): String = { - - val inFields = inputType.getFieldNames.asScala - grouping.map( inFields(_) ).mkString(", ") - } - - private[flink] def aggregationToString( - inputType: RelDataType, - grouping: Array[Int], - rowType: RelDataType, - namedAggregates: Seq[CalcitePair[AggregateCall, String]], - namedProperties: Seq[NamedWindowProperty]) - : String = { - - val inFields = inputType.getFieldNames.asScala - val outFields = rowType.getFieldNames.asScala - - val groupStrings = grouping.map( inFields(_) ) - - val aggs = namedAggregates.map(_.getKey) - val aggStrings = aggs.map( a => s"${a.getAggregation}(${ - if (a.getArgList.size() > 0) { - inFields(a.getArgList.get(0)) - } else { - "*" - } - })") - - val propStrings = namedProperties.map(_.property.toString) - - (groupStrings ++ aggStrings ++ propStrings).zip(outFields).map { - case (f, o) => if (f == o) { - f - } else { - s"$f AS $o" - } - }.mkString(", ") - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala deleted file mode 100644 index 5ebd3ee..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala
deleted file mode 100644
index 5ebd3ee..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCalc.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexNode, RexProgram}
-import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.TableConfig
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedFunction}
-import org.apache.flink.table.runtime.FlatMapRunner
-import org.apache.flink.table.typeutils.TypeConverter._
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-trait FlinkCalc {
-
-  private[flink] def functionBody(
-      generator: CodeGenerator,
-      inputType: TypeInformation[Any],
-      rowType: RelDataType,
-      calcProgram: RexProgram,
-      config: TableConfig,
-      expectedType: Option[TypeInformation[Any]]): String = {
-
-    val returnType = determineReturnType(
-      rowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
-    val condition = calcProgram.getCondition
-    val expandedExpressions = calcProgram.getProjectList.map(
-      expr => calcProgram.expandLocalRef(expr))
-    val projection = generator.generateResultExpression(
-      returnType,
-      rowType.getFieldNames,
-      expandedExpressions)
-
-    // only projection
-    if (condition == null) {
-      s"""
-        |${projection.code}
-        |${generator.collectorTerm}.collect(${projection.resultTerm});
-        |""".stripMargin
-    }
-    else {
-      val filterCondition = generator.generateExpression(
-        calcProgram.expandLocalRef(calcProgram.getCondition))
-      // only filter
-      if (projection == null) {
-        // conversion
-        if (inputType != returnType) {
-          val conversion = generator.generateConverterResultExpression(
-            returnType,
-            rowType.getFieldNames)
-
-          s"""
-            |${filterCondition.code}
-            |if (${filterCondition.resultTerm}) {
-            |  ${conversion.code}
-            |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
-            |}
-            |""".stripMargin
-        }
-        // no conversion
-        else {
-          s"""
-            |${filterCondition.code}
-            |if (${filterCondition.resultTerm}) {
-            |  ${generator.collectorTerm}.collect(${generator.input1Term});
-            |}
-            |""".stripMargin
-        }
-      }
-      // both filter and projection
-      else {
-        s"""
-          |${filterCondition.code}
-          |if (${filterCondition.resultTerm}) {
-          |  ${projection.code}
-          |  ${generator.collectorTerm}.collect(${projection.resultTerm});
-          |}
-          |""".stripMargin
-      }
-    }
-  }
-
-  private[flink] def calcMapFunction(
-      genFunction: GeneratedFunction[FlatMapFunction[Any, Any]]): RichFlatMapFunction[Any, Any] = {
-
-    new FlatMapRunner[Any, Any](
-      genFunction.name,
-      genFunction.code,
-      genFunction.returnType)
-  }
-
-  private[flink] def conditionToString(
-      calcProgram: RexProgram,
-      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
-
-    val cond = calcProgram.getCondition
-    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
-    val localExprs = calcProgram.getExprList.asScala.toList
-
-    if (cond != null) {
-      expression(cond, inFields, Some(localExprs))
-    } else {
-      ""
-    }
-  }
-
-  private[flink] def selectionToString(
-      calcProgram: RexProgram,
-      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
-
-    val proj = calcProgram.getProjectList.asScala.toList
-    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
-    val localExprs = calcProgram.getExprList.asScala.toList
-    val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList
-
-    proj
-      .map(expression(_, inFields, Some(localExprs)))
-      .zip(outFields).map { case (e, o) => {
-        if (e != o) {
-          e + " AS " + o
-        } else {
-          e
-        }
-      }
-    }.mkString(", ")
-  }
-
-  private[flink] def calcOpName(
-      calcProgram: RexProgram,
-      expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
-
-    val conditionStr = conditionToString(calcProgram, expression)
-    val selectionStr = selectionToString(calcProgram, expression)
-
-    s"${if (calcProgram.getCondition != null) {
-      s"where: ($conditionStr), "
-    } else {
-      ""
-    }}select: ($selectionStr)"
-  }
-
-  private[flink] def calcToString(
-      calcProgram: RexProgram,
-      expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
-
-    val name = calcOpName(calcProgram, expression)
-    s"Calc($name)"
-  }
-}
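The generated flat-map body takes one of three shapes: projection only, filter only, or filter plus projection. A sketch of that dispatch, with a hypothetical Expr standing in for a generated expression (its code and its result variable):

object CalcBodySketch {
  // hypothetical stand-in for a generated expression
  final case class Expr(code: String, resultTerm: String)

  // mirrors the branching in functionBody above
  def body(filter: Option[Expr], projection: Option[Expr], collector: String = "c"): String =
    (filter, projection) match {
      case (None, Some(p)) => // projection only
        s"${p.code}\n$collector.collect(${p.resultTerm});"
      case (Some(f), None) => // filter only, input forwarded unchanged
        s"${f.code}\nif (${f.resultTerm}) {\n  $collector.collect(input);\n}"
      case (Some(f), Some(p)) => // filter plus projection
        s"${f.code}\nif (${f.resultTerm}) {\n  ${p.code}\n  $collector.collect(${p.resultTerm});\n}"
      case (None, None) =>
        throw new IllegalArgumentException("nothing to generate")
    }

  def main(args: Array[String]): Unit =
    println(body(
      Some(Expr("boolean cond = a > 0;", "cond")),
      Some(Expr("Row out = project(in);", "out"))))
}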

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
deleted file mode 100644
index c986602..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkCorrelate.scala
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.plan.nodes
-
-import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.{RexCall, RexNode}
-import org.apache.calcite.sql.SemiJoinType
-import org.apache.flink.api.common.functions.FlatMapFunction
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.codegen.{CodeGenerator, GeneratedCollector, GeneratedExpression, GeneratedFunction}
-import org.apache.flink.table.codegen.CodeGenUtils.primitiveDefaultValue
-import org.apache.flink.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
-import org.apache.flink.table.functions.utils.TableSqlFunction
-import org.apache.flink.table.runtime.{CorrelateFlatMapRunner, TableFunctionCollector}
-import org.apache.flink.table.typeutils.TypeConverter._
-import org.apache.flink.table.api.{TableConfig, TableException}
-
-import scala.collection.JavaConverters._
-
-/**
- * Join a user-defined table function
- */
-trait FlinkCorrelate {
-
-  /**
-   * Creates the [[CorrelateFlatMapRunner]] to execute the join of input table
-   * and user-defined table function.
-   */
-  private[flink] def correlateMapFunction(
-      config: TableConfig,
-      inputTypeInfo: TypeInformation[Any],
-      udtfTypeInfo: TypeInformation[Any],
-      rowType: RelDataType,
-      joinType: SemiJoinType,
-      rexCall: RexCall,
-      condition: Option[RexNode],
-      expectedType: Option[TypeInformation[Any]],
-      pojoFieldMapping: Option[Array[Int]], // udtf return type pojo field mapping
-      ruleDescription: String)
-    : CorrelateFlatMapRunner[Any, Any] = {
-
-    val returnType = determineReturnType(
-      rowType,
-      expectedType,
-      config.getNullCheck,
-      config.getEfficientTypeUsage)
-
-    val flatMap = generateFunction(
-      config,
-      inputTypeInfo,
-      udtfTypeInfo,
-      returnType,
-      rowType,
-      joinType,
-      rexCall,
-      pojoFieldMapping,
-      ruleDescription)
-
-    val collector = generateCollector(
-      config,
-      inputTypeInfo,
-      udtfTypeInfo,
-      returnType,
-      rowType,
-      condition,
-      pojoFieldMapping)
-
-    new CorrelateFlatMapRunner[Any, Any](
-      flatMap.name,
-      flatMap.code,
-      collector.name,
-      collector.code,
-      flatMap.returnType)
-
-  }
-
-  /**
-   * Generates the flat map function to run the user-defined table function.
-   */
-  private def generateFunction(
-      config: TableConfig,
-      inputTypeInfo: TypeInformation[Any],
-      udtfTypeInfo: TypeInformation[Any],
-      returnType: TypeInformation[Any],
-      rowType: RelDataType,
-      joinType: SemiJoinType,
-      rexCall: RexCall,
-      pojoFieldMapping: Option[Array[Int]],
-      ruleDescription: String)
-    : GeneratedFunction[FlatMapFunction[Any, Any]] = {
-
-    val functionGenerator = new CodeGenerator(
-      config,
-      false,
-      inputTypeInfo,
-      Some(udtfTypeInfo),
-      None,
-      pojoFieldMapping)
-
-    val (input1AccessExprs, input2AccessExprs) = functionGenerator.generateCorrelateAccessExprs
-
-    val collectorTerm = functionGenerator
-      .addReusableConstructor(classOf[TableFunctionCollector[_]])
-      .head
-
-    val call = functionGenerator.generateExpression(rexCall)
-    var body =
-      s"""
-        |${call.resultTerm}.setCollector($collectorTerm);
-        |${call.code}
-        |""".stripMargin
-
-    if (joinType == SemiJoinType.LEFT) {
-      // left outer join
-
-      // in case of left outer join and the returned row of table function is empty,
-      // fill all fields of row with null
-      val input2NullExprs = input2AccessExprs.map { x =>
-        GeneratedExpression(
-          primitiveDefaultValue(x.resultType),
-          ALWAYS_NULL,
-          NO_CODE,
-          x.resultType)
-      }
-      val outerResultExpr = functionGenerator.generateResultExpression(
-        input1AccessExprs ++ input2NullExprs, returnType, rowType.getFieldNames.asScala)
-      body +=
-        s"""
-          |boolean hasOutput = $collectorTerm.isCollected();
-          |if (!hasOutput) {
-          |  ${outerResultExpr.code}
-          |  ${functionGenerator.collectorTerm}.collect(${outerResultExpr.resultTerm});
-          |}
-          |""".stripMargin
-    } else if (joinType != SemiJoinType.INNER) {
-      throw TableException(s"Unsupported SemiJoinType: $joinType for correlate join.")
-    }
-
-    functionGenerator.generateFunction(
-      ruleDescription,
-      classOf[FlatMapFunction[Any, Any]],
-      body,
-      returnType)
-  }
-
-  /**
-   * Generates table function collector.
-   */
-  private[flink] def generateCollector(
-      config: TableConfig,
-      inputTypeInfo: TypeInformation[Any],
-      udtfTypeInfo: TypeInformation[Any],
-      returnType: TypeInformation[Any],
-      rowType: RelDataType,
-      condition: Option[RexNode],
-      pojoFieldMapping: Option[Array[Int]])
-    : GeneratedCollector = {
-
-    val generator = new CodeGenerator(
-      config,
-      false,
-      inputTypeInfo,
-      Some(udtfTypeInfo),
-      None,
-      pojoFieldMapping)
-
-    val (input1AccessExprs, input2AccessExprs) = generator.generateCorrelateAccessExprs
-
-    val crossResultExpr = generator.generateResultExpression(
-      input1AccessExprs ++ input2AccessExprs,
-      returnType,
-      rowType.getFieldNames.asScala)
-
-    val collectorCode = if (condition.isEmpty) {
-      s"""
-        |${crossResultExpr.code}
-        |getCollector().collect(${crossResultExpr.resultTerm});
-        |""".stripMargin
-    } else {
-      val filterGenerator = new CodeGenerator(config, false, udtfTypeInfo)
-      filterGenerator.input1Term = filterGenerator.input2Term
-      val filterCondition = filterGenerator.generateExpression(condition.get)
-      s"""
-        |${filterGenerator.reuseInputUnboxingCode()}
-        |${filterCondition.code}
-        |if (${filterCondition.resultTerm}) {
-        |  ${crossResultExpr.code}
-        |  getCollector().collect(${crossResultExpr.resultTerm});
-        |}
-        |""".stripMargin
-    }
-
-    generator.generateTableFunctionCollector(
-      "TableFunctionCollector",
-      collectorCode,
-      udtfTypeInfo)
-  }
-
-  private[flink] def selectToString(rowType: RelDataType): String = {
-    rowType.getFieldNames.asScala.mkString(",")
-  }
-
-  private[flink] def correlateOpName(
-      rexCall: RexCall,
-      sqlFunction: TableSqlFunction,
-      rowType: RelDataType)
-    : String = {
-
-    s"correlate: ${correlateToString(rexCall, sqlFunction)}, select: ${selectToString(rowType)}"
-  }
-
-  private[flink] def correlateToString(rexCall: RexCall, sqlFunction: TableSqlFunction): String = {
-    val udtfName = sqlFunction.getName
-    val operands = rexCall.getOperands.asScala.map(_.toString).mkString(",")
-    s"table($udtfName($operands))"
-  }
-
-}
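The LEFT case above pads with nulls whenever the table function produced no rows for an input row. A plain-Scala sketch of that semantics (the udtf is hypothetical; the real code is generated and tracks output via TableFunctionCollector.isCollected):

object CorrelateLeftJoinSketch {
  def main(args: Array[String]): Unit = {
    // hypothetical table function: explodes a comma-separated string
    def udtf(s: String): Seq[String] =
      if (s.contains(",")) s.split(",").toSeq else Seq.empty

    val input = Seq("a,b", "single")

    // mirrors the generated LEFT join body: if the collector saw no output
    // for an input row, emit the row once, padded with null for the UDTF column
    val joined: Seq[(String, Any)] = input.flatMap { in =>
      val out = udtf(in).map(part => (in, part: Any))
      if (out.isEmpty) Seq((in, null: Any)) else out
    }

    joined.foreach(println)
    // prints: (a,b,a), (a,b,b), (single,null)
  }
}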
   private[flink] def estimateRowSize(rowType: RelDataType): Double = {
 
     val fieldList = rowType.getFieldList

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
index 252bb2e..09262a6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchScan.scala
@@ -21,12 +21,12 @@ package org.apache.flink.table.plan.nodes.dataset
 import org.apache.calcite.plan._
 import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rel.metadata.RelMetadataQuery
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
-import org.apache.flink.api.java.typeutils.PojoTypeInfo
 import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.plan.nodes.CommonScan
 import org.apache.flink.table.plan.schema.FlinkTable
-import org.apache.flink.table.typeutils.TypeConverter.determineReturnType
+import org.apache.flink.types.Row
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -36,6 +36,7 @@ abstract class BatchScan(
     traitSet: RelTraitSet,
     table: RelOptTable)
   extends TableScan(cluster, traitSet, table)
+  with CommonScan
   with DataSetRel {
 
   override def toString: String = {
@@ -48,50 +49,34 @@ abstract class BatchScan(
     planner.getCostFactory.makeCost(rowCnt, rowCnt, 0)
   }
 
-  protected def convertToExpectedType(
+  protected def convertToInternalRow(
       input: DataSet[Any],
       flinkTable: FlinkTable[_],
-      expectedType: Option[TypeInformation[Any]],
-      config: TableConfig): DataSet[Any] = {
+      config: TableConfig)
+    : DataSet[Row] = {
 
     val inputType = input.getType
 
-    expectedType match {
+    val internalType = FlinkTypeFactory.toInternalRowTypeInfo(getRowType)
 
-      // special case:
-      // if efficient type usage is enabled and no expected type is set
-      // we can simply forward the DataSet to the next operator.
-      // however, we cannot forward PojoTypes as their fields don't have an order
-      case None if config.getEfficientTypeUsage && !inputType.isInstanceOf[PojoTypeInfo[_]] =>
-        input
+    // conversion
+    if (needsConversion(inputType, internalType)) {
 
-      case _ =>
-        val determinedType = determineReturnType(
-          getRowType,
-          expectedType,
-          config.getNullCheck,
-          config.getEfficientTypeUsage)
+      val mapFunc = getConversionMapper(
+        config,
+        inputType,
+        internalType,
+        "DataSetSourceConversion",
+        getRowType.getFieldNames,
+        Some(flinkTable.fieldIndexes))
 
-        // conversion
-        if (determinedType != inputType) {
+      val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
 
-          val mapFunc = getConversionMapper(
-            config,
-            nullableInput = false,
-            inputType,
-            determinedType,
-            "DataSetSourceConversion",
-            getRowType.getFieldNames,
-            Some(flinkTable.fieldIndexes))
-
-          val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-
-          input.map(mapFunc).name(opName)
-        }
-        // no conversion necessary, forward
-        else {
-          input
-        }
+      input.map(mapFunc).name(opName)
+    }
+    // no conversion necessary, forward
+    else {
+      input.asInstanceOf[DataSet[Row]]
     }
   }
 }
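The generated "DataSetSourceConversion" mapper essentially copies the source fields into a Row, positionally. A hand-written stand-in (field names are hypothetical; the real mapper is code-generated and additionally honors the POJO field mapping):

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

object ScanConversionSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // a source that does not emit rows yet
    val input: DataSet[(String, Int)] = env.fromElements(("hello", 1), ("world", 2))

    // the internal row type the scan would derive from the logical schema
    implicit val internalType: TypeInformation[Row] =
      new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO)

    // hand-written stand-in for the generated conversion mapper
    val rows: DataSet[Row] = input.map { t =>
      val row = new Row(2)
      row.setField(0, t._1)
      row.setField(1, t._2)
      row
    }.name("from: (name, amount)")

    rows.print()
  }
}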

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
index 73dddc6..9b8e1ea 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/BatchTableSourceScan.scala
@@ -27,6 +27,7 @@ import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.plan.schema.TableSourceTable
 import org.apache.flink.table.sources.BatchTableSource
+import org.apache.flink.types.Row
 
 /** Flink RelNode to read data from an external source defined by a [[BatchTableSource]].
   */
 class BatchTableSourceScan(
@@ -62,13 +63,11 @@
       .item("fields", TableEnvironment.getFieldNames(tableSource).mkString(", "))
   }
 
-  override def translateToPlan(
-    tableEnv: BatchTableEnvironment,
-    expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
     val inputDataSet = tableSource.getDataSet(tableEnv.execEnv).asInstanceOf[DataSet[Any]]
 
-    convertToExpectedType(inputDataSet, new TableSourceTable(tableSource), expectedType, config)
+    convertToInternalRow(inputDataSet, new TableSourceTable(tableSource), config)
   }
 }
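From the API surface this rework is invisible; internally every scan now yields DataSet[Row]. A small usage sketch, assuming the standard Scala Table API of this Flink version:

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object InternalRowRoundTrip {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val words = env.fromElements(("hello", 1L), ("world", 2L))
    val table = tEnv.fromDataSet(words, 'word, 'cnt)

    // scans are translated to DataSet[Row] internally, so asking for rows
    // back should not require an extra conversion operator
    val rows: DataSet[Row] = table.select('word, 'cnt).toDataSet[Row]
    rows.print()
  }
}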

http://git-wip-us.apache.org/repos/asf/flink/blob/6bc6b225/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
index 6771536..206e562 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala
@@ -23,19 +23,15 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.AggregateCall
 import org.apache.calcite.rel.metadata.RelMetadataQuery
 import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
-import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.plan.nodes.FlinkAggregate
+import org.apache.flink.table.plan.nodes.CommonAggregate
 import org.apache.flink.table.runtime.aggregate.AggregateUtil
 import org.apache.flink.table.runtime.aggregate.AggregateUtil.CalcitePair
-import org.apache.flink.table.typeutils.TypeConverter
-import org.apache.flink.table.api.BatchTableEnvironment
 import org.apache.flink.types.Row
 
-import scala.collection.JavaConverters._
-
 /**
  * Flink RelNode which matches along with a LogicalAggregate.
  */
@@ -49,7 +45,7 @@ class DataSetAggregate(
     grouping: Array[Int],
     inGroupingSet: Boolean)
   extends SingleRel(cluster, traitSet, inputNode)
-  with FlinkAggregate
+  with CommonAggregate
   with DataSetRel {
 
   override def deriveRowType(): RelDataType = rowRelDataType
@@ -89,9 +85,7 @@
     planner.getCostFactory.makeCost(rowCnt, rowCnt * aggCnt, rowCnt * rowSize)
   }
 
-  override def translateToPlan(
-    tableEnv: BatchTableEnvironment,
-    expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
+  override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = {
 
     val config = tableEnv.getConfig
 
@@ -109,15 +103,7 @@
     grouping,
     inGroupingSet)
 
-    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
-      tableEnv,
-      // tell the input operator that this operator currently only supports Rows as input
-      Some(TypeConverter.DEFAULT_ROW_TYPE))
-
-    // get the output types
-    val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala
-      .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
-      .toArray
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
 
     val aggString = aggregationToString(inputType, grouping, getRowType, namedAggregates, Nil)
     val prepareOpName = s"prepare select: ($aggString)"
@@ -125,46 +111,26 @@
       .map(mapFunction)
       .name(prepareOpName)
 
-    val rowTypeInfo = new RowTypeInfo(fieldTypes: _*)
-
-    val result = {
-      if (groupingKeys.length > 0) {
-        // grouped aggregation
-        val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
-          s"select: ($aggString)"
-
-        mappedInput.asInstanceOf[DataSet[Row]]
-          .groupBy(groupingKeys: _*)
-          .reduceGroup(groupReduceFunction)
-          .returns(rowTypeInfo)
-          .name(aggOpName)
-          .asInstanceOf[DataSet[Any]]
-      }
-      else {
-        // global aggregation
-        val aggOpName = s"select:($aggString)"
-        mappedInput.asInstanceOf[DataSet[Row]]
-          .reduceGroup(groupReduceFunction)
-          .returns(rowTypeInfo)
-          .name(aggOpName)
-          .asInstanceOf[DataSet[Any]]
-      }
-    }
+    val rowTypeInfo = FlinkTypeFactory.toInternalRowTypeInfo(getRowType).asInstanceOf[RowTypeInfo]
+
+    if (groupingKeys.length > 0) {
+      // grouped aggregation
+      val aggOpName = s"groupBy: (${groupingToString(inputType, grouping)}), " +
+        s"select: ($aggString)"
 
-    // if the expected type is not a Row, inject a mapper to convert to the expected type
-    expectedType match {
-      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
-        result.map(getConversionMapper(
-          config = config,
-          nullableInput = false,
-          inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
-          expectedType = expectedType.get,
-          conversionOperatorName = "DataSetAggregateConversion",
-          fieldNames = getRowType.getFieldNames.asScala
-        ))
-        .name(mapName)
-      case _ => result
+      mappedInput.asInstanceOf[DataSet[Row]]
+        .groupBy(groupingKeys: _*)
+        .reduceGroup(groupReduceFunction)
+        .returns(rowTypeInfo)
+        .name(aggOpName)
+    }
+    else {
+      // global aggregation
+      val aggOpName = s"select:($aggString)"
+      mappedInput.asInstanceOf[DataSet[Row]]
+        .reduceGroup(groupReduceFunction)
+        .returns(rowTypeInfo)
+        .name(aggOpName)
     }
   }
 }
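The two remaining translation paths correspond directly to grouped and global aggregations in the Table API. A usage sketch (assuming the Scala Table API of this version; the operator names would be built by aggregationToString, roughly "groupBy: (user), select: (user, SUM(amount) AS total)" and "select:(SUM(amount) AS total)"):

import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object AggregateTranslationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    val orders = tEnv.fromDataSet(
      env.fromElements(("alice", 10L), ("bob", 5L), ("alice", 7L)), 'user, 'amount)

    // translated by DataSetAggregate with groupingKeys.length > 0
    val grouped = orders.groupBy('user).select('user, 'amount.sum as 'total)

    // translated by the global-aggregation branch
    val global = orders.select('amount.sum as 'total)

    grouped.toDataSet[Row].print()
    global.toDataSet[Row].print()
  }
}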