[FLINK-5915] [table] Forward the complete aggregate ArgList to aggregate runtime functions.
This closes #3647. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91c90c5d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91c90c5d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91c90c5d Branch: refs/heads/table-retraction Commit: 91c90c5d2d38c286e128777aa7b8e14fc5321575 Parents: 4889028 Author: shaoxuan-wang <wshaox...@gmail.com> Authored: Fri Mar 31 17:07:09 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Mar 31 15:14:37 2017 +0200 ---------------------------------------------------------------------- .../table/runtime/aggregate/AggregateAggFunction.scala | 4 ++-- .../flink/table/runtime/aggregate/AggregateUtil.scala | 12 +++++++----- .../BoundedProcessingOverRowProcessFunction.scala | 6 +++--- .../table/runtime/aggregate/DataSetAggFunction.scala | 4 ++-- .../table/runtime/aggregate/DataSetPreAggFunction.scala | 4 ++-- .../runtime/aggregate/DataSetWindowAggMapFunction.scala | 4 ++-- .../RangeClauseBoundedOverProcessFunction.scala | 6 +++--- .../RowsClauseBoundedOverProcessFunction.scala | 6 +++--- .../UnboundedEventTimeOverProcessFunction.scala | 10 +++++----- ...dedNonPartitionedProcessingOverProcessFunction.scala | 4 ++-- .../UnboundedProcessingOverProcessFunction.scala | 4 ++-- 11 files changed, 33 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala index 11d55e5..c608b97 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateAggFunction.scala @@ -33,7 +33,7 @@ import org.apache.flink.types.Row */ class AggregateAggFunction( private val aggregates: Array[AggregateFunction[_]], - private val aggFields: Array[Int]) + private val aggFields: Array[Array[Int]]) extends DataStreamAggFunc[Row, Row, Row] { override def createAccumulator(): Row = { @@ -51,7 +51,7 @@ class AggregateAggFunction( var i = 0 while (i < aggregates.length) { val acc = accumulatorRow.getField(i).asInstanceOf[Accumulator] - val v = value.getField(aggFields(i)) + val v = value.getField(aggFields(i)(0)) aggregates(i).accumulate(acc, v) i += 1 } http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala index 88e9d68..74dc5cd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala @@ -45,6 +45,7 @@ import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeIn import org.apache.flink.types.Row import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer object AggregateUtil { @@ -886,10 +887,10 @@ object AggregateUtil { aggregateCalls: Seq[AggregateCall], inputType: RelDataType, needRetraction: Boolean) - : (Array[Int], Array[TableAggregateFunction[_ <: Any]]) = { + : (Array[Array[Int]], Array[TableAggregateFunction[_ <: Any]]) = { // store the aggregate fields of each aggregate function, by the same order of aggregates. - val aggFieldIndexes = new Array[Int](aggregateCalls.size) + val aggFieldIndexes = new Array[Array[Int]](aggregateCalls.size) val aggregates = new Array[TableAggregateFunction[_ <: Any]](aggregateCalls.size) // create aggregate function instances by function type and aggregate field data type. @@ -897,7 +898,7 @@ object AggregateUtil { val argList: util.List[Integer] = aggregateCall.getArgList if (argList.isEmpty) { if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) { - aggFieldIndexes(index) = 0 + aggFieldIndexes(index) = Array[Int](0) } else { throw new TableException("Aggregate fields should not be empty.") } @@ -905,9 +906,10 @@ object AggregateUtil { if (argList.size() > 1) { throw new TableException("Currently, do not support aggregate on multi fields.") } - aggFieldIndexes(index) = argList.get(0) + aggFieldIndexes(index) = argList.asScala.map(i => i.intValue).toArray } - val sqlTypeName = inputType.getFieldList.get(aggFieldIndexes(index)).getType.getSqlTypeName + val sqlTypeName = inputType.getFieldList.get(aggFieldIndexes(index)(0)).getType + .getSqlTypeName aggregateCall.getAggregation match { case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction => http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala index 454b177..1b990ec 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/BoundedProcessingOverRowProcessFunction.scala @@ -37,7 +37,7 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo class BoundedProcessingOverRowProcessFunction( private val aggregates: Array[AggregateFunction[_]], - private val aggFields: Array[Int], + private val aggFields: Array[Array[Int]], private val precedingOffset: Long, private val forwardedFieldCount: Int, private val aggregatesTypeInfo: RowTypeInfo, @@ -118,7 +118,7 @@ class BoundedProcessingOverRowProcessFunction( i = 0 while (i < aggregates.length) { val accumulator = accumulators.getField(i).asInstanceOf[Accumulator] - aggregates(i).retract(accumulator, retractList.get(0).getField(aggFields(i))) + aggregates(i).retract(accumulator, retractList.get(0).getField(aggFields(i)(0))) i += 1 } retractList.remove(0) @@ -157,7 +157,7 @@ class BoundedProcessingOverRowProcessFunction( while (i < aggregates.length) { val index = forwardedFieldCount + i val accumulator = accumulators.getField(i).asInstanceOf[Accumulator] - aggregates(i).accumulate(accumulator, input.getField(aggFields(i))) + aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0))) output.setField(index, aggregates(i).getValue(accumulator)) i += 1 } http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala index 5babcf2..867943e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetAggFunction.scala @@ -38,7 +38,7 @@ import org.apache.flink.util.{Collector, Preconditions} */ class DataSetAggFunction( private val aggregates: Array[AggregateFunction[_ <: Any]], - private val aggInFields: Array[Int], + private val aggInFields: Array[Array[Int]], private val aggOutMapping: Array[(Int, Int)], private val gkeyOutMapping: Array[(Int, Int)], private val groupingSetsMapping: Array[(Int, Int)], @@ -82,7 +82,7 @@ class DataSetAggFunction( // accumulate i = 0 while (i < aggregates.length) { - aggregates(i).accumulate(accumulators(i), record.getField(aggInFields(i))) + aggregates(i).accumulate(accumulators(i), record.getField(aggInFields(i)(0))) i += 1 } http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala index c18dcdc..db49a53 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetPreAggFunction.scala @@ -35,7 +35,7 @@ import org.apache.flink.util.{Collector, Preconditions} */ class DataSetPreAggFunction( private val aggregates: Array[AggregateFunction[_ <: Any]], - private val aggInFields: Array[Int], + private val aggInFields: Array[Array[Int]], private val groupingKeys: Array[Int]) extends AbstractRichFunction with GroupCombineFunction[Row, Row] @@ -78,7 +78,7 @@ class DataSetPreAggFunction( // accumulate i = 0 while (i < aggregates.length) { - aggregates(i).accumulate(accumulators(i), record.getField(aggInFields(i))) + aggregates(i).accumulate(accumulators(i), record.getField(aggInFields(i)(0))) i += 1 } // check if this record is the last record http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala index f7f3387..5cc7ada 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetWindowAggMapFunction.scala @@ -36,7 +36,7 @@ import org.apache.flink.util.Preconditions */ class DataSetWindowAggMapFunction( private val aggregates: Array[AggregateFunction[_]], - private val aggFields: Array[Int], + private val aggFields: Array[Array[Int]], private val groupingKeys: Array[Int], private val timeFieldPos: Int, // time field position in input row private val tumbleTimeWindowSize: Option[Long], @@ -62,7 +62,7 @@ class DataSetWindowAggMapFunction( var i = 0 while (i < aggregates.length) { val agg = aggregates(i) - val fieldValue = input.getField(aggFields(i)) + val fieldValue = input.getField(aggFields(i)(0)) val accumulator = agg.createAccumulator() agg.accumulate(accumulator, fieldValue) output.setField(groupingKeys.length + i, accumulator) http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala index 0c8555b..9a57c56 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RangeClauseBoundedOverProcessFunction.scala @@ -40,7 +40,7 @@ import org.apache.flink.util.{Collector, Preconditions} */ class RangeClauseBoundedOverProcessFunction( private val aggregates: Array[AggregateFunction[_]], - private val aggFields: Array[Int], + private val aggFields: Array[Array[Int]], private val forwardedFieldCount: Int, private val aggregationStateType: RowTypeInfo, private val inputRowType: RowTypeInfo, @@ -160,7 +160,7 @@ class RangeClauseBoundedOverProcessFunction( val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator] aggregates(aggregatesIndex) .retract(accumulator, retractDataList.get(dataListIndex) - .getField(aggFields(aggregatesIndex))) + .getField(aggFields(aggregatesIndex)(0))) aggregatesIndex += 1 } dataListIndex += 1 @@ -177,7 +177,7 @@ class RangeClauseBoundedOverProcessFunction( while (aggregatesIndex < aggregates.length) { val accumulator = accumulators.getField(aggregatesIndex).asInstanceOf[Accumulator] aggregates(aggregatesIndex).accumulate(accumulator, inputs.get(dataListIndex) - .getField(aggFields(aggregatesIndex))) + .getField(aggFields(aggregatesIndex)(0))) aggregatesIndex += 1 } dataListIndex += 1 http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala index 1678d57..8cc1f26 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowsClauseBoundedOverProcessFunction.scala @@ -41,7 +41,7 @@ import org.apache.flink.util.{Collector, Preconditions} */ class RowsClauseBoundedOverProcessFunction( private val aggregates: Array[AggregateFunction[_]], - private val aggFields: Array[Int], + private val aggFields: Array[Array[Int]], private val forwardedFieldCount: Int, private val aggregationStateType: RowTypeInfo, private val inputRowType: RowTypeInfo, @@ -202,7 +202,7 @@ class RowsClauseBoundedOverProcessFunction( i = 0 while (i < aggregates.length) { val accumulator = accumulators.getField(i).asInstanceOf[Accumulator] - aggregates(i).retract(accumulator, retractRow.getField(aggFields(i))) + aggregates(i).retract(accumulator, retractRow.getField(aggFields(i)(0))) i += 1 } } @@ -212,7 +212,7 @@ class RowsClauseBoundedOverProcessFunction( while (i < aggregates.length) { val index = forwardedFieldCount + i val accumulator = accumulators.getField(i).asInstanceOf[Accumulator] - aggregates(i).accumulate(accumulator, input.getField(aggFields(i))) + aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0))) output.setField(index, aggregates(i).getValue(accumulator)) i += 1 } http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala index 92faf7d..3f7f35f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedEventTimeOverProcessFunction.scala @@ -43,7 +43,7 @@ import org.apache.flink.table.functions.{Accumulator, AggregateFunction} */ abstract class UnboundedEventTimeOverProcessFunction( private val aggregates: Array[AggregateFunction[_]], - private val aggFields: Array[Int], + private val aggFields: Array[Array[Int]], private val forwardedFieldCount: Int, private val intermediateType: TypeInformation[Row], private val inputType: TypeInformation[Row]) @@ -217,7 +217,7 @@ abstract class UnboundedEventTimeOverProcessFunction( */ class UnboundedEventTimeRowsOverProcessFunction( aggregates: Array[AggregateFunction[_]], - aggFields: Array[Int], + aggFields: Array[Array[Int]], forwardedFieldCount: Int, intermediateType: TypeInformation[Row], inputType: TypeInformation[Row]) @@ -250,7 +250,7 @@ class UnboundedEventTimeRowsOverProcessFunction( while (i < aggregates.length) { val index = forwardedFieldCount + i val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator] - aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i))) + aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)(0))) output.setField(index, aggregates(i).getValue(accumulator)) i += 1 } @@ -269,7 +269,7 @@ class UnboundedEventTimeRowsOverProcessFunction( */ class UnboundedEventTimeRangeOverProcessFunction( aggregates: Array[AggregateFunction[_]], - aggFields: Array[Int], + aggFields: Array[Array[Int]], forwardedFieldCount: Int, intermediateType: TypeInformation[Row], inputType: TypeInformation[Row]) @@ -294,7 +294,7 @@ class UnboundedEventTimeRangeOverProcessFunction( while (i < aggregates.length) { val index = forwardedFieldCount + i val accumulator = lastAccumulator.getField(i).asInstanceOf[Accumulator] - aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i))) + aggregates(i).accumulate(accumulator, curRow.getField(aggFields(i)(0))) i += 1 } j += 1 http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala index 7750511..713d91b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedNonPartitionedProcessingOverProcessFunction.scala @@ -37,7 +37,7 @@ import org.apache.flink.util.{Collector, Preconditions} */ class UnboundedNonPartitionedProcessingOverProcessFunction( private val aggregates: Array[AggregateFunction[_]], - private val aggFields: Array[Int], + private val aggFields: Array[Array[Int]], private val forwardedFieldCount: Int, private val aggregationStateType: RowTypeInfo) extends ProcessFunction[Row, Row] with CheckpointedFunction{ @@ -82,7 +82,7 @@ class UnboundedNonPartitionedProcessingOverProcessFunction( while (i < aggregates.length) { val index = forwardedFieldCount + i val accumulator = accumulators.getField(i).asInstanceOf[Accumulator] - aggregates(i).accumulate(accumulator, input.getField(aggFields(i))) + aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0))) output.setField(index, aggregates(i).getValue(accumulator)) i += 1 } http://git-wip-us.apache.org/repos/asf/flink/blob/91c90c5d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala index 41f8e8c..0276290 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/UnboundedProcessingOverProcessFunction.scala @@ -28,7 +28,7 @@ import org.apache.flink.table.functions.{Accumulator, AggregateFunction} class UnboundedProcessingOverProcessFunction( private val aggregates: Array[AggregateFunction[_]], - private val aggFields: Array[Int], + private val aggFields: Array[Array[Int]], private val forwardedFieldCount: Int, private val aggregationStateType: RowTypeInfo) extends ProcessFunction[Row, Row]{ @@ -75,7 +75,7 @@ class UnboundedProcessingOverProcessFunction( while (i < aggregates.length) { val index = forwardedFieldCount + i val accumulator = accumulators.getField(i).asInstanceOf[Accumulator] - aggregates(i).accumulate(accumulator, input.getField(aggFields(i))) + aggregates(i).accumulate(accumulator, input.getField(aggFields(i)(0))) output.setField(index, aggregates(i).getValue(accumulator)) i += 1 }