http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala new file mode 100644 index 0000000..8e95c93 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowInputMapRunner.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.slf4j.LoggerFactory + +/** + * MapRunner with [[CRow]] input. + */ +class CRowInputMapRunner[OUT]( + name: String, + code: String, + @transient returnType: TypeInformation[OUT]) + extends RichMapFunction[CRow, OUT] + with ResultTypeQueryable[OUT] + with Compiler[MapFunction[Row, OUT]] { + + val LOG = LoggerFactory.getLogger(this.getClass) + + private var function: MapFunction[Row, OUT] = _ + + override def open(parameters: Configuration): Unit = { + LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) + LOG.debug("Instantiating MapFunction.") + function = clazz.newInstance() + } + + override def map(in: CRow): OUT = { + function.map(in.row) + } + + override def getProducedType: TypeInformation[OUT] = returnType +}
http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala new file mode 100644 index 0000000..966dea9 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowOutputMapRunner.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.slf4j.LoggerFactory + +/** + * MapRunner with [[CRow]] output. + */ +class CRowOutputMapRunner( + name: String, + code: String, + @transient returnType: TypeInformation[CRow]) + extends RichMapFunction[Any, CRow] + with ResultTypeQueryable[CRow] + with Compiler[MapFunction[Any, Row]] { + + val LOG = LoggerFactory.getLogger(this.getClass) + + private var function: MapFunction[Any, Row] = _ + private var outCRow: CRow = _ + + override def open(parameters: Configuration): Unit = { + LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) + LOG.debug("Instantiating MapFunction.") + function = clazz.newInstance() + outCRow = new CRow(null, true) + } + + override def map(in: Any): CRow = { + outCRow.row = function.map(in) + outCRow + } + + override def getProducedType: TypeInformation[CRow] = returnType +} http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala new file mode 100644 index 0000000..b2b062e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowWrappingCollector.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime + +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * The collector is used to wrap a [[Row]] to a [[CRow]] + */ +class CRowWrappingCollector() extends Collector[Row] { + + var out: Collector[CRow] = _ + val outCRow: CRow = new CRow() + + def setChange(change: Boolean): Unit = this.outCRow.change = change + + override def collect(record: Row): Unit = { + outCRow.row = record + out.collect(outCRow) + } + + override def close(): Unit = out.close() +} http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala index b446306..2e37baf 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/FlatMapRunner.scala @@ -24,20 +24,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ResultTypeQueryable import org.apache.flink.table.codegen.Compiler import org.apache.flink.configuration.Configuration +import org.apache.flink.types.Row import org.apache.flink.util.Collector import org.slf4j.LoggerFactory -class FlatMapRunner[IN, OUT]( +class FlatMapRunner( name: String, code: String, - @transient returnType: TypeInformation[OUT]) - extends RichFlatMapFunction[IN, OUT] - with ResultTypeQueryable[OUT] - with Compiler[FlatMapFunction[IN, OUT]] { + @transient returnType: TypeInformation[Row]) + extends RichFlatMapFunction[Row, Row] + with ResultTypeQueryable[Row] + with Compiler[FlatMapFunction[Row, Row]] { val LOG = LoggerFactory.getLogger(this.getClass) - private var function: FlatMapFunction[IN, OUT] = _ + private var function: FlatMapFunction[Row, Row] = _ override def open(parameters: Configuration): Unit = { LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code") @@ -48,10 +49,10 @@ class FlatMapRunner[IN, OUT]( FunctionUtils.openFunction(function, parameters) } - override def flatMap(in: IN, out: Collector[OUT]): Unit = + override def flatMap(in: Row, out: Collector[Row]): Unit = function.flatMap(in, out) - override def getProducedType: TypeInformation[OUT] = returnType + override def getProducedType: TypeInformation[Row] = returnType override def close(): Unit = { FunctionUtils.closeFunction(function) http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala index 377e0ff..dd9c015 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.runtime.aggregate import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} +import org.apache.flink.table.runtime.types.CRow import org.apache.flink.types.Row import org.slf4j.LoggerFactory @@ -30,28 +31,28 @@ import org.slf4j.LoggerFactory * @param genAggregations Generated aggregate helper function */ class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction) - extends AggregateFunction[Row, Row, Row] with Compiler[GeneratedAggregations] { + extends AggregateFunction[CRow, Row, Row] with Compiler[GeneratedAggregations] { val LOG = LoggerFactory.getLogger(this.getClass) private var function: GeneratedAggregations = _ override def createAccumulator(): Row = { if (function == null) { - initFunction + initFunction() } function.createAccumulators() } - override def add(value: Row, accumulatorRow: Row): Unit = { + override def add(value: CRow, accumulatorRow: Row): Unit = { if (function == null) { - initFunction + initFunction() } - function.accumulate(accumulatorRow, value) + function.accumulate(accumulatorRow, value.row) } override def getResult(accumulatorRow: Row): Row = { if (function == null) { - initFunction + initFunction() } val output = function.createOutputRow() function.setAggregationResults(accumulatorRow, output) @@ -60,7 +61,7 @@ class AggregateAggFunction(genAggregations: GeneratedAggregationsFunction) override def merge(aAccumulatorRow: Row, bAccumulatorRow: Row): Row = { if (function == null) { - initFunction + initFunction() } function.mergeAccumulatorsPair(aAccumulatorRow, bAccumulatorRow) } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index 5e9efd0..768c9cb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -44,6 +44,7 @@ import org.apache.flink.table.functions.utils.AggSqlFunction import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ import org.apache.flink.table.functions.{AggregateFunction => TableAggregateFunction} import org.apache.flink.table.plan.logical._ +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.table.typeutils.TypeCheckUtils._ import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} import org.apache.flink.types.Row @@ -79,7 +80,7 @@ object AggregateUtil { isRowTimeType: Boolean, isPartitioned: Boolean, isRowsClause: Boolean) - : ProcessFunction[Row, Row] = { + : ProcessFunction[CRow, CRow] = { val (aggFields, aggregates) = transformToAggregateFunctions( @@ -116,13 +117,13 @@ object AggregateUtil { new RowTimeUnboundedRowsOver( genFunction, aggregationStateType, - inputTypeInfo) + CRowTypeInfo(inputTypeInfo)) } else { // RANGE unbounded over process function new RowTimeUnboundedRangeOver( genFunction, aggregationStateType, - inputTypeInfo) + CRowTypeInfo(inputTypeInfo)) } } else { if (isPartitioned) { @@ -153,13 +154,16 @@ object AggregateUtil { namedAggregates: Seq[CalcitePair[AggregateCall, String]], inputRowType: RelDataType, inputFieldTypes: Seq[TypeInformation[_]], - groupings: Array[Int]): ProcessFunction[Row, Row] = { + groupings: Array[Int], + generateRetraction: Boolean, + consumeRetraction: Boolean): ProcessFunction[CRow, CRow] = { val (aggFields, aggregates) = transformToAggregateFunctions( namedAggregates.map(_.getKey), inputRowType, - needRetraction = false) + consumeRetraction) + val aggMapping = aggregates.indices.map(_ + groupings.length).toArray val outputArity = groupings.length + aggregates.length @@ -178,14 +182,16 @@ object AggregateUtil { None, None, outputArity, - needRetract = false, + consumeRetraction, needMerge = false, needReset = false ) new GroupAggProcessFunction( genFunction, - aggregationStateType) + aggregationStateType, + generateRetraction) + } /** @@ -198,7 +204,7 @@ object AggregateUtil { * @param inputTypeInfo Physical type information of the row. * @param inputFieldTypeInfo Physical type information of the row's fields. * @param precedingOffset the preceding offset - * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause + * @param isRowsClause It is a tag that indicates whether the OVER clause is ROWS clause * @param isRowTimeType It is a tag that indicates whether the time type is rowTimeType * @return [[org.apache.flink.streaming.api.functions.ProcessFunction]] */ @@ -211,7 +217,7 @@ object AggregateUtil { precedingOffset: Long, isRowsClause: Boolean, isRowTimeType: Boolean) - : ProcessFunction[Row, Row] = { + : ProcessFunction[CRow, CRow] = { val needRetract = true val (aggFields, aggregates) = @@ -221,6 +227,7 @@ object AggregateUtil { needRetract) val aggregationStateType: RowTypeInfo = createAccumulatorRowType(aggregates) + val inputRowType = CRowTypeInfo(inputTypeInfo) val forwardMapping = (0 until inputType.getFieldCount).toArray val aggMapping = aggregates.indices.map(x => x + inputType.getFieldCount).toArray @@ -248,14 +255,14 @@ object AggregateUtil { new RowTimeBoundedRowsOver( genFunction, aggregationStateType, - inputTypeInfo, + inputRowType, precedingOffset ) } else { new RowTimeBoundedRangeOver( genFunction, aggregationStateType, - inputTypeInfo, + inputRowType, precedingOffset ) } @@ -265,13 +272,13 @@ object AggregateUtil { genFunction, precedingOffset, aggregationStateType, - inputTypeInfo) + inputRowType) } else { new ProcTimeBoundedRangeOver( genFunction, precedingOffset, aggregationStateType, - inputTypeInfo) + inputRowType) } } } @@ -932,7 +939,7 @@ object AggregateUtil { window: LogicalWindow, finalRowArity: Int, properties: Seq[NamedWindowProperty]) - : AllWindowFunction[Row, Row, DataStreamWindow] = { + : AllWindowFunction[Row, CRow, DataStreamWindow] = { if (isTimeWindow(window)) { val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) @@ -940,7 +947,7 @@ object AggregateUtil { startPos, endPos, finalRowArity) - .asInstanceOf[AllWindowFunction[Row, Row, DataStreamWindow]] + .asInstanceOf[AllWindowFunction[Row, CRow, DataStreamWindow]] } else { new IncrementalAggregateAllWindowFunction( finalRowArity) @@ -955,8 +962,8 @@ object AggregateUtil { numGroupingKeys: Int, numAggregates: Int, finalRowArity: Int, - properties: Seq[NamedWindowProperty]) - : WindowFunction[Row, Row, Tuple, DataStreamWindow] = { + properties: Seq[NamedWindowProperty]): + WindowFunction[Row, CRow, Tuple, DataStreamWindow] = { if (isTimeWindow(window)) { val (startPos, endPos) = computeWindowStartEndPropertyPos(properties) @@ -966,7 +973,7 @@ object AggregateUtil { startPos, endPos, finalRowArity) - .asInstanceOf[WindowFunction[Row, Row, Tuple, DataStreamWindow]] + .asInstanceOf[WindowFunction[Row, CRow, Tuple, DataStreamWindow]] } else { new IncrementalAggregateWindowFunction( numGroupingKeys, @@ -981,8 +988,9 @@ object AggregateUtil { inputType: RelDataType, inputFieldTypeInfo: Seq[TypeInformation[_]], outputType: RelDataType, + groupingKeys: Array[Int], needMerge: Boolean) - : (DataStreamAggFunction[Row, Row, Row], RowTypeInfo, RowTypeInfo) = { + : (DataStreamAggFunction[CRow, Row, Row], RowTypeInfo, RowTypeInfo) = { val needRetract = false val (aggFields, aggregates) = @@ -1002,7 +1010,7 @@ object AggregateUtil { aggFields, aggMapping, partialResults = false, - Array(), // no fields are forwarded + groupingKeys, None, None, outputArity, http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala index 95699a2..fabf200 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSessionWindowAggReduceGroupFunction.scala @@ -56,7 +56,7 @@ class DataSetSessionWindowAggReduceGroupFunction( extends RichGroupReduceFunction[Row, Row] with Compiler[GeneratedAggregations] { - private var collector: TimeWindowPropertyCollector = _ + private var collector: RowTimeWindowPropertyCollector = _ private val intermediateRowWindowStartPos = keysAndAggregatesArity private val intermediateRowWindowEndPos = keysAndAggregatesArity + 1 @@ -78,7 +78,7 @@ class DataSetSessionWindowAggReduceGroupFunction( output = function.createOutputRow() accumulators = function.createAccumulators() - collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos) + collector = new RowTimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala index a221c53..56ed08a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetSlideWindowAggReduceGroupFunction.scala @@ -47,7 +47,7 @@ class DataSetSlideWindowAggReduceGroupFunction( extends RichGroupReduceFunction[Row, Row] with Compiler[GeneratedAggregations] { - private var collector: TimeWindowPropertyCollector = _ + private var collector: RowTimeWindowPropertyCollector = _ protected val windowStartPos: Int = keysAndAggregatesArity private var output: Row = _ @@ -68,7 +68,7 @@ class DataSetSlideWindowAggReduceGroupFunction( output = function.createOutputRow() accumulators = function.createAccumulators() - collector = new TimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos) + collector = new RowTimeWindowPropertyCollector(finalRowWindowStartPos, finalRowWindowEndPos) } override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala index f4a1fc5..8af2c2e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetTumbleTimeWindowAggReduceGroupFunction.scala @@ -46,7 +46,7 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction( extends RichGroupReduceFunction[Row, Row] with Compiler[GeneratedAggregations] { - private var collector: TimeWindowPropertyCollector = _ + private var collector: RowTimeWindowPropertyCollector = _ protected var aggregateBuffer: Row = new Row(keysAndAggregatesArity + 1) private var output: Row = _ @@ -67,7 +67,7 @@ class DataSetTumbleTimeWindowAggReduceGroupFunction( output = function.createOutputRow() accumulators = function.createAccumulators() - collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos) + collector = new RowTimeWindowPropertyCollector(windowStartPos, windowEndPos) } override def reduce(records: Iterable[Row], out: Collector[Row]): Unit = { http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala index 81c900c..745f24d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala @@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.common.state.ValueState import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.slf4j.LoggerFactory +import org.apache.flink.table.runtime.types.CRow /** * Aggregate Function used for the groupby (without window) aggregate @@ -35,14 +36,17 @@ import org.slf4j.LoggerFactory */ class GroupAggProcessFunction( private val genAggregations: GeneratedAggregationsFunction, - private val aggregationStateType: RowTypeInfo) - extends ProcessFunction[Row, Row] + private val aggregationStateType: RowTypeInfo, + private val generateRetraction: Boolean) + extends ProcessFunction[CRow, CRow] with Compiler[GeneratedAggregations] { val LOG = LoggerFactory.getLogger(this.getClass) private var function: GeneratedAggregations = _ - private var output: Row = _ + private var newRow: CRow = _ + private var prevRow: CRow = _ + private var firstRow: Boolean = _ private var state: ValueState[Row] = _ override def open(config: Configuration) { @@ -54,7 +58,9 @@ class GroupAggProcessFunction( genAggregations.code) LOG.debug("Instantiating AggregateHelper.") function = clazz.newInstance() - output = function.createOutputRow() + + newRow = new CRow(function.createOutputRow(), true) + prevRow = new CRow(function.createOutputRow(), false) val stateDescriptor: ValueStateDescriptor[Row] = new ValueStateDescriptor[Row]("GroupAggregateState", aggregationStateType) @@ -62,29 +68,53 @@ class GroupAggProcessFunction( } override def processElement( - input: Row, - ctx: ProcessFunction[Row, Row]#Context, - out: Collector[Row]): Unit = { + inputC: CRow, + ctx: ProcessFunction[CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + + val input = inputC.row // get accumulators var accumulators = state.value() if (null == accumulators) { + firstRow = true accumulators = function.createAccumulators() + } else { + firstRow = false } // Set group keys value to the final output - function.setForwardedFields(input, output) + function.setForwardedFields(input, newRow.row) + function.setForwardedFields(input, prevRow.row) - // accumulate new input row - function.accumulate(accumulators, input) + // Set previous aggregate result to the prevRow + function.setAggregationResults(accumulators, prevRow.row) - // set aggregation results to output - function.setAggregationResults(accumulators, output) + // update aggregate result and set to the newRow + if (inputC.change) { + // accumulate input + function.accumulate(accumulators, input) + function.setAggregationResults(accumulators, newRow.row) + } else { + // retract input + function.retract(accumulators, input) + function.setAggregationResults(accumulators, newRow.row) + } // update accumulators state.update(accumulators) - out.collect(output) - } + // if previousRow is not null, do retraction process + if (generateRetraction && !firstRow) { + if (prevRow.row.equals(newRow.row)) { + // ignore same newRow + return + } else { + // retract previous row + out.collect(prevRow) + } + } + out.collect(newRow) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala index ec9b654..711cc05 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllTimeWindowFunction.scala @@ -22,6 +22,7 @@ import java.lang.Iterable import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.runtime.types.CRow import org.apache.flink.util.Collector /** @@ -39,17 +40,17 @@ class IncrementalAggregateAllTimeWindowFunction( extends IncrementalAggregateAllWindowFunction[TimeWindow]( finalRowArity) { - private var collector: TimeWindowPropertyCollector = _ + private var collector: CRowTimeWindowPropertyCollector = _ override def open(parameters: Configuration): Unit = { - collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos) + collector = new CRowTimeWindowPropertyCollector(windowStartPos, windowEndPos) super.open(parameters) } override def apply( window: TimeWindow, records: Iterable[Row], - out: Collector[Row]): Unit = { + out: Collector[CRow]): Unit = { // set collector and window collector.wrappedCollector = out http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala index f92be92..c190785 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateAllWindowFunction.scala @@ -23,6 +23,7 @@ import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.windowing.RichAllWindowFunction import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.table.runtime.types.CRow import org.apache.flink.util.Collector /** @@ -32,12 +33,12 @@ import org.apache.flink.util.Collector */ class IncrementalAggregateAllWindowFunction[W <: Window]( private val finalRowArity: Int) - extends RichAllWindowFunction[Row, Row, W] { + extends RichAllWindowFunction[Row, CRow, W] { - private var output: Row = _ + private var output: CRow = _ override def open(parameters: Configuration): Unit = { - output = new Row(finalRowArity) + output = new CRow(new Row(finalRowArity), true) } /** @@ -47,7 +48,7 @@ class IncrementalAggregateAllWindowFunction[W <: Window]( override def apply( window: W, records: Iterable[Row], - out: Collector[Row]): Unit = { + out: Collector[CRow]): Unit = { val iterator = records.iterator @@ -55,7 +56,7 @@ class IncrementalAggregateAllWindowFunction[W <: Window]( val record = iterator.next() var i = 0 while (i < record.getArity) { - output.setField(i, record.getField(i)) + output.row.setField(i, record.getField(i)) i += 1 } out.collect(output) http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala index dccb4f6..809bbfd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateTimeWindowFunction.scala @@ -23,6 +23,7 @@ import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.table.runtime.types.CRow import org.apache.flink.util.Collector /** @@ -43,10 +44,10 @@ class IncrementalAggregateTimeWindowFunction( numAggregates, finalRowArity) { - private var collector: TimeWindowPropertyCollector = _ + private var collector: CRowTimeWindowPropertyCollector = _ override def open(parameters: Configuration): Unit = { - collector = new TimeWindowPropertyCollector(windowStartPos, windowEndPos) + collector = new CRowTimeWindowPropertyCollector(windowStartPos, windowEndPos) super.open(parameters) } @@ -54,7 +55,7 @@ class IncrementalAggregateTimeWindowFunction( key: Tuple, window: TimeWindow, records: Iterable[Row], - out: Collector[Row]): Unit = { + out: Collector[CRow]): Unit = { // set collector and window collector.wrappedCollector = out http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala index 983efb3..7e9d738 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/IncrementalAggregateWindowFunction.scala @@ -24,6 +24,7 @@ import org.apache.flink.types.Row import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.table.runtime.types.CRow import org.apache.flink.util.Collector /** @@ -37,12 +38,12 @@ class IncrementalAggregateWindowFunction[W <: Window]( private val numGroupingKey: Int, private val numAggregates: Int, private val finalRowArity: Int) - extends RichWindowFunction[Row, Row, Tuple, W] { + extends RichWindowFunction[Row, CRow, Tuple, W] { - private var output: Row = _ + private var output: CRow = _ override def open(parameters: Configuration): Unit = { - output = new Row(finalRowArity) + output = new CRow(new Row(finalRowArity), true) } /** @@ -53,7 +54,7 @@ class IncrementalAggregateWindowFunction[W <: Window]( key: Tuple, window: W, records: Iterable[Row], - out: Collector[Row]): Unit = { + out: Collector[CRow]): Unit = { val iterator = records.iterator @@ -62,12 +63,12 @@ class IncrementalAggregateWindowFunction[W <: Window]( var i = 0 while (i < numGroupingKey) { - output.setField(i, key.getField(i)) + output.row.setField(i, key.getField(i)) i += 1 } i = 0 while (i < numAggregates) { - output.setField(numGroupingKey + i, record.getField(i)) + output.row.setField(numGroupingKey + i, record.getField(i)) i += 1 } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala index b63eb81..3fb506f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala @@ -31,7 +31,8 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo import java.util.{ArrayList, List => JList} import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler} +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.slf4j.LoggerFactory /** @@ -47,10 +48,10 @@ class ProcTimeBoundedRangeOver( genAggregations: GeneratedAggregationsFunction, precedingTimeBoundary: Long, aggregatesTypeInfo: RowTypeInfo, - inputType: TypeInformation[Row]) - extends ProcessFunction[Row, Row] + inputType: TypeInformation[CRow]) + extends ProcessFunction[CRow, CRow] with Compiler[GeneratedAggregations] { - private var output: Row = _ + private var output: CRow = _ private var accumulatorState: ValueState[Row] = _ private var rowMapState: MapState[Long, JList[Row]] = _ @@ -66,11 +67,12 @@ class ProcTimeBoundedRangeOver( genAggregations.code) LOG.debug("Instantiating AggregateHelper.") function = clazz.newInstance() - output = function.createOutputRow() + output = new CRow(function.createOutputRow(), true) // We keep the elements received in a MapState indexed based on their ingestion time val rowListTypeInfo: TypeInformation[JList[Row]] = - new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] + new ListTypeInfo[Row](inputType.asInstanceOf[CRowTypeInfo].rowType) + .asInstanceOf[TypeInformation[JList[Row]]] val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = new MapStateDescriptor[Long, JList[Row]]("rowmapstate", BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) @@ -82,9 +84,9 @@ class ProcTimeBoundedRangeOver( } override def processElement( - input: Row, - ctx: ProcessFunction[Row, Row]#Context, - out: Collector[Row]): Unit = { + input: CRow, + ctx: ProcessFunction[CRow, CRow]#Context, + out: Collector[CRow]): Unit = { val currentTime = ctx.timerService.currentProcessingTime // buffer the event incoming event @@ -97,15 +99,15 @@ class ProcTimeBoundedRangeOver( // register timer to process event once the current millisecond passed ctx.timerService.registerProcessingTimeTimer(currentTime + 1) } - rowList.add(input) + rowList.add(input.row) rowMapState.put(currentTime, rowList) } override def onTimer( timestamp: Long, - ctx: ProcessFunction[Row, Row]#OnTimerContext, - out: Collector[Row]): Unit = { + ctx: ProcessFunction[CRow, CRow]#OnTimerContext, + out: Collector[CRow]): Unit = { // we consider the original timestamp of events that have registered this time trigger 1 ms ago val currentTime = timestamp - 1 @@ -166,10 +168,10 @@ class ProcTimeBoundedRangeOver( val input = currentElements.get(iElemenets) // set the fields of the last event to carry on with the aggregates - function.setForwardedFields(input, output) + function.setForwardedFields(input, output.row) // add the accumulators values to result - function.setAggregationResults(accumulators, output) + function.setAggregationResults(accumulators, output.row) out.collect(output) iElemenets += 1 } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala index 31cfd73..0c7f44e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRowsOver.scala @@ -33,7 +33,8 @@ import org.apache.flink.api.java.typeutils.ListTypeInfo import java.util.{List => JList} import org.apache.flink.api.common.typeinfo.BasicTypeInfo -import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler} +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.slf4j.LoggerFactory /** @@ -48,15 +49,15 @@ class ProcTimeBoundedRowsOver( genAggregations: GeneratedAggregationsFunction, precedingOffset: Long, aggregatesTypeInfo: RowTypeInfo, - inputType: TypeInformation[Row]) - extends ProcessFunction[Row, Row] + inputType: TypeInformation[CRow]) + extends ProcessFunction[CRow, CRow] with Compiler[GeneratedAggregations] { Preconditions.checkArgument(precedingOffset > 0) private var accumulatorState: ValueState[Row] = _ private var rowMapState: MapState[Long, JList[Row]] = _ - private var output: Row = _ + private var output: CRow = _ private var counterState: ValueState[Long] = _ private var smallestTsState: ValueState[Long] = _ @@ -73,13 +74,14 @@ class ProcTimeBoundedRowsOver( LOG.debug("Instantiating AggregateHelper.") function = clazz.newInstance() - output = function.createOutputRow() + output = new CRow(function.createOutputRow(), true) // We keep the elements received in a Map state keyed // by the ingestion time in the operator. // we also keep counter of processed elements // and timestamp of oldest element val rowListTypeInfo: TypeInformation[JList[Row]] = - new ListTypeInfo[Row](inputType).asInstanceOf[TypeInformation[JList[Row]]] + new ListTypeInfo[Row](inputType.asInstanceOf[CRowTypeInfo].rowType) + .asInstanceOf[TypeInformation[JList[Row]]] val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = new MapStateDescriptor[Long, JList[Row]]("windowBufferMapState", @@ -100,9 +102,11 @@ class ProcTimeBoundedRowsOver( } override def processElement( - input: Row, - ctx: ProcessFunction[Row, Row]#Context, - out: Collector[Row]): Unit = { + inputC: CRow, + ctx: ProcessFunction[CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + + val input = inputC.row val currentTime = ctx.timerService.currentProcessingTime @@ -154,11 +158,11 @@ class ProcTimeBoundedRowsOver( } // copy forwarded fields in output row - function.setForwardedFields(input, output) + function.setForwardedFields(input, output.row) // accumulate current row and set aggregate in output row function.accumulate(accumulators, input) - function.setAggregationResults(accumulators, output) + function.setAggregationResults(accumulators, output.row) // update map state, accumulator state, counter and timestamp val currentTimeState = rowMapState.get(currentTime) http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala index 75209db..8a23132 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedNonPartitionedOver.scala @@ -23,9 +23,10 @@ import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext} import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.types.Row import org.apache.flink.util.Collector -import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler} +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} +import org.apache.flink.types.Row import org.slf4j.LoggerFactory /** @@ -37,12 +38,12 @@ import org.slf4j.LoggerFactory class ProcTimeUnboundedNonPartitionedOver( genAggregations: GeneratedAggregationsFunction, aggregationStateType: RowTypeInfo) - extends ProcessFunction[Row, Row] + extends ProcessFunction[CRow, CRow] with CheckpointedFunction with Compiler[GeneratedAggregations] { private var accumulators: Row = _ - private var output: Row = _ + private var output: CRow = _ private var state: ListState[Row] = _ val LOG = LoggerFactory.getLogger(this.getClass) @@ -58,7 +59,7 @@ class ProcTimeUnboundedNonPartitionedOver( LOG.debug("Instantiating AggregateHelper.") function = clazz.newInstance() - output = function.createOutputRow() + output = new CRow(function.createOutputRow(), true) if (null == accumulators) { val it = state.get().iterator() if (it.hasNext) { @@ -70,14 +71,16 @@ class ProcTimeUnboundedNonPartitionedOver( } override def processElement( - input: Row, - ctx: ProcessFunction[Row, Row]#Context, - out: Collector[Row]): Unit = { + inputC: CRow, + ctx: ProcessFunction[CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + + val input = inputC.row - function.setForwardedFields(input, output) + function.setForwardedFields(input, output.row) function.accumulate(accumulators, input) - function.setAggregationResults(accumulators, output) + function.setAggregationResults(accumulators, output.row) out.collect(output) } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala index 9baa6a3..847c1bf 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeUnboundedPartitionedOver.scala @@ -24,7 +24,8 @@ import org.apache.flink.util.Collector import org.apache.flink.api.common.state.ValueStateDescriptor import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.api.common.state.ValueState -import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler} +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} +import org.apache.flink.table.runtime.types.CRow import org.slf4j.LoggerFactory /** @@ -36,10 +37,10 @@ import org.slf4j.LoggerFactory class ProcTimeUnboundedPartitionedOver( genAggregations: GeneratedAggregationsFunction, aggregationStateType: RowTypeInfo) - extends ProcessFunction[Row, Row] + extends ProcessFunction[CRow, CRow] with Compiler[GeneratedAggregations] { - private var output: Row = _ + private var output: CRow = _ private var state: ValueState[Row] = _ val LOG = LoggerFactory.getLogger(this.getClass) private var function: GeneratedAggregations = _ @@ -54,16 +55,18 @@ class ProcTimeUnboundedPartitionedOver( LOG.debug("Instantiating AggregateHelper.") function = clazz.newInstance() - output = function.createOutputRow() + output = new CRow(function.createOutputRow(), true) val stateDescriptor: ValueStateDescriptor[Row] = new ValueStateDescriptor[Row]("overState", aggregationStateType) state = getRuntimeContext.getState(stateDescriptor) } override def processElement( - input: Row, - ctx: ProcessFunction[Row, Row]#Context, - out: Collector[Row]): Unit = { + inputC: CRow, + ctx: ProcessFunction[CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + + val input = inputC.row var accumulators = state.value() @@ -71,13 +74,12 @@ class ProcTimeUnboundedPartitionedOver( accumulators = function.createAccumulators() } - function.setForwardedFields(input, output) + function.setForwardedFields(input, output.row) function.accumulate(accumulators, input) - function.setAggregationResults(accumulators, output) + function.setAggregationResults(accumulators, output.row) state.update(accumulators) - out.collect(output) } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala index ef97e71..4020d44 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRangeOver.scala @@ -17,14 +17,15 @@ */ package org.apache.flink.table.runtime.aggregate -import java.util.{List => JList, ArrayList => JArrayList} +import java.util.{ArrayList => JArrayList, List => JList} import org.apache.flink.api.common.state._ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ListTypeInfo +import org.apache.flink.api.java.typeutils.{ListTypeInfo, RowTypeInfo} import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler} +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.apache.flink.types.Row import org.apache.flink.util.{Collector, Preconditions} import org.slf4j.LoggerFactory @@ -39,15 +40,15 @@ import org.slf4j.LoggerFactory */ class RowTimeBoundedRangeOver( genAggregations: GeneratedAggregationsFunction, - aggregationStateType: TypeInformation[Row], - inputRowType: TypeInformation[Row], + aggregationStateType: RowTypeInfo, + inputRowType: CRowTypeInfo, precedingOffset: Long) - extends ProcessFunction[Row, Row] + extends ProcessFunction[CRow, CRow] with Compiler[GeneratedAggregations] { Preconditions.checkNotNull(aggregationStateType) Preconditions.checkNotNull(precedingOffset) - private var output: Row = _ + private var output: CRow = _ // the state which keeps the last triggering timestamp private var lastTriggeringTsState: ValueState[Long] = _ @@ -74,7 +75,7 @@ class RowTimeBoundedRangeOver( LOG.debug("Instantiating AggregateHelper.") function = clazz.newInstance() - output = function.createOutputRow() + output = new CRow(function.createOutputRow(), true) val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] = new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long]) @@ -86,7 +87,8 @@ class RowTimeBoundedRangeOver( val keyTypeInformation: TypeInformation[Long] = BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]] - val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType) + val valueTypeInformation: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputRowType.asInstanceOf[CRowTypeInfo].rowType) val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = new MapStateDescriptor[Long, JList[Row]]( @@ -98,9 +100,11 @@ class RowTimeBoundedRangeOver( } override def processElement( - input: Row, - ctx: ProcessFunction[Row, Row]#Context, - out: Collector[Row]): Unit = { + inputC: CRow, + ctx: ProcessFunction[CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + + val input = inputC.row // triggering timestamp for trigger calculation val triggeringTs = ctx.timestamp @@ -125,8 +129,8 @@ class RowTimeBoundedRangeOver( override def onTimer( timestamp: Long, - ctx: ProcessFunction[Row, Row]#OnTimerContext, - out: Collector[Row]): Unit = { + ctx: ProcessFunction[CRow, CRow]#OnTimerContext, + out: Collector[CRow]): Unit = { // gets all window data from state for the calculation val inputs: JList[Row] = dataState.get(timestamp) @@ -172,13 +176,13 @@ class RowTimeBoundedRangeOver( } // set aggregate in output row - function.setAggregationResults(accumulators, output) + function.setAggregationResults(accumulators, output.row) // copy forwarded fields to output row and emit output row dataListIndex = 0 while (dataListIndex < inputs.size()) { aggregatesIndex = 0 - function.setForwardedFields(inputs.get(dataListIndex), output) + function.setForwardedFields(inputs.get(dataListIndex), output.row) out.collect(output) dataListIndex += 1 } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala index 7169cf7..5ec6ec7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeBoundedRowsOver.scala @@ -27,7 +27,8 @@ import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.functions.ProcessFunction import org.apache.flink.types.Row import org.apache.flink.util.{Collector, Preconditions} -import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler} +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.slf4j.LoggerFactory /** @@ -41,15 +42,15 @@ import org.slf4j.LoggerFactory class RowTimeBoundedRowsOver( genAggregations: GeneratedAggregationsFunction, aggregationStateType: RowTypeInfo, - inputRowType: TypeInformation[Row], + inputRowType: CRowTypeInfo, precedingOffset: Long) - extends ProcessFunction[Row, Row] + extends ProcessFunction[CRow, CRow] with Compiler[GeneratedAggregations] { Preconditions.checkNotNull(aggregationStateType) Preconditions.checkNotNull(precedingOffset) - private var output: Row = _ + private var output: CRow = _ // the state which keeps the last triggering timestamp private var lastTriggeringTsState: ValueState[Long] = _ @@ -79,7 +80,7 @@ class RowTimeBoundedRowsOver( LOG.debug("Instantiating AggregateHelper.") function = clazz.newInstance() - output = function.createOutputRow() + output = new CRow(function.createOutputRow(), true) val lastTriggeringTsDescriptor: ValueStateDescriptor[Long] = new ValueStateDescriptor[Long]("lastTriggeringTsState", classOf[Long]) @@ -95,7 +96,8 @@ class RowTimeBoundedRowsOver( val keyTypeInformation: TypeInformation[Long] = BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]] - val valueTypeInformation: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputRowType) + val valueTypeInformation: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputRowType.asInstanceOf[CRowTypeInfo].rowType) val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = new MapStateDescriptor[Long, JList[Row]]( @@ -107,9 +109,11 @@ class RowTimeBoundedRowsOver( } override def processElement( - input: Row, - ctx: ProcessFunction[Row, Row]#Context, - out: Collector[Row]): Unit = { + inputC: CRow, + ctx: ProcessFunction[CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + + val input = inputC.row // triggering timestamp for trigger calculation val triggeringTs = ctx.timestamp @@ -134,8 +138,8 @@ class RowTimeBoundedRowsOver( override def onTimer( timestamp: Long, - ctx: ProcessFunction[Row, Row]#OnTimerContext, - out: Collector[Row]): Unit = { + ctx: ProcessFunction[CRow, CRow]#OnTimerContext, + out: Collector[CRow]): Unit = { // gets all window data from state for the calculation val inputs: JList[Row] = dataState.get(timestamp) @@ -189,7 +193,7 @@ class RowTimeBoundedRowsOver( } // copy forwarded fields to output row - function.setForwardedFields(input, output) + function.setForwardedFields(input, output.row) // retract old row from accumulators if (null != retractRow) { @@ -198,7 +202,7 @@ class RowTimeBoundedRowsOver( // accumulate current row and set aggregate in output row function.accumulate(accumulators, input) - function.setAggregationResults(accumulators, output) + function.setAggregationResults(accumulators, output.row) i += 1 out.collect(output) http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala index 525d4d7..3e2a811 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala @@ -28,7 +28,8 @@ import org.apache.flink.util.{Collector, Preconditions} import org.apache.flink.api.common.state._ import org.apache.flink.api.java.typeutils.ListTypeInfo import org.apache.flink.streaming.api.operators.TimestampedCollector -import org.apache.flink.table.codegen.{GeneratedAggregationsFunction, Compiler} +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} import org.slf4j.LoggerFactory @@ -42,11 +43,11 @@ import org.slf4j.LoggerFactory abstract class RowTimeUnboundedOver( genAggregations: GeneratedAggregationsFunction, intermediateType: TypeInformation[Row], - inputType: TypeInformation[Row]) - extends ProcessFunction[Row, Row] + inputType: TypeInformation[CRow]) + extends ProcessFunction[CRow, CRow] with Compiler[GeneratedAggregations] { - protected var output: Row = _ + protected var output: CRow = _ // state to hold the accumulators of the aggregations private var accumulatorState: ValueState[Row] = _ // state to hold rows until the next watermark arrives @@ -67,7 +68,7 @@ abstract class RowTimeUnboundedOver( LOG.debug("Instantiating AggregateHelper.") function = clazz.newInstance() - output = function.createOutputRow() + output = new CRow(function.createOutputRow(), true) sortedTimestamps = new util.LinkedList[Long]() // initialize accumulator state @@ -76,7 +77,8 @@ abstract class RowTimeUnboundedOver( accumulatorState = getRuntimeContext.getState[Row](accDescriptor) // initialize row state - val rowListTypeInfo: TypeInformation[JList[Row]] = new ListTypeInfo[Row](inputType) + val rowListTypeInfo: TypeInformation[JList[Row]] = + new ListTypeInfo[Row](inputType.asInstanceOf[CRowTypeInfo].rowType) val mapStateDescriptor: MapStateDescriptor[Long, JList[Row]] = new MapStateDescriptor[Long, JList[Row]]("rowmapstate", BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]], rowListTypeInfo) @@ -87,15 +89,17 @@ abstract class RowTimeUnboundedOver( * Puts an element from the input stream into state if it is not late. * Registers a timer for the next watermark. * - * @param input The input value. + * @param inputC The input value. * @param ctx The ctx to register timer or get current time * @param out The collector for returning result values. * */ override def processElement( - input: Row, - ctx: ProcessFunction[Row, Row]#Context, - out: Collector[Row]): Unit = { + inputC: CRow, + ctx: ProcessFunction[CRow, CRow]#Context, + out: Collector[CRow]): Unit = { + + val input = inputC.row val timestamp = ctx.timestamp() val curWatermark = ctx.timerService().currentWatermark() @@ -126,11 +130,11 @@ abstract class RowTimeUnboundedOver( */ override def onTimer( timestamp: Long, - ctx: ProcessFunction[Row, Row]#OnTimerContext, - out: Collector[Row]): Unit = { + ctx: ProcessFunction[CRow, CRow]#OnTimerContext, + out: Collector[CRow]): Unit = { - Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[Row]]) - val collector = out.asInstanceOf[TimestampedCollector[Row]] + Preconditions.checkArgument(out.isInstanceOf[TimestampedCollector[CRow]]) + val collector = out.asInstanceOf[TimestampedCollector[CRow]] val keyIterator = rowMapState.keys.iterator if (keyIterator.hasNext) { @@ -206,7 +210,7 @@ abstract class RowTimeUnboundedOver( def processElementsWithSameTimestamp( curRowList: JList[Row], lastAccumulator: Row, - out: Collector[Row]): Unit + out: Collector[CRow]): Unit } @@ -217,7 +221,7 @@ abstract class RowTimeUnboundedOver( class RowTimeUnboundedRowsOver( genAggregations: GeneratedAggregationsFunction, intermediateType: TypeInformation[Row], - inputType: TypeInformation[Row]) + inputType: TypeInformation[CRow]) extends RowTimeUnboundedOver( genAggregations: GeneratedAggregationsFunction, intermediateType, @@ -226,7 +230,7 @@ class RowTimeUnboundedRowsOver( override def processElementsWithSameTimestamp( curRowList: JList[Row], lastAccumulator: Row, - out: Collector[Row]): Unit = { + out: Collector[CRow]): Unit = { var i = 0 while (i < curRowList.size) { @@ -234,11 +238,11 @@ class RowTimeUnboundedRowsOver( var j = 0 // copy forwarded fields to output row - function.setForwardedFields(curRow, output) + function.setForwardedFields(curRow, output.row) // update accumulators and copy aggregates to output row function.accumulate(lastAccumulator, curRow) - function.setAggregationResults(lastAccumulator, output) + function.setAggregationResults(lastAccumulator, output.row) // emit output row out.collect(output) i += 1 @@ -255,7 +259,7 @@ class RowTimeUnboundedRowsOver( class RowTimeUnboundedRangeOver( genAggregations: GeneratedAggregationsFunction, intermediateType: TypeInformation[Row], - inputType: TypeInformation[Row]) + inputType: TypeInformation[CRow]) extends RowTimeUnboundedOver( genAggregations: GeneratedAggregationsFunction, intermediateType, @@ -264,7 +268,7 @@ class RowTimeUnboundedRangeOver( override def processElementsWithSameTimestamp( curRowList: JList[Row], lastAccumulator: Row, - out: Collector[Row]): Unit = { + out: Collector[CRow]): Unit = { var i = 0 // all same timestamp data should have same aggregation value. @@ -281,10 +285,10 @@ class RowTimeUnboundedRangeOver( val curRow = curRowList.get(i) // copy forwarded fields to output row - function.setForwardedFields(curRow, output) + function.setForwardedFields(curRow, output.row) //copy aggregates to output row - function.setAggregationResults(lastAccumulator, output) + function.setAggregationResults(lastAccumulator, output.row) out.collect(output) i += 1 } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala index 9502607..0c8ae00 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.runtime.aggregate import org.apache.calcite.runtime.SqlFunctions +import org.apache.flink.table.runtime.types.CRow import org.apache.flink.types.Row import org.apache.flink.util.Collector @@ -26,29 +27,48 @@ import org.apache.flink.util.Collector * Adds TimeWindow properties to specified fields of a row before it emits the row to a wrapped * collector. */ -class TimeWindowPropertyCollector(windowStartOffset: Option[Int], windowEndOffset: Option[Int]) - extends Collector[Row] { +abstract class TimeWindowPropertyCollector[T]( + windowStartOffset: Option[Int], + windowEndOffset: Option[Int]) + extends Collector[T] { - var wrappedCollector: Collector[Row] = _ + var wrappedCollector: Collector[T] = _ + var output: Row = _ var windowStart:Long = _ var windowEnd:Long = _ - override def collect(record: Row): Unit = { + def getRow(record: T): Row - val lastFieldPos = record.getArity - 1 + override def collect(record: T): Unit = { + + output = getRow(record) + val lastFieldPos = output.getArity - 1 if (windowStartOffset.isDefined) { - record.setField( + output.setField( lastFieldPos + windowStartOffset.get, SqlFunctions.internalToTimestamp(windowStart)) } if (windowEndOffset.isDefined) { - record.setField( + output.setField( lastFieldPos + windowEndOffset.get, SqlFunctions.internalToTimestamp(windowEnd)) } + wrappedCollector.collect(record) } override def close(): Unit = wrappedCollector.close() } + +class RowTimeWindowPropertyCollector(startOffset: Option[Int], endOffset: Option[Int]) + extends TimeWindowPropertyCollector[Row](startOffset, endOffset) { + + override def getRow(record: Row): Row = record +} + +class CRowTimeWindowPropertyCollector(startOffset: Option[Int], endOffset: Option[Int]) + extends TimeWindowPropertyCollector[CRow](startOffset, endOffset) { + + override def getRow(record: CRow): Row = record.row +} http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala new file mode 100644 index 0000000..ec73fa6 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/CRowValuesInputFormat.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.io + +import org.apache.flink.api.common.io.{GenericInputFormat, NonParallelInput} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.core.io.GenericInputSplit +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.slf4j.LoggerFactory + +class CRowValuesInputFormat( + name: String, + code: String, + @transient returnType: TypeInformation[CRow]) + extends GenericInputFormat[CRow] + with NonParallelInput + with ResultTypeQueryable[CRow] + with Compiler[GenericInputFormat[Row]] { + + val LOG = LoggerFactory.getLogger(this.getClass) + + private var format: GenericInputFormat[Row] = _ + + override def open(split: GenericInputSplit): Unit = { + LOG.debug(s"Compiling GenericInputFormat: $name \n\n Code:\n$code") + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) + LOG.debug("Instantiating GenericInputFormat.") + format = clazz.newInstance() + } + + override def reachedEnd(): Boolean = format.reachedEnd() + + override def nextRecord(reuse: CRow): CRow = { + reuse.row = format.nextRecord(reuse.row) + reuse.change = true + reuse + } + + override def getProducedType: TypeInformation[CRow] = returnType +} http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala index 1a339e6..d536b39 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/io/ValuesInputFormat.scala @@ -23,20 +23,21 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.ResultTypeQueryable import org.apache.flink.table.codegen.Compiler import org.apache.flink.core.io.GenericInputSplit +import org.apache.flink.types.Row import org.slf4j.LoggerFactory -class ValuesInputFormat[OUT]( +class ValuesInputFormat( name: String, code: String, - @transient returnType: TypeInformation[OUT]) - extends GenericInputFormat[OUT] + @transient returnType: TypeInformation[Row]) + extends GenericInputFormat[Row] with NonParallelInput - with ResultTypeQueryable[OUT] - with Compiler[GenericInputFormat[OUT]] { + with ResultTypeQueryable[Row] + with Compiler[GenericInputFormat[Row]] { val LOG = LoggerFactory.getLogger(this.getClass) - private var format: GenericInputFormat[OUT] = _ + private var format: GenericInputFormat[Row] = _ override def open(split: GenericInputSplit): Unit = { LOG.debug(s"Compiling GenericInputFormat: $name \n\n Code:\n$code") @@ -47,7 +48,7 @@ class ValuesInputFormat[OUT]( override def reachedEnd(): Boolean = format.reachedEnd() - override def nextRecord(reuse: OUT): OUT = format.nextRecord(reuse) + override def nextRecord(reuse: Row): Row = format.nextRecord(reuse) - override def getProducedType: TypeInformation[OUT] = returnType + override def getProducedType: TypeInformation[Row] = returnType } http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala new file mode 100644 index 0000000..25ec8c4 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRow.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.types + +import org.apache.flink.types.Row + +/** + * Wrapper for a [[Row]] to add retraction information. + * + * If [[change]] is true, the [[CRow]] is an accumulate message, if it is false it is a + * retraction message. + * + * @param row The wrapped [[Row]]. + * @param change true for an accumulate message, false for a retraction message. + */ +class CRow(var row: Row, var change: Boolean) { + + def this() { + this(null, true) + } + + override def toString: String = s"${if(change) "+" else "-"}$row" + + override def equals(other: scala.Any): Boolean = { + val otherCRow = other.asInstanceOf[CRow] + row.equals(otherCRow.row) && change == otherCRow.change + } +} + +object CRow { + + def apply(): CRow = { + new CRow() + } + + def apply(row: Row, change: Boolean): CRow = { + new CRow(row, change) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala new file mode 100644 index 0000000..d848c65 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowComparator.scala @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.types + +import org.apache.flink.api.common.typeutils.TypeComparator +import org.apache.flink.core.memory.{DataInputView, DataOutputView, MemorySegment} +import org.apache.flink.types.Row + +class CRowComparator(val rowComp: TypeComparator[Row]) extends TypeComparator[CRow] { + + override def hash(record: CRow): Int = rowComp.hash(record.row) + + override def setReference(toCompare: CRow): Unit = rowComp.setReference(toCompare.row) + + override def equalToReference(candidate: CRow): Boolean = rowComp.equalToReference(candidate.row) + + override def compareToReference(otherComp: TypeComparator[CRow]): Int = { + val otherCRowComp = otherComp.asInstanceOf[CRowComparator] + rowComp.compareToReference(otherCRowComp.rowComp) + } + + override def compare(first: CRow, second: CRow): Int = { + rowComp.compare(first.row, second.row) + } + + override def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = { + rowComp.compareSerialized(firstSource, secondSource) + } + + override def supportsNormalizedKey(): Boolean = rowComp.supportsNormalizedKey() + + override def supportsSerializationWithKeyNormalization(): Boolean = + rowComp.supportsSerializationWithKeyNormalization() + + override def getNormalizeKeyLen: Int = rowComp.getNormalizeKeyLen + + override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean = + rowComp.isNormalizedKeyPrefixOnly(keyBytes) + + override def putNormalizedKey( + record: CRow, + target: MemorySegment, + offset: Int, + numBytes: Int): Unit = rowComp.putNormalizedKey(record.row, target, offset, numBytes) + + override def writeWithKeyNormalization(record: CRow, target: DataOutputView): Unit = { + rowComp.writeWithKeyNormalization(record.row, target) + target.writeBoolean(record.change) + } + + override def readWithKeyDenormalization(reuse: CRow, source: DataInputView): CRow = { + val row = rowComp.readWithKeyDenormalization(reuse.row, source) + reuse.row = row + reuse.change = source.readBoolean() + reuse + } + + override def invertNormalizedKey(): Boolean = rowComp.invertNormalizedKey() + + override def duplicate(): TypeComparator[CRow] = new CRowComparator(rowComp.duplicate()) + + override def extractKeys(record: scala.Any, target: Array[AnyRef], index: Int): Int = + rowComp.extractKeys(record.asInstanceOf[CRow].row, target, index) + + override def getFlatComparators: Array[TypeComparator[_]] = + rowComp.getFlatComparators +} http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala new file mode 100644 index 0000000..1f56a98 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowSerializer.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.types + +import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.core.memory.{DataInputView, DataOutputView} +import org.apache.flink.types.Row + +class CRowSerializer(val rowSerializer: TypeSerializer[Row]) extends TypeSerializer[CRow] { + + override def isImmutableType: Boolean = false + + override def duplicate(): TypeSerializer[CRow] = new CRowSerializer(rowSerializer.duplicate()) + + override def createInstance(): CRow = new CRow(rowSerializer.createInstance(), true) + + override def copy(from: CRow): CRow = new CRow(rowSerializer.copy(from.row), from.change) + + override def copy(from: CRow, reuse: CRow): CRow = { + rowSerializer.copy(from.row, reuse.row) + reuse.change = from.change + reuse + } + + override def getLength: Int = -1 + + override def serialize(record: CRow, target: DataOutputView): Unit = { + rowSerializer.serialize(record.row, target) + target.writeBoolean(record.change) + } + + override def deserialize(source: DataInputView): CRow = { + val row = rowSerializer.deserialize(source) + val change = source.readBoolean() + new CRow(row, change) + } + + override def deserialize(reuse: CRow, source: DataInputView): CRow = { + rowSerializer.deserialize(reuse.row, source) + reuse.change = source.readBoolean() + reuse + } + + override def copy(source: DataInputView, target: DataOutputView): Unit = { + rowSerializer.copy(source, target) + target.writeBoolean(source.readBoolean()) + } + + override def canEqual(obj: Any): Boolean = obj.isInstanceOf[CRowSerializer] + + override def equals(obj: Any): Boolean = { + + if (canEqual(obj)) { + val other = obj.asInstanceOf[CRowSerializer] + rowSerializer.equals(other.rowSerializer) + } else { + false + } + } + + override def hashCode: Int = rowSerializer.hashCode() * 13 +} http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala new file mode 100644 index 0000000..456207a --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.types + +import java.util + +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeutils.{CompositeType, TypeComparator, TypeSerializer} +import org.apache.flink.api.common.typeutils.CompositeType.{FlatFieldDescriptor, TypeComparatorBuilder} +import org.apache.flink.api.java.typeutils.RowTypeInfo +import org.apache.flink.types.Row + +class CRowTypeInfo(val rowType: RowTypeInfo) extends CompositeType[CRow](classOf[CRow]) { + + override def getFieldNames: Array[String] = rowType.getFieldNames + + override def getFieldIndex(fieldName: String): Int = rowType.getFieldIndex(fieldName) + + override def getTypeAt[X](fieldExpression: String): TypeInformation[X] = + rowType.getTypeAt(fieldExpression) + + override def getTypeAt[X](pos: Int): TypeInformation[X] = + rowType.getTypeAt(pos) + + override def getFlatFields( + fieldExpression: String, + offset: Int, + result: util.List[FlatFieldDescriptor]): Unit = + rowType.getFlatFields(fieldExpression, offset, result) + + override def isBasicType: Boolean = rowType.isBasicType + + override def isTupleType: Boolean = rowType.isTupleType + + override def getArity: Int = rowType.getArity + + override def getTotalFields: Int = rowType.getTotalFields + + override def createSerializer(config: ExecutionConfig): TypeSerializer[CRow] = + new CRowSerializer(rowType.createSerializer(config)) + + // not implemented because we override createComparator + override protected def createTypeComparatorBuilder(): TypeComparatorBuilder[CRow] = null + + override def createComparator( + logicalKeyFields: Array[Int], + orders: Array[Boolean], + logicalFieldOffset: Int, + config: ExecutionConfig): TypeComparator[CRow] = { + + val rowComparator = rowType.createComparator( + logicalKeyFields, + orders, + logicalFieldOffset, + config) + + new CRowComparator(rowComparator) + } + + override def equals(obj: scala.Any): Boolean = { + if (this.canEqual(obj)) { + rowType.equals(obj.asInstanceOf[CRowTypeInfo].rowType) + } else { + false + } + } + + override def canEqual(obj: scala.Any): Boolean = obj.isInstanceOf[CRowTypeInfo] + +} + +object CRowTypeInfo { + + def apply(rowType: TypeInformation[Row]): CRowTypeInfo = { + rowType match { + case r: RowTypeInfo => new CRowTypeInfo(r) + } + } + +} + http://git-wip-us.apache.org/repos/asf/flink/blob/27bf4cab/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala index c37ee74..4a2fcdf 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/CsvTableSink.scala @@ -25,6 +25,7 @@ import org.apache.flink.types.Row import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.core.fs.FileSystem.WriteMode import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} /** * A simple [[TableSink]] to emit data as CSV files. @@ -133,3 +134,4 @@ class CsvFormatter(fieldDelim: String) extends MapFunction[Row, String] { builder.mkString } } +