http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
new file mode 100644
index 0000000..9c6eea8
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import java.util
+
+import com.google.common.collect.ImmutableList
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.plan.RelOptTable.ViewExpander
+import org.apache.calcite.plan._
+import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rel.RelRoot
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.RexBuilder
+import org.apache.calcite.schema.SchemaPlus
+import org.apache.calcite.sql.parser.{SqlParser, SqlParseException => CSqlParseException}
+import org.apache.calcite.sql.validate.SqlValidator
+import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter}
+import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
+import org.apache.flink.table.api.{SqlParserException, TableException, ValidationException}
+
+import scala.collection.JavaConversions._
+
+/**
+  * NOTE: this is heavily inspired by Calcite's PlannerImpl.
+  * We need it in order to share the planner between the Table API relational plans
+  * and the SQL relation plans that are created by the Calcite parser.
+  * The main difference is that we do not create a new RelOptPlanner in the ready() method.
+  *
+  * The intended call sequence is [[parse()]], then [[validate()]], then [[rel()]]:
+  * [[validate()]] stores the validator and the validated node in this instance and
+  * [[rel()]] reads them back.
+  *
+  * @param config      framework configuration that supplies the operator table, trait
+  *                    definitions, parser configuration, convertlet table and default schema
+  * @param planner     the shared planner that new clusters are created for
+  * @param typeFactory type factory handed to validators, catalog readers and Rex builders
+  */
+class FlinkPlannerImpl(
+    config: FrameworkConfig,
+    planner: RelOptPlanner,
+    typeFactory: FlinkTypeFactory) {
+
+  val operatorTable: SqlOperatorTable = config.getOperatorTable
+  /** Holds the trait definitions to be registered with planner. May be null. */
+  val traitDefs: ImmutableList[RelTraitDef[_ <: RelTrait]] = config.getTraitDefs
+  val parserConfig: SqlParser.Config = config.getParserConfig
+  val convertletTable: SqlRexConvertletTable = config.getConvertletTable
+  val defaultSchema: SchemaPlus = config.getDefaultSchema
+
+  /** Validator created by the most recent call to [[validate()]]; unset before that. */
+  var validator: FlinkCalciteSqlValidator = _
+  /** Result of the most recent call to [[validate()]]; unset before that. */
+  var validatedSqlNode: SqlNode = _
+  /** Result of the most recent call to [[rel()]]; unset before that. */
+  var root: RelRoot = _
+
+  /**
+    * Replaces the planner's trait definitions with the configured ones.
+    * Does nothing if the configuration carries no trait definitions.
+    */
+  private def ready(): Unit = {
+    if (this.traitDefs != null) {
+      planner.clearRelTraitDefs()
+      for (traitDef <- this.traitDefs) {
+        planner.addRelTraitDef(traitDef)
+      }
+    }
+  }
+
+  /**
+    * Parses a SQL statement with the configured parser settings.
+    *
+    * @param sql the statement text
+    * @return the parsed statement
+    * @throws SqlParserException if Calcite's parser rejects the statement
+    */
+  def parse(sql: String): SqlNode = {
+    try {
+      ready()
+      val parser: SqlParser = SqlParser.create(sql, parserConfig)
+      val sqlNode: SqlNode = parser.parseStmt
+      sqlNode
+    } catch {
+      case e: CSqlParseException =>
+        throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
+    }
+  }
+
+  /**
+    * Validates a parsed statement with a freshly created validator. Both the validator
+    * and the validated node are kept in this instance for the following [[rel()]] call.
+    *
+    * @param sqlNode the parsed statement
+    * @return the validated statement
+    * @throws ValidationException if the validator fails with a RuntimeException
+    */
+  def validate(sqlNode: SqlNode): SqlNode = {
+    validator = new FlinkCalciteSqlValidator(operatorTable, createCatalogReader, typeFactory)
+    validator.setIdentifierExpansion(true)
+    try {
+      validatedSqlNode = validator.validate(sqlNode)
+    }
+    catch {
+      case e: RuntimeException =>
+        throw new ValidationException(s"SQL validation failed. ${e.getMessage}", e)
+    }
+    validatedSqlNode
+  }
+
+  /**
+    * Converts the statement validated by the last [[validate()]] call into a relational
+    * expression tree and decorrelates it.
+    *
+    * @param sql not read by this method; the node stored by [[validate()]] is converted
+    *            instead
+    * @return the root of the converted plan, also stored in [[root]]
+    * @throws TableException if the conversion fails with a RelConversionException
+    */
+  def rel(sql: SqlNode): RelRoot = {
+    try {
+      assert(validatedSqlNode != null)
+      val rexBuilder: RexBuilder = createRexBuilder
+      val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+      val config = SqlToRelConverter.configBuilder()
+        .withTrimUnusedFields(false).withConvertTableAccess(false).build()
+      val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
+        new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable, config)
+      root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true)
+      // we disable automatic flattening in order to let composite types pass without modification
+      // we might enable it again once Calcite has better support for structured types
+      // root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
+      root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
+      root
+    } catch {
+      // keep the original exception as cause, as parse() and validate() do
+      case e: RelConversionException => throw TableException(e.getMessage, e)
+    }
+  }
+
+  /** Implements [[org.apache.calcite.plan.RelOptTable.ViewExpander]]
+    * interface for [[org.apache.calcite.tools.Planner]]. */
+  class ViewExpanderImpl extends ViewExpander {
+
+    /**
+      * Parses, validates and converts the query text of a view.
+      *
+      * The intermediate results are held in local values only, so expanding a view does
+      * not overwrite the `validator`, `validatedSqlNode` or `root` of the enclosing planner.
+      *
+      * @param rowType     not read by this method
+      * @param queryString the SQL text of the view
+      * @param schemaPath  schema path the view's catalog reader is created with
+      * @param viewPath    not read by this method
+      * @return the converted, type-flattened and decorrelated plan of the view
+      * @throws SqlParserException if Calcite's parser rejects the view's query text
+      */
+    override def expandView(
+        rowType: RelDataType,
+        queryString: String,
+        schemaPath: util.List[String],
+        viewPath: util.List[String]): RelRoot = {
+
+      val parser: SqlParser = SqlParser.create(queryString, parserConfig)
+      val sqlNode: SqlNode =
+        try {
+          parser.parseQuery
+        }
+        catch {
+          case e: CSqlParseException =>
+            throw SqlParserException(s"SQL parse failed. ${e.getMessage}", e)
+        }
+      val catalogReader: CalciteCatalogReader = createCatalogReader.withSchemaPath(schemaPath)
+      val validator: SqlValidator =
+        new FlinkCalciteSqlValidator(operatorTable, catalogReader, typeFactory)
+      validator.setIdentifierExpansion(true)
+      val validatedSqlNode: SqlNode = validator.validate(sqlNode)
+      val rexBuilder: RexBuilder = createRexBuilder
+      val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder)
+      val config: SqlToRelConverter.Config = SqlToRelConverter.configBuilder
+        .withTrimUnusedFields(false).withConvertTableAccess(false).build
+      val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
+        new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable, config)
+      val converted: RelRoot = sqlToRelConverter.convertQuery(validatedSqlNode, true, false)
+      val flattened: RelRoot =
+        converted.withRel(sqlToRelConverter.flattenTypes(converted.rel, true))
+      flattened.withRel(RelDecorrelator.decorrelateQuery(flattened.rel))
+    }
+  }
+
+  /**
+    * Creates a catalog reader on the root of the default schema's hierarchy that uses
+    * the parser's case sensitivity and the path of the default schema.
+    */
+  private def createCatalogReader: CalciteCatalogReader = {
+    val rootSchema: SchemaPlus = FlinkPlannerImpl.rootSchema(defaultSchema)
+    new CalciteCatalogReader(
+      CalciteSchema.from(rootSchema),
+      parserConfig.caseSensitive,
+      CalciteSchema.from(defaultSchema).path(null),
+      typeFactory)
+  }
+
+  /** Creates a Rex builder backed by this planner's type factory. */
+  private def createRexBuilder: RexBuilder = {
+    new RexBuilder(typeFactory)
+  }
+
+}
+
+object FlinkPlannerImpl {
+  /** Walks up the parent chain of the given schema and returns the top-most schema. */
+  private def rootSchema(schema: SchemaPlus): SchemaPlus = {
+    if (schema.getParentSchema == null) {
+      schema
+    }
+    else {
+      rootSchema(schema.getParentSchema)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
new file mode 100644
index 0000000..8465ec6
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkRelBuilder.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import java.util.Collections
+
+import org.apache.calcite.plan.volcano.VolcanoPlanner
+import java.lang.Iterable
+
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.plan._
+import org.apache.calcite.prepare.CalciteCatalogReader
+import org.apache.calcite.rel.logical.LogicalAggregate
+import org.apache.calcite.rex.RexBuilder
+import org.apache.calcite.tools.RelBuilder.{AggCall, GroupKey}
+import org.apache.calcite.tools.{FrameworkConfig, RelBuilder}
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.expressions.WindowProperty
+import org.apache.flink.table.plan.logical.LogicalWindow
+import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
+
+/**
+  * A [[RelBuilder]] for Flink whose type factory is a [[FlinkTypeFactory]] and which can
+  * additionally build window aggregates.
+  *
+  * @param context       builder context, passed through to [[RelBuilder]]
+  * @param relOptCluster cluster the builder works on, passed through to [[RelBuilder]]
+  * @param relOptSchema  schema used by the builder, passed through to [[RelBuilder]]
+  */
+class FlinkRelBuilder(
+    context: Context,
+    relOptCluster: RelOptCluster,
+    relOptSchema: RelOptSchema)
+  extends RelBuilder(
+    context,
+    relOptCluster,
+    relOptSchema) {
+
+  /** The planner of the builder's cluster. */
+  def getPlanner: RelOptPlanner = cluster.getPlanner
+
+  /** The cluster this builder was constructed with. */
+  def getCluster: RelOptCluster = relOptCluster
+
+  /** The builder's type factory, cast to [[FlinkTypeFactory]]. */
+  override def getTypeFactory: FlinkTypeFactory = {
+    val factory = super.getTypeFactory
+    factory.asInstanceOf[FlinkTypeFactory]
+  }
+
+  /**
+    * Builds a window aggregate: a regular aggregate is built first, taken back off the
+    * builder and wrapped into a [[LogicalWindowAggregate]] that is pushed instead.
+    *
+    * @param window          the window to aggregate over
+    * @param groupKey        the grouping key of the aggregate
+    * @param namedProperties the named window properties handed to the window aggregate
+    * @param aggCalls        the aggregate calls
+    * @return this builder
+    */
+  def aggregate(
+      window: LogicalWindow,
+      groupKey: GroupKey,
+      namedProperties: Seq[NamedWindowProperty],
+      aggCalls: Iterable[AggCall])
+    : RelBuilder = {
+    // let the plain RelBuilder create the aggregate and take it back off the builder
+    val builderWithAggregate = super.aggregate(groupKey, aggCalls)
+    val plainAggregate = builderWithAggregate.build().asInstanceOf[LogicalAggregate]
+
+    // wrap it into the window aggregate and push that instead
+    val windowAggregate = LogicalWindowAggregate.create(window, namedProperties, plainAggregate)
+    push(windowAggregate)
+    this
+  }
+
+}
+
+object FlinkRelBuilder {
+
+  /**
+    * Creates a builder with its own [[VolcanoPlanner]] and a [[FlinkTypeFactory]] that is
+    * based on the type system of the given configuration.
+    *
+    * @param config the framework configuration to read the settings from
+    * @return a new builder
+    */
+  def create(config: FrameworkConfig): FlinkRelBuilder = {
+
+    // the Flink type factory is shared by the Rex builder and the catalog reader
+    val flinkTypeFactory = new FlinkTypeFactory(config.getTypeSystem)
+
+    // planner and cluster
+    val volcanoPlanner = new VolcanoPlanner(config.getCostFactory, Contexts.empty())
+    volcanoPlanner.setExecutor(config.getExecutor)
+    volcanoPlanner.addRelTraitDef(ConventionTraitDef.INSTANCE)
+    val optCluster = RelOptCluster.create(volcanoPlanner, new RexBuilder(flinkTypeFactory))
+
+    // catalog reader on the default schema
+    val catalogReader = new CalciteCatalogReader(
+      CalciteSchema.from(config.getDefaultSchema),
+      config.getParserConfig.caseSensitive(),
+      Collections.emptyList(),
+      flinkTypeFactory)
+
+    new FlinkRelBuilder(config.getContext, optCluster, catalogReader)
+  }
+
+  /**
+    * Information necessary to create a window aggregate.
+    *
+    * Similar to [[RelBuilder.AggCall]] or [[RelBuilder.GroupKey]].
+    */
+  case class NamedWindowProperty(name: String, property: WindowProperty)
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
new file mode 100644
index 0000000..f3e2f91
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.calcite.avatica.util.TimeUnit
+import org.apache.calcite.jdbc.JavaTypeFactoryImpl
+import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeSystem}
+import org.apache.calcite.sql.SqlIntervalQualifier
+import org.apache.calcite.sql.`type`.SqlTypeName
+import org.apache.calcite.sql.`type`.SqlTypeName._
+import org.apache.calcite.sql.parser.SqlParserPos
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.{NothingTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.ValueTypeInfo._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.schema.{CompositeRelDataType, GenericRelDataType}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.table.typeutils.TypeCheckUtils.isSimple
+import org.apache.flink.table.plan.schema.ArrayRelDataType
+import org.apache.flink.table.calcite.FlinkTypeFactory.typeInfoToSqlTypeName
+
+import scala.collection.mutable
+
+/**
+  * Flink specific type factory that represents the interface between Flink's [[TypeInformation]]
+  * and Calcite's [[RelDataType]].
+  *
+  * Simple types are translated to SQL types; all other types are wrapped into
+  * [[RelDataType]]s that keep the original [[TypeInformation]] and are cached per instance.
+  *
+  * @param typeSystem the type system handed to [[JavaTypeFactoryImpl]]
+  */
+class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl(typeSystem) {
+
+  // NOTE: for future data types it might be necessary to
+  // override more methods of RelDataTypeFactoryImpl
+
+  // cache of the advanced (non-simple) types created so far, keyed by their type information
+  // NOTE(review): a plain mutable.HashMap, i.e. no synchronization in this class -
+  // confirm that a factory instance is not shared between threads
+  private val seenTypes = mutable.HashMap[TypeInformation[_], RelDataType]()
+
+  /**
+    * Translates Flink's [[TypeInformation]] into a Calcite [[RelDataType]].
+    *
+    * @param typeInfo the type information to translate
+    * @return a SQL type for simple types, otherwise the cached (or newly created and
+    *         canonized) advanced type for the given type information
+    */
+  def createTypeFromTypeInfo(typeInfo: TypeInformation[_]): RelDataType = {
+    // simple type can be converted to SQL types and vice versa
+    if (isSimple(typeInfo)) {
+      val sqlType = typeInfoToSqlTypeName(typeInfo)
+      sqlType match {
+
+        // interval types are created from an interval qualifier
+        case INTERVAL_YEAR_MONTH =>
+          createSqlIntervalType(
+            new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO))
+
+        case INTERVAL_DAY_SECOND =>
+          createSqlIntervalType(
+            new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO))
+
+        case _ =>
+          createSqlType(sqlType)
+      }
+    }
+    // advanced types require specific RelDataType
+    // for storing the original TypeInformation
+    else {
+      seenTypes.getOrElseUpdate(typeInfo, canonize(createAdvancedType(typeInfo)))
+    }
+  }
+
+  /**
+    * Creates a struct type with the input fieldNames and input fieldTypes using FlinkTypeFactory
+    *
+    * Names and types are paired by position; every field is added to the builder and
+    * then marked nullable.
+    *
+    * @param fieldNames field names
+    * @param fieldTypes field types, every element is Flink's [[TypeInformation]]
+    * @return a struct type with the input fieldNames and input fieldTypes
+    */
+  def buildRowDataType(
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]])
+    : RelDataType = {
+    val rowDataTypeBuilder = builder
+    fieldNames
+      .zip(fieldTypes)
+      .foreach { f =>
+        rowDataTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2)).nullable(true)
+      }
+    rowDataTypeBuilder.build
+  }
+
+  /**
+    * Creates a SQL type with the given precision, replacing a negative VARCHAR precision
+    * by the type system's default precision for VARCHAR.
+    */
+  override def createSqlType(typeName: SqlTypeName, precision: Int): RelDataType = {
+    // it might happen that inferred VARCHAR types overflow as we set them to Int.MaxValue
+    // always set those to default value
+    if (typeName == VARCHAR && precision < 0) {
+      createSqlType(typeName, getTypeSystem.getDefaultPrecision(typeName))
+    } else {
+      super.createSqlType(typeName, precision)
+    }
+  }
+
+  /**
+    * Creates an array type whose type information is the object array of the element type.
+    * The maxCardinality argument is not read.
+    */
+  override def createArrayType(elementType: RelDataType, maxCardinality: Long): RelDataType =
+    new ArrayRelDataType(
+      ObjectArrayTypeInfo.getInfoFor(FlinkTypeFactory.toTypeInfo(elementType)),
+      elementType,
+      true)
+
+  /**
+    * Wraps a non-simple type into the [[RelDataType]] that keeps its type information:
+    * composite types, primitive arrays and object arrays get their dedicated wrappers,
+    * everything else becomes a [[GenericRelDataType]].
+    */
+  private def createAdvancedType(typeInfo: TypeInformation[_]): RelDataType = typeInfo match {
+    case ct: CompositeType[_] =>
+      new CompositeRelDataType(ct, this)
+
+    case pa: PrimitiveArrayTypeInfo[_] =>
+      new ArrayRelDataType(pa, createTypeFromTypeInfo(pa.getComponentType), false)
+
+    case oa: ObjectArrayTypeInfo[_, _] =>
+      new ArrayRelDataType(oa, createTypeFromTypeInfo(oa.getComponentInfo), true)
+
+    // the cast requires that this factory was created with a FlinkTypeSystem
+    case ti: TypeInformation[_] =>
+      new GenericRelDataType(typeInfo, getTypeSystem.asInstanceOf[FlinkTypeSystem])
+
+    // the case above matches every non-null value, so only null ends up here
+    case ti@_ =>
+      throw TableException(s"Unsupported type information: $ti")
+  }
+
+  /**
+    * Changes the nullability of a type; composite types are returned unchanged.
+    */
+  override def createTypeWithNullability(
+      relDataType: RelDataType,
+      nullable: Boolean)
+    : RelDataType = relDataType match {
+    case composite: CompositeRelDataType =>
+      // at the moment we do not care about nullability
+      composite
+    case _ =>
+      super.createTypeWithNullability(relDataType, nullable)
+  }
+}
+
+object FlinkTypeFactory {
+
+  /**
+    * Maps the type information of a simple type to its SQL type name.
+    *
+    * @throws TableException for character types and for any type without a mapping
+    */
+  private def typeInfoToSqlTypeName(typeInfo: TypeInformation[_]): SqlTypeName = typeInfo match {
+      case BOOLEAN_TYPE_INFO => BOOLEAN
+      case BYTE_TYPE_INFO => TINYINT
+      case SHORT_TYPE_INFO => SMALLINT
+      case INT_TYPE_INFO => INTEGER
+      case LONG_TYPE_INFO => BIGINT
+      case FLOAT_TYPE_INFO => FLOAT
+      case DOUBLE_TYPE_INFO => DOUBLE
+      case STRING_TYPE_INFO => VARCHAR
+      case BIG_DEC_TYPE_INFO => DECIMAL
+
+      // temporal types
+      case SqlTimeTypeInfo.DATE => DATE
+      case SqlTimeTypeInfo.TIME => TIME
+      case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP
+      case TimeIntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH
+      case TimeIntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND
+
+      case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO =>
+        throw TableException("Character type is not supported.")
+
+      case _@t =>
+        throw TableException(s"Type is not supported: $t")
+  }
+
+  /**
+    * Translates a Calcite [[RelDataType]] back into Flink's [[TypeInformation]].
+    *
+    * The order of the cases matters: the guarded ROW case has to come before the
+    * generic `ROW | CURSOR` placeholder case.
+    *
+    * @param relDataType the type to translate
+    * @return the corresponding type information
+    * @throws TableException for the NULL type and for any type without a mapping
+    */
+  def toTypeInfo(relDataType: RelDataType): TypeInformation[_] = relDataType.getSqlTypeName match {
+    case BOOLEAN => BOOLEAN_TYPE_INFO
+    case TINYINT => BYTE_TYPE_INFO
+    case SMALLINT => SHORT_TYPE_INFO
+    case INTEGER => INT_TYPE_INFO
+    case BIGINT => LONG_TYPE_INFO
+    case FLOAT => FLOAT_TYPE_INFO
+    case DOUBLE => DOUBLE_TYPE_INFO
+    case VARCHAR | CHAR => STRING_TYPE_INFO
+    case DECIMAL => BIG_DEC_TYPE_INFO
+
+    // temporal types
+    case DATE => SqlTimeTypeInfo.DATE
+    case TIME => SqlTimeTypeInfo.TIME
+    case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP
+    case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MONTHS
+    case typeName if DAY_INTERVAL_TYPES.contains(typeName) => TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+    case NULL =>
+      throw TableException("Type NULL is not supported. Null values must have a supported type.")
+
+    // symbol for special flags e.g. TRIM's BOTH, LEADING, TRAILING
+    // are represented as integer
+    case SYMBOL => INT_TYPE_INFO
+
+    // extract encapsulated TypeInformation
+    case ANY if relDataType.isInstanceOf[GenericRelDataType] =>
+      val genericRelDataType = relDataType.asInstanceOf[GenericRelDataType]
+      genericRelDataType.typeInfo
+
+    case ROW if relDataType.isInstanceOf[CompositeRelDataType] =>
+      val compositeRelDataType = relDataType.asInstanceOf[CompositeRelDataType]
+      compositeRelDataType.compositeType
+
+    // ROW and CURSOR for UDTF case, whose type info will never be used, just a placeholder
+    case ROW | CURSOR => new NothingTypeInfo
+
+    case ARRAY if relDataType.isInstanceOf[ArrayRelDataType] =>
+      val arrayRelDataType = relDataType.asInstanceOf[ArrayRelDataType]
+      arrayRelDataType.typeInfo
+
+    case _@t =>
+      throw TableException(s"Type is not supported: $t")
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala
new file mode 100644
index 0000000..5935297
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.calcite
+
+import org.apache.calcite.rel.`type`.RelDataTypeSystemImpl
+import org.apache.calcite.sql.`type`.SqlTypeName
+
+/**
+  * Custom type system for Flink.
+  *
+  * It raises the numeric limits and sets the default precision of VARCHAR and
+  * TIMESTAMP; every other setting comes from [[RelDataTypeSystemImpl]].
+  */
+class FlinkTypeSystem extends RelDataTypeSystemImpl {
+
+  /**
+    * Upper bound for numeric scale and precision. Int.MaxValue itself cannot be used
+    * because of an overflow in Calcite's type inference logic; half of it should be
+    * enough for all use cases.
+    */
+  private def halfOfIntRange: Int = Int.MaxValue / 2
+
+  override def getMaxNumericScale: Int = halfOfIntRange
+
+  override def getMaxNumericPrecision: Int = halfOfIntRange
+
+  /**
+    * Default precision of a type: the Java default length for VARCHAR, milliseconds
+    * for TIMESTAMP, the inherited default for all other types.
+    */
+  override def getDefaultPrecision(typeName: SqlTypeName): Int = {
+    if (typeName == SqlTypeName.VARCHAR) {
+      // by default all VARCHARs can have the Java default length
+      Int.MaxValue
+    } else if (typeName == SqlTypeName.TIMESTAMP) {
+      // we currently support only timestamps with milliseconds precision
+      3
+    } else {
+      super.getDefaultPrecision(typeName)
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala
new file mode 100644
index 0000000..1f2e9a9
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenException.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+/**
+  * Exception for all errors occurring during code generation.
+  *
+  * It is an unchecked exception that carries only a message.
+  *
+  * @param msg description of the problem, passed on to [[RuntimeException]]
+  */
+class CodeGenException(msg: String) extends RuntimeException(msg)
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
new file mode 100644
index 0000000..f8885a2
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import java.lang.reflect.{Field, Method}
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.calcite.util.BuiltInMethod
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
+import org.apache.flink.api.common.typeinfo.{FractionalTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, RowTypeInfo, TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.typeutils.{TimeIntervalTypeInfo, TypeCheckUtils}
+
+/**
+  * Helpers for the code generator: unique names, type terms and default values for
+  * [[TypeInformation]], conversions between time points and their internal
+  * representation, type requirements and field access.
+  */
+object CodeGenUtils {
+
+  // counter behind newName(); shared by all callers of this object
+  private val nameCounter = new AtomicInteger
+
+  /**
+    * Returns the given name followed by `$` and the next value of a global counter,
+    * e.g. `result$0`.
+    */
+  def newName(name: String): String = {
+    s"$name$$${nameCounter.getAndIncrement}"
+  }
+
+  // when casting we first need to unbox Primitives, for example,
+  // float a = 1.0f;
+  // byte b = (byte) a;
+  // works, but for boxed types we need this:
+  // Float a = 1.0f;
+  // Byte b = (byte)(float) a;
+  /**
+    * Returns the Java type term of the primitive representation of a type. Types without
+    * a primitive representation yield the canonical name of their type class.
+    */
+  def primitiveTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+    case INT_TYPE_INFO => "int"
+    case LONG_TYPE_INFO => "long"
+    case SHORT_TYPE_INFO => "short"
+    case BYTE_TYPE_INFO => "byte"
+    case FLOAT_TYPE_INFO => "float"
+    case DOUBLE_TYPE_INFO => "double"
+    case BOOLEAN_TYPE_INFO => "boolean"
+    case CHAR_TYPE_INFO => "char"
+
+    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
+    // does not seem to like this, so we manually give the correct type here.
+    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+    // internal primitive representation of time points
+    case SqlTimeTypeInfo.DATE => "int"
+    case SqlTimeTypeInfo.TIME => "int"
+    case SqlTimeTypeInfo.TIMESTAMP => "long"
+
+    // internal primitive representation of time intervals
+    case TimeIntervalTypeInfo.INTERVAL_MONTHS => "int"
+    case TimeIntervalTypeInfo.INTERVAL_MILLIS => "long"
+
+    case _ =>
+      tpe.getTypeClass.getCanonicalName
+  }
+
+  /**
+    * Returns the Java type term of the boxed representation of a type, i.e. the canonical
+    * name of its type class, except for primitive arrays.
+    */
+  def boxedTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+    // From PrimitiveArrayTypeInfo we would get class "int[]", scala reflections
+    // does not seem to like this, so we manually give the correct type here.
+    case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+    case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+    case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+    case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+    case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+    case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+    case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+    case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+    case _ =>
+      tpe.getTypeClass.getCanonicalName
+  }
+
+  /**
+    * Returns the literal the generated code uses as default value of a type;
+    * `null` for every type that is not listed.
+    */
+  def primitiveDefaultValue(tpe: TypeInformation[_]): String = tpe match {
+    case INT_TYPE_INFO => "-1"
+    case LONG_TYPE_INFO => "-1L"
+    case SHORT_TYPE_INFO => "-1"
+    case BYTE_TYPE_INFO => "-1"
+    case FLOAT_TYPE_INFO => "-1.0f"
+    case DOUBLE_TYPE_INFO => "-1.0d"
+    case BOOLEAN_TYPE_INFO => "false"
+    case STRING_TYPE_INFO => "\"\""
+    case CHAR_TYPE_INFO => "'\\0'"
+    case SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIME => "-1"
+    case SqlTimeTypeInfo.TIMESTAMP => "-1L"
+    case TimeIntervalTypeInfo.INTERVAL_MONTHS => "-1"
+    case TimeIntervalTypeInfo.INTERVAL_MILLIS => "-1L"
+
+    case _ => "null"
+  }
+
+  /** Returns `double` for fractional types and `long` for all other types. */
+  def superPrimitive(typeInfo: TypeInformation[_]): String = typeInfo match {
+    case _: FractionalTypeInfo[_] => "double"
+    case _ => "long"
+  }
+
+  /** Returns the method name qualified with the canonical name of its declaring class. */
+  def qualifyMethod(method: Method): String =
+    method.getDeclaringClass.getCanonicalName + "." + method.getName
+
+  /**
+    * Returns the constant name qualified with the canonical name of its enum type.
+    *
+    * The enum type is taken from `getDeclaringClass`: for constants with their own class
+    * body `getClass` is an anonymous subclass that has no canonical name.
+    */
+  def qualifyEnum(enum: Enum[_]): String =
+    enum.getDeclaringClass.getCanonicalName + "." + enum.name()
+
+  /**
+    * Generates the call that converts the internal representation of a time point into
+    * its Java object.
+    *
+    * @param resultType DATE, TIME or TIMESTAMP of [[SqlTimeTypeInfo]]
+    * @param resultTerm the term that holds the internal value
+    * @throws CodeGenException if resultType is not one of the three time point types
+    */
+  def internalToTimePointCode(resultType: TypeInformation[_], resultTerm: String): String =
+    resultType match {
+      case SqlTimeTypeInfo.DATE =>
+        s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_DATE.method)}($resultTerm)"
+      case SqlTimeTypeInfo.TIME =>
+        s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_TIME.method)}($resultTerm)"
+      case SqlTimeTypeInfo.TIMESTAMP =>
+        s"${qualifyMethod(BuiltInMethod.INTERNAL_TO_TIMESTAMP.method)}($resultTerm)"
+      // fail with a code generation error instead of a bare MatchError
+      case _ =>
+        throw new CodeGenException(s"Time point type expected, but was '$resultType'.")
+    }
+
+  /**
+    * Generates the call that converts the Java object of a time point into its internal
+    * representation.
+    *
+    * @param resultType DATE, TIME or TIMESTAMP of [[SqlTimeTypeInfo]]
+    * @param resultTerm the term that holds the Java object
+    * @throws CodeGenException if resultType is not one of the three time point types
+    */
+  def timePointToInternalCode(resultType: TypeInformation[_], resultTerm: String): String =
+    resultType match {
+      case SqlTimeTypeInfo.DATE =>
+        s"${qualifyMethod(BuiltInMethod.DATE_TO_INT.method)}($resultTerm)"
+      case SqlTimeTypeInfo.TIME =>
+        s"${qualifyMethod(BuiltInMethod.TIME_TO_INT.method)}($resultTerm)"
+      case SqlTimeTypeInfo.TIMESTAMP =>
+        s"${qualifyMethod(BuiltInMethod.TIMESTAMP_TO_LONG.method)}($resultTerm)"
+      // fail with a code generation error instead of a bare MatchError
+      case _ =>
+        throw new CodeGenException(s"Time point type expected, but was '$resultType'.")
+    }
+
+  /** Checks whether the term is the qualified name of the given enum constant. */
+  def compareEnum(term: String, enum: Enum[_]): Boolean = term == qualifyEnum(enum)
+
+  /**
+    * Looks up the enum constant an expression refers to: the part of the result term
+    * after the last dot is resolved in the type class of the result type.
+    */
+  def getEnum(genExpr: GeneratedExpression): Enum[_] = {
+    val split = genExpr.resultTerm.split('.')
+    val value = split.last
+    val clazz = genExpr.resultType.getTypeClass
+    enumValueOf(clazz, value)
+  }
+
+  /** Returns the constant with the given name of the given enum class. */
+  def enumValueOf[T <: Enum[T]](cls: Class[_], stringValue: String): Enum[_] =
+    Enum.valueOf(cls.asInstanceOf[Class[T]], stringValue).asInstanceOf[Enum[_]]
+
+  // ----------------------------------------------------------------------------------------------
+
+  // The require* methods throw a CodeGenException if the result type of the given
+  // expression does not pass the corresponding check of TypeCheckUtils.
+
+  def requireNumeric(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isNumeric(genExpr.resultType)) {
+      throw new CodeGenException("Numeric expression type expected, but was " +
+        s"'${genExpr.resultType}'.")
+    }
+
+  def requireComparable(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isComparable(genExpr.resultType)) {
+      throw new CodeGenException(s"Comparable type expected, but was '${genExpr.resultType}'.")
+    }
+
+  def requireString(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isString(genExpr.resultType)) {
+      throw new CodeGenException("String expression type expected.")
+    }
+
+  def requireBoolean(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isBoolean(genExpr.resultType)) {
+      throw new CodeGenException("Boolean expression type expected.")
+    }
+
+  def requireTemporal(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isTemporal(genExpr.resultType)) {
+      throw new CodeGenException("Temporal expression type expected.")
+    }
+
+  def requireTimeInterval(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isTimeInterval(genExpr.resultType)) {
+      throw new CodeGenException("Interval expression type expected.")
+    }
+
+  def requireArray(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isArray(genExpr.resultType)) {
+      throw new CodeGenException("Array expression type expected.")
+    }
+
+  def requireInteger(genExpr: GeneratedExpression): Unit =
+    if (!TypeCheckUtils.isInteger(genExpr.resultType)) {
+      throw new CodeGenException("Integer expression type expected.")
+    }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /** Checks whether the result type of the expression is a reference type. */
+  def isReference(genExpr: GeneratedExpression): Boolean = isReference(genExpr.resultType)
+
+  /** Every type except the eight basic primitive types counts as a reference type. */
+  def isReference(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+    case INT_TYPE_INFO
+         | LONG_TYPE_INFO
+         | SHORT_TYPE_INFO
+         | BYTE_TYPE_INFO
+         | FLOAT_TYPE_INFO
+         | DOUBLE_TYPE_INFO
+         | BOOLEAN_TYPE_INFO
+         | CHAR_TYPE_INFO => false
+    case _ => true
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /** The ways a field of a composite type is accessed, see fieldAccessorFor(). */
+  sealed abstract class FieldAccessor
+
+  case class ObjectFieldAccessor(field: Field) extends FieldAccessor
+
+  case class ObjectGenericFieldAccessor(name: String) extends FieldAccessor
+
+  case class ObjectPrivateFieldAccessor(field: Field) extends FieldAccessor
+
+  case class ObjectMethodAccessor(methodName: String) extends FieldAccessor
+
+  case class ProductAccessor(i: Int) extends FieldAccessor
+
+  /**
+    * Selects the accessor for the field at the given position of a composite type.
+    *
+    * @throws CodeGenException for composite types other than rows, case classes,
+    *                          Java tuples and POJOs
+    */
+  def fieldAccessorFor(compType: CompositeType[_], index: Int): FieldAccessor = {
+    compType match {
+      case ri: RowTypeInfo =>
+        ProductAccessor(index)
+
+      case cc: CaseClassTypeInfo[_] =>
+        ObjectMethodAccessor(cc.getFieldNames()(index))
+
+      case javaTup: TupleTypeInfo[_] =>
+        ObjectGenericFieldAccessor("f" + index)
+
+      case pt: PojoTypeInfo[_] =>
+        val fieldName = pt.getFieldNames()(index)
+        getFieldAccessor(pt.getTypeClass, fieldName)
+
+      case _ => throw new CodeGenException("Unsupported composite type.")
+    }
+  }
+
+  /**
+    * Selects the accessor for a named field of a class: a direct accessor if the field
+    * reports itself as accessible, a private-field accessor otherwise.
+    */
+  def getFieldAccessor(clazz: Class[_], fieldName: String): FieldAccessor = {
+    val field = TypeExtractor.getDeclaredField(clazz, fieldName)
+    // NOTE(review): isAccessible reads the reflective "accessible" flag of the Field
+    // object, not the visibility modifier of the field - confirm that
+    // TypeExtractor.getDeclaredField sets the flag for public fields, otherwise the
+    // private-field accessor is chosen for them as well
+    if (field.isAccessible) {
+      ObjectFieldAccessor(field)
+    }
+    else {
+      ObjectPrivateFieldAccessor(field)
+    }
+  }
+
+  /** Checks whether the type of the field is a Java primitive type. */
+  def isFieldPrimitive(field: Field): Boolean = field.getType.isPrimitive
+
+  /**
+    * Generates the reflective read of a field: the typed getter for primitive fields,
+    * `get` followed by a cast to the field type otherwise.
+    *
+    * @param fieldTerm  term of the reflection field in the generated code
+    * @param field      the field to read
+    * @param objectTerm term of the object the field is read from
+    */
+  def reflectiveFieldReadAccess(fieldTerm: String, field: Field, objectTerm: String): String =
+    field.getType match {
+      case java.lang.Integer.TYPE => s"$fieldTerm.getInt($objectTerm)"
+      case java.lang.Long.TYPE => s"$fieldTerm.getLong($objectTerm)"
+      case java.lang.Short.TYPE => s"$fieldTerm.getShort($objectTerm)"
+      case java.lang.Byte.TYPE => s"$fieldTerm.getByte($objectTerm)"
+      case java.lang.Float.TYPE => s"$fieldTerm.getFloat($objectTerm)"
+      case java.lang.Double.TYPE => s"$fieldTerm.getDouble($objectTerm)"
+      case java.lang.Boolean.TYPE => s"$fieldTerm.getBoolean($objectTerm)"
+      case java.lang.Character.TYPE => s"$fieldTerm.getChar($objectTerm)"
+      case _ => s"(${field.getType.getCanonicalName}) $fieldTerm.get($objectTerm)"
+    }
+
+  /**
+    * Generates the reflective write of a field: the typed setter for primitive fields,
+    * `set` otherwise.
+    *
+    * @param fieldTerm  term of the reflection field in the generated code
+    * @param field      the field to write
+    * @param objectTerm term of the object the field is written to
+    * @param valueTerm  term of the value to write
+    */
+  def reflectiveFieldWriteAccess(
+      fieldTerm: String,
+      field: Field,
+      objectTerm: String,
+      valueTerm: String)
+    : String =
+    field.getType match {
+      case java.lang.Integer.TYPE => s"$fieldTerm.setInt($objectTerm, $valueTerm)"
+      case java.lang.Long.TYPE => s"$fieldTerm.setLong($objectTerm, $valueTerm)"
+      case java.lang.Short.TYPE => s"$fieldTerm.setShort($objectTerm, $valueTerm)"
+      case java.lang.Byte.TYPE => s"$fieldTerm.setByte($objectTerm, $valueTerm)"
+      case java.lang.Float.TYPE => s"$fieldTerm.setFloat($objectTerm, $valueTerm)"
+      case java.lang.Double.TYPE => s"$fieldTerm.setDouble($objectTerm, $valueTerm)"
+      case java.lang.Boolean.TYPE => s"$fieldTerm.setBoolean($objectTerm, $valueTerm)"
+      case java.lang.Character.TYPE => s"$fieldTerm.setChar($objectTerm, $valueTerm)"
+      case _ => s"$fieldTerm.set($objectTerm, $valueTerm)"
+    }
+}