This is an automated email from the ASF dual-hosted git repository. dianfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new bba028a [FLINK-20858][python][table-planner-blink] Port StreamExecPythonGroupWindowAggregate and BatchExecPythonGroupWindowAggregate to Java (#14665) bba028a is described below commit bba028a38f056b7c111944a7a32307c3e812857f Author: HuangXingBo <hxbks...@gmail.com> AuthorDate: Mon Jan 18 20:21:46 2021 +0800 [FLINK-20858][python][table-planner-blink] Port StreamExecPythonGroupWindowAggregate and BatchExecPythonGroupWindowAggregate to Java (#14665) --- .../batch/BatchExecPythonGroupWindowAggregate.java | 216 +++++++++++++ .../StreamExecPythonGroupWindowAggregate.java | 342 +++++++++++++++++++++ .../BatchExecPythonGroupWindowAggregate.scala | 179 ----------- .../StreamExecPythonGroupWindowAggregate.scala | 270 ---------------- .../BatchPhysicalPythonGroupWindowAggregate.scala | 2 +- 5 files changed, 559 insertions(+), 450 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java new file mode 100644 index 0000000..299ed9c --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.batch; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.config.ExecutionConfigOptions; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.python.PythonFunctionInfo; +import org.apache.flink.table.planner.calcite.FlinkRelBuilder; +import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.expressions.PlannerRowtimeAttribute; +import org.apache.flink.table.planner.expressions.PlannerWindowEnd; +import org.apache.flink.table.planner.expressions.PlannerWindowProperty; +import org.apache.flink.table.planner.expressions.PlannerWindowStart; +import org.apache.flink.table.planner.plan.logical.LogicalWindow; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonAggregate; +import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil; 
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.calcite.rel.core.AggregateCall;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+
+/**
+ * Batch {@link ExecNode} for group window aggregate (Python user defined aggregate function).
+ *
+ * <p>The node is translated into a {@link OneInputTransformation} whose operator is {@code
+ * BatchArrowPythonGroupWindowAggregateFunctionOperator}, loaded by class name through {@link
+ * CommonPythonUtil#loadClass} and instantiated reflectively.
+ */
+public class BatchExecPythonGroupWindowAggregate extends CommonExecPythonAggregate
+        implements BatchExecNode<RowData> {
+
+    /** Fully qualified name of the Python operator class that is instantiated reflectively. */
+    private static final String ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME =
+            "org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch."
+                    + "BatchArrowPythonGroupWindowAggregateFunctionOperator";
+
+    private final int[] grouping;
+    // Passed to the operator unchanged. The former Scala node passed `grouping ++ auxGrouping`
+    // in this position, so callers presumably supply that concatenation -- TODO confirm.
+    private final int[] groupingSet;
+    private final AggregateCall[] aggCalls;
+    private final LogicalWindow window;
+    private final int inputTimeFieldIndex;
+    private final FlinkRelBuilder.PlannerNamedWindowProperty[] namedWindowProperties;
+
+    public BatchExecPythonGroupWindowAggregate(
+            int[] grouping,
+            int[] groupingSet,
+            AggregateCall[] aggCalls,
+            LogicalWindow window,
+            int inputTimeFieldIndex,
+            FlinkRelBuilder.PlannerNamedWindowProperty[] namedWindowProperties,
+            ExecEdge inputEdge,
+            RowType outputType,
+            String description) {
+        super(inputEdge, outputType, description);
+        this.grouping = grouping;
+        this.groupingSet = groupingSet;
+        this.aggCalls = aggCalls;
+        this.window = window;
+        this.inputTimeFieldIndex = inputTimeFieldIndex;
+        this.namedWindowProperties = namedWindowProperties;
+    }
+
+    /**
+     * Translates the single input and wraps it with the Python group-window aggregate operator.
+     *
+     * <p>The group buffer limit is read from {@link
+     * ExecutionConfigOptions#TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT} of the merged Python
+     * configuration. Python managed memory is declared when the Python worker uses it.
+     */
+    @SuppressWarnings("unchecked") // the single input of this node produces RowData
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        final ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
+        final Transformation<RowData> inputTransform = inputNode.translateToPlan(planner);
+        final RowType inputRowType = (RowType) inputNode.getOutputType();
+        final RowType outputRowType = InternalTypeInfo.of(getOutputType()).toRowType();
+
+        // f0 = window size, f1 = slide size, as produced by WindowCodeGenerator.getWindowDef.
+        final Tuple2<Long, Long> windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window);
+        final TableConfig tableConfig = planner.getTableConfig();
+        final Configuration config =
+                CommonPythonUtil.getMergedConfig(planner.getExecEnv(), tableConfig);
+        int groupBufferLimitSize =
+                config.getInteger(ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT);
+
+        OneInputTransformation<RowData, RowData> transform =
+                createPythonOneInputTransformation(
+                        inputTransform,
+                        inputRowType,
+                        outputRowType,
+                        groupBufferLimitSize,
+                        windowSizeAndSlideSize.f0,
+                        windowSizeAndSlideSize.f1,
+                        config);
+        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(config)) {
+            transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
+        }
+        return transform;
+    }
+
+    /**
+     * Builds the one-input transformation running the Python operator.
+     *
+     * <p>Named window properties are encoded as ints for the operator: 0 = window start, 1 =
+     * window end, 2 = rowtime attribute.
+     *
+     * @throws TableException if a named window property is of any other type
+     */
+    private OneInputTransformation<RowData, RowData> createPythonOneInputTransformation(
+            Transformation<RowData> inputTransform,
+            RowType inputRowType,
+            RowType outputRowType,
+            int maxLimitSize,
+            long windowSize,
+            long slideSize,
+            Configuration config) {
+        int[] namePropertyTypeArray =
+                Arrays.stream(namedWindowProperties)
+                        .mapToInt(
+                                p -> {
+                                    PlannerWindowProperty property = p.property();
+                                    if (property instanceof PlannerWindowStart) {
+                                        return 0;
+                                    }
+                                    if (property instanceof PlannerWindowEnd) {
+                                        return 1;
+                                    }
+                                    if (property instanceof PlannerRowtimeAttribute) {
+                                        return 2;
+                                    }
+                                    throw new TableException("Unexpected property " + property);
+                                })
+                        .toArray();
+        // f0 = input offsets of the Python UDAFs, f1 = their function infos.
+        Tuple2<int[], PythonFunctionInfo[]> aggInfos =
+                extractPythonAggregateFunctionInfosFromAggregateCall(aggCalls);
+        int[] pythonUdafInputOffsets = aggInfos.f0;
+        PythonFunctionInfo[] pythonFunctionInfos = aggInfos.f1;
+        OneInputStreamOperator<RowData, RowData> pythonOperator =
+                getPythonGroupWindowAggregateFunctionOperator(
+                        config,
+                        inputRowType,
+                        outputRowType,
+                        maxLimitSize,
+                        windowSize,
+                        slideSize,
+                        namePropertyTypeArray,
+                        pythonUdafInputOffsets,
+                        pythonFunctionInfos);
+        return new OneInputTransformation<>(
+                inputTransform,
+                getDesc(),
+                pythonOperator,
+                InternalTypeInfo.of(outputRowType),
+                inputTransform.getParallelism());
+    }
+
+    /**
+     * Reflectively instantiates {@code BatchArrowPythonGroupWindowAggregateFunctionOperator}.
+     *
+     * <p>The constructor signature looked up here must match the operator's constructor exactly
+     * (order and primitive-vs-boxed types).
+     *
+     * @throws TableException wrapping any reflection failure
+     */
+    @SuppressWarnings("unchecked")
+    private OneInputStreamOperator<RowData, RowData> getPythonGroupWindowAggregateFunctionOperator(
+            Configuration config,
+            RowType inputRowType,
+            RowType outputRowType,
+            int maxLimitSize,
+            long windowSize,
+            long slideSize,
+            int[] namePropertyTypeArray,
+            int[] udafInputOffsets,
+            PythonFunctionInfo[] pythonFunctionInfos) {
+        Class<?> clazz =
+                CommonPythonUtil.loadClass(
+                        ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME);
+        try {
+            Constructor<?> ctor =
+                    clazz.getConstructor(
+                            Configuration.class,
+                            PythonFunctionInfo[].class,
+                            RowType.class,
+                            RowType.class,
+                            int.class,
+                            int.class,
+                            long.class,
+                            long.class,
+                            int[].class,
+                            int[].class,
+                            int[].class,
+                            int[].class);
+            return (OneInputStreamOperator<RowData, RowData>)
+                    ctor.newInstance(
+                            config,
+                            pythonFunctionInfos,
+                            inputRowType,
+                            outputRowType,
+                            inputTimeFieldIndex,
+                            maxLimitSize,
+                            windowSize,
+                            slideSize,
+                            namePropertyTypeArray,
+                            grouping,
+                            groupingSet,
+                            udafInputOffsets);
+        } catch (NoSuchMethodException
+                | InstantiationException
+                | IllegalAccessException
+                | InvocationTargetException e) {
+            throw new TableException(
+                    "Python BatchArrowPythonGroupWindowAggregateFunctionOperator constructed failed.",
+                    e);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
new file mode 100644
index 0000000..dae2eb2
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.ManagedMemoryUseCase; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.expressions.FieldReferenceExpression; +import org.apache.flink.table.expressions.ValueLiteralExpression; +import org.apache.flink.table.functions.python.PythonFunctionInfo; +import org.apache.flink.table.planner.calcite.FlinkRelBuilder; +import org.apache.flink.table.planner.calcite.FlinkTypeFactory; +import org.apache.flink.table.planner.delegation.PlannerBase; +import org.apache.flink.table.planner.expressions.PlannerProctimeAttribute; +import org.apache.flink.table.planner.expressions.PlannerRowtimeAttribute; +import org.apache.flink.table.planner.expressions.PlannerWindowEnd; +import 
org.apache.flink.table.planner.expressions.PlannerWindowProperty; +import org.apache.flink.table.planner.expressions.PlannerWindowStart; +import org.apache.flink.table.planner.plan.logical.LogicalWindow; +import org.apache.flink.table.planner.plan.logical.SlidingGroupWindow; +import org.apache.flink.table.planner.plan.logical.TumblingGroupWindow; +import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecPythonAggregate; +import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil; +import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; +import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.operators.window.assigners.CountSlidingWindowAssigner; +import org.apache.flink.table.runtime.operators.window.assigners.CountTumblingWindowAssigner; +import org.apache.flink.table.runtime.operators.window.assigners.SlidingWindowAssigner; +import org.apache.flink.table.runtime.operators.window.assigners.TumblingWindowAssigner; +import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner; +import org.apache.flink.table.runtime.operators.window.triggers.ElementTriggers; +import org.apache.flink.table.runtime.operators.window.triggers.EventTimeTriggers; +import org.apache.flink.table.runtime.operators.window.triggers.ProcessingTimeTriggers; +import org.apache.flink.table.runtime.operators.window.triggers.Trigger; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.calcite.rel.core.AggregateCall; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Arrays; + +import 
static org.apache.flink.table.planner.plan.utils.AggregateUtil.hasRowIntervalType;
+import static org.apache.flink.table.planner.plan.utils.AggregateUtil.hasTimeIntervalType;
+import static org.apache.flink.table.planner.plan.utils.AggregateUtil.isProctimeAttribute;
+import static org.apache.flink.table.planner.plan.utils.AggregateUtil.isRowtimeAttribute;
+import static org.apache.flink.table.planner.plan.utils.AggregateUtil.timeFieldIndex;
+import static org.apache.flink.table.planner.plan.utils.AggregateUtil.toDuration;
+import static org.apache.flink.table.planner.plan.utils.AggregateUtil.toLong;
+
+/**
+ * Stream {@link ExecNode} for group window aggregate (Python user defined aggregate function).
+ *
+ * <p>The node is translated into a keyed {@link OneInputTransformation} whose operator is {@code
+ * StreamArrowPythonGroupWindowAggregateFunctionOperator}, loaded by class name through {@link
+ * CommonPythonUtil#loadClass} and instantiated reflectively.
+ */
+public class StreamExecPythonGroupWindowAggregate extends CommonExecPythonAggregate
+        implements StreamExecNode<RowData> {
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(StreamExecPythonGroupWindowAggregate.class);
+
+    /** Fully qualified name of the Python operator class that is instantiated reflectively. */
+    private static final String ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME =
+            "org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream."
+                    + "StreamArrowPythonGroupWindowAggregateFunctionOperator";
+
+    private final int[] grouping;
+    private final AggregateCall[] aggCalls;
+    private final LogicalWindow window;
+    private final FlinkRelBuilder.PlannerNamedWindowProperty[] namedWindowProperties;
+    private final WindowEmitStrategy emitStrategy;
+
+    public StreamExecPythonGroupWindowAggregate(
+            int[] grouping,
+            AggregateCall[] aggCalls,
+            LogicalWindow window,
+            FlinkRelBuilder.PlannerNamedWindowProperty[] namedWindowProperties,
+            WindowEmitStrategy emitStrategy,
+            ExecEdge inputEdge,
+            RowType outputType,
+            String description) {
+        super(inputEdge, outputType, description);
+        this.grouping = grouping;
+        this.aggCalls = aggCalls;
+        this.window = window;
+        this.namedWindowProperties = namedWindowProperties;
+        this.emitStrategy = emitStrategy;
+    }
+
+    /**
+     * Translates the single input and wraps it with the Python group-window aggregate operator.
+     *
+     * <p>The resulting transformation is keyed by {@link #grouping}. It is forced to parallelism 1
+     * when the input requires a singleton distribution. Python managed memory is declared when
+     * the Python worker uses it.
+     *
+     * @throws TableException if the window is on a rowtime attribute that is not in the input
+     */
+    @SuppressWarnings("unchecked") // the single input of this node produces RowData
+    @Override
+    protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
+        final boolean isCountWindow;
+        if (window instanceof TumblingGroupWindow) {
+            isCountWindow = hasRowIntervalType(((TumblingGroupWindow) window).size());
+        } else if (window instanceof SlidingGroupWindow) {
+            isCountWindow = hasRowIntervalType(((SlidingGroupWindow) window).size());
+        } else {
+            isCountWindow = false;
+        }
+
+        final TableConfig tableConfig = planner.getTableConfig();
+        // Grouped count windows keep per-key state; warn when no idle-state retention is set.
+        if (isCountWindow
+                && grouping.length > 0
+                && tableConfig.getMinIdleStateRetentionTime() < 0) {
+            LOGGER.warn(
+                    "No state retention interval configured for a query which accumulates state."
+                            + " Please provide a query configuration with valid retention interval to"
+                            + " prevent excessive state size. You may specify a retention time of 0 to"
+                            + " not clean up the state.");
+        }
+
+        final ExecNode<RowData> inputNode = (ExecNode<RowData>) getInputNodes().get(0);
+        final Transformation<RowData> inputTransform = inputNode.translateToPlan(planner);
+        final RowType inputRowType = (RowType) inputNode.getOutputType();
+        final RowType outputRowType = InternalTypeInfo.of(getOutputType()).toRowType();
+
+        // Index of the rowtime field in the input row; -1 for non-rowtime windows.
+        final int inputTimeFieldIndex;
+        if (isRowtimeAttribute(window.timeAttribute())) {
+            inputTimeFieldIndex =
+                    timeFieldIndex(
+                            FlinkTypeFactory.INSTANCE().buildRelNodeRowType(inputRowType),
+                            planner.getRelBuilder(),
+                            window.timeAttribute());
+            if (inputTimeFieldIndex < 0) {
+                throw new TableException(
+                        "Group window must defined on a time attribute, "
+                                + "but the time attribute can't be found.\n"
+                                + "This should never happen. Please file an issue.");
+            }
+        } else {
+            inputTimeFieldIndex = -1;
+        }
+        Tuple2<WindowAssigner<?>, Trigger<?>> windowAssignerAndTrigger =
+                generateWindowAssignerAndTrigger();
+        WindowAssigner<?> windowAssigner = windowAssignerAndTrigger.f0;
+        Trigger<?> trigger = windowAssignerAndTrigger.f1;
+        Configuration config = CommonPythonUtil.getMergedConfig(planner.getExecEnv(), tableConfig);
+        OneInputTransformation<RowData, RowData> transform =
+                createPythonStreamWindowGroupOneInputTransformation(
+                        inputTransform,
+                        inputRowType,
+                        outputRowType,
+                        inputTimeFieldIndex,
+                        windowAssigner,
+                        trigger,
+                        emitStrategy.getAllowLateness(),
+                        config);
+        if (inputsContainSingleton()) {
+            transform.setParallelism(1);
+            transform.setMaxParallelism(1);
+        }
+
+        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(config)) {
+            transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
+        }
+        // set KeyType and Selector for state
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType));
+        transform.setStateKeySelector(selector);
+        transform.setStateKeyType(selector.getProducedType());
+        return transform;
+    }
+
+    /**
+     * Creates the window assigner and matching trigger for {@link #window}.
+     *
+     * <p>Supported combinations:
+     *
+     * <ul>
+     *   <li>Tumbling or sliding windows with a time-interval size, on processing time or event
+     *       time.
+     *   <li>Tumbling or sliding windows with a row-count size, on processing time.
+     * </ul>
+     *
+     * <p>In every case the assigner's time mode agrees with the trigger it is paired with.
+     *
+     * @throws UnsupportedOperationException for event-time windows with a row-count size
+     * @throws TableException for any other {@link LogicalWindow} kind
+     */
+    private Tuple2<WindowAssigner<?>, Trigger<?>> generateWindowAssignerAndTrigger() {
+        WindowAssigner<?> windowAssigner;
+        Trigger<?> trigger;
+        if (window instanceof TumblingGroupWindow) {
+            TumblingGroupWindow tumblingWindow = (TumblingGroupWindow) window;
+            FieldReferenceExpression timeField = tumblingWindow.timeField();
+            ValueLiteralExpression size = tumblingWindow.size();
+            if (isProctimeAttribute(timeField) && hasTimeIntervalType(size)) {
+                windowAssigner = TumblingWindowAssigner.of(toDuration(size)).withProcessingTime();
+                trigger = ProcessingTimeTriggers.afterEndOfWindow();
+            } else if (isRowtimeAttribute(timeField) && hasTimeIntervalType(size)) {
+                windowAssigner = TumblingWindowAssigner.of(toDuration(size)).withEventTime();
+                trigger = EventTimeTriggers.afterEndOfWindow();
+            } else if (isProctimeAttribute(timeField) && hasRowIntervalType(size)) {
+                windowAssigner = CountTumblingWindowAssigner.of(toLong(size));
+                trigger = ElementTriggers.count(toLong(size));
+            } else {
+                // TODO: EventTimeTumblingGroupWindow should sort the stream on event time
+                // before applying the windowing logic. Otherwise, this would be the same as a
+                // ProcessingTimeTumblingGroupWindow
+                throw new UnsupportedOperationException(
+                        "Event-time grouping windows on row intervals are currently not supported.");
+            }
+        } else if (window instanceof SlidingGroupWindow) {
+            SlidingGroupWindow slidingWindow = (SlidingGroupWindow) window;
+            FieldReferenceExpression timeField = slidingWindow.timeField();
+            ValueLiteralExpression size = slidingWindow.size();
+            ValueLiteralExpression slide = slidingWindow.slide();
+            if (isProctimeAttribute(timeField) && hasTimeIntervalType(size)) {
+                // Set the time mode explicitly, as the tumbling branch does. Without it the
+                // assigner created by SlidingWindowAssigner.of(...) would not match the
+                // processing-time trigger.
+                windowAssigner =
+                        SlidingWindowAssigner.of(toDuration(size), toDuration(slide))
+                                .withProcessingTime();
+                trigger = ProcessingTimeTriggers.afterEndOfWindow();
+            } else if (isRowtimeAttribute(timeField) && hasTimeIntervalType(size)) {
+                windowAssigner =
+                        SlidingWindowAssigner.of(toDuration(size), toDuration(slide))
+                                .withEventTime();
+                trigger = EventTimeTriggers.afterEndOfWindow();
+            } else if (isProctimeAttribute(timeField) && hasRowIntervalType(size)) {
+                windowAssigner = CountSlidingWindowAssigner.of(toLong(size), toLong(slide));
+                trigger = ElementTriggers.count(toLong(size));
+            } else {
+                // TODO: EventTimeSlidingGroupWindow should sort the stream on event time
+                // before applying the windowing logic. Otherwise, this would be the same as a
+                // ProcessingTimeSlidingGroupWindow
+                throw new UnsupportedOperationException(
+                        "Event-time grouping windows on row intervals are currently not supported.");
+            }
+        } else {
+            throw new TableException("Unsupported window: " + window.toString());
+        }
+        return Tuple2.of(windowAssigner, trigger);
+    }
+
+    /**
+     * Builds the one-input transformation running the Python stream window operator.
+     *
+     * <p>Named window properties are encoded as ints for the operator: 0 = window start, 1 =
+     * window end, 2 = rowtime attribute, 3 = proctime attribute.
+     *
+     * @throws TableException if a named window property is of any other type
+     */
+    private OneInputTransformation<RowData, RowData>
+            createPythonStreamWindowGroupOneInputTransformation(
+                    Transformation<RowData> inputTransform,
+                    RowType inputRowType,
+                    RowType outputRowType,
+                    int inputTimeFieldIndex,
+                    WindowAssigner<?> windowAssigner,
+                    Trigger<?> trigger,
+                    long allowance,
+                    Configuration config) {
+        int[] namePropertyTypeArray =
+                Arrays.stream(namedWindowProperties)
+                        .mapToInt(
+                                p -> {
+                                    PlannerWindowProperty property = p.property();
+                                    if (property instanceof PlannerWindowStart) {
+                                        return 0;
+                                    }
+                                    if (property instanceof PlannerWindowEnd) {
+                                        return 1;
+                                    }
+                                    if (property instanceof PlannerRowtimeAttribute) {
+                                        return 2;
+                                    }
+                                    if (property instanceof PlannerProctimeAttribute) {
+                                        return 3;
+                                    }
+                                    throw new TableException("Unexpected property " + property);
+                                })
+                        .toArray();
+
+        // f0 = input offsets of the Python UDAFs, f1 = their function infos.
+        Tuple2<int[], PythonFunctionInfo[]> aggInfos =
+                extractPythonAggregateFunctionInfosFromAggregateCall(aggCalls);
+        int[] pythonUdafInputOffsets = aggInfos.f0;
+        PythonFunctionInfo[] pythonFunctionInfos = aggInfos.f1;
+        OneInputStreamOperator<RowData, RowData> pythonOperator =
+                getPythonStreamGroupWindowAggregateFunctionOperator(
+                        config,
+                        inputRowType,
+                        outputRowType,
+                        windowAssigner,
+                        trigger,
+                        allowance,
+                        inputTimeFieldIndex,
+                        namePropertyTypeArray,
+                        pythonUdafInputOffsets,
+                        pythonFunctionInfos);
+        return new OneInputTransformation<>(
+                inputTransform,
+                getDesc(),
+                pythonOperator,
+                InternalTypeInfo.of(outputRowType),
+                inputTransform.getParallelism());
+    }
+
+    /**
+     * Reflectively instantiates {@code StreamArrowPythonGroupWindowAggregateFunctionOperator}.
+     *
+     * <p>The constructor signature looked up here must match the operator's constructor exactly
+     * (order and primitive-vs-boxed types).
+     *
+     * @throws TableException wrapping any reflection failure
+     */
+    @SuppressWarnings("unchecked")
+    private OneInputStreamOperator<RowData, RowData>
+            getPythonStreamGroupWindowAggregateFunctionOperator(
+                    Configuration config,
+                    RowType inputRowType,
+                    RowType outputRowType,
+                    WindowAssigner<?> windowAssigner,
+                    Trigger<?> trigger,
+                    long allowance,
+                    int inputTimeFieldIndex,
+                    int[] namedProperties,
+                    int[] udafInputOffsets,
+                    PythonFunctionInfo[] pythonFunctionInfos) {
+        Class<?> clazz =
+                CommonPythonUtil.loadClass(
+                        ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME);
+        try {
+            Constructor<?> ctor =
+                    clazz.getConstructor(
+                            Configuration.class,
+                            PythonFunctionInfo[].class,
+                            RowType.class,
+                            RowType.class,
+                            int.class,
+                            WindowAssigner.class,
+                            Trigger.class,
+                            long.class,
+                            int[].class,
+                            int[].class,
+                            int[].class);
+            return (OneInputStreamOperator<RowData, RowData>)
+                    ctor.newInstance(
+                            config,
+                            pythonFunctionInfos,
+                            inputRowType,
+                            outputRowType,
+                            inputTimeFieldIndex,
+                            windowAssigner,
+                            trigger,
+                            allowance,
+                            namedProperties,
+                            grouping,
+                            udafInputOffsets);
+        } catch (NoSuchMethodException
+                | IllegalAccessException
+                | InstantiationException
+                | InvocationTargetException e) {
+            throw new TableException(
+                    "Python StreamArrowPythonGroupWindowAggregateFunctionOperator constructed failed.",
+                    e);
+        }
+    }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.scala
deleted file mode 100644
index 35f450c..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.batch - -import org.apache.flink.api.dag.Transformation -import org.apache.flink.configuration.Configuration -import org.apache.flink.core.memory.ManagedMemoryUseCase -import org.apache.flink.streaming.api.operators.OneInputStreamOperator -import org.apache.flink.streaming.api.transformations.OneInputTransformation -import org.apache.flink.table.api.config.ExecutionConfigOptions -import org.apache.flink.table.data.RowData -import org.apache.flink.table.functions.python.PythonFunctionInfo -import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty -import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator -import org.apache.flink.table.planner.delegation.PlannerBase -import org.apache.flink.table.planner.expressions.{PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart} -import org.apache.flink.table.planner.plan.logical.LogicalWindow -import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecPythonGroupWindowAggregate.ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME -import org.apache.flink.table.planner.plan.nodes.exec.common.CommonPythonAggregate -import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode, ExecNodeBase} -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo -import 
org.apache.flink.table.types.logical.RowType - -import org.apache.calcite.rel.core.AggregateCall - -import java.util.Collections - -/** - * Batch [[ExecNode]] for group widow aggregate (Python user defined aggregate function). - * - * <p>Note: This class can't be ported to Java, - * because java class can't extend scala interface with default implementation. - * FLINK-20858 will port this class to Java. - */ -class BatchExecPythonGroupWindowAggregate( - grouping: Array[Int], - auxGrouping: Array[Int], - aggCalls: Array[AggregateCall], - window: LogicalWindow, - inputTimeFieldIndex: Int, - namedWindowProperties: Array[PlannerNamedWindowProperty], - inputEdge: ExecEdge, - outputType: RowType, - description: String) - extends ExecNodeBase[RowData](Collections.singletonList(inputEdge), outputType, description) - with BatchExecNode[RowData] - with CommonPythonAggregate { - - override protected def translateToPlanInternal(planner: PlannerBase): Transformation[RowData] = { - val inputNode = getInputNodes.get(0).asInstanceOf[ExecNode[RowData]] - val inputTransform = inputNode.translateToPlan(planner) - - val windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window) - - val groupBufferLimitSize = planner.getTableConfig.getConfiguration.getInteger( - ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT) - - val config = getConfig(planner.getExecEnv, planner.getTableConfig) - val transform = createPythonOneInputTransformation( - inputTransform, - inputNode.getOutputType.asInstanceOf[RowType], - outputType, - inputTimeFieldIndex, - groupBufferLimitSize, - windowSizeAndSlideSize.f0, - windowSizeAndSlideSize.f1, - config) - - if (isPythonWorkerUsingManagedMemory(config)) { - transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON) - } - transform - } - - private[this] def createPythonOneInputTransformation( - inputTransform: Transformation[RowData], - inputRowType: RowType, - outputRowType: RowType, - inputTimeFieldIndex: Int, - 
maxLimitSize: Int, - windowSize: Long, - slideSize: Long, - config: Configuration): OneInputTransformation[RowData, RowData] = { - val namePropertyTypeArray = namedWindowProperties.map { - case PlannerNamedWindowProperty(_, p) => p match { - case PlannerWindowStart(_) => 0 - case PlannerWindowEnd(_) => 1 - case PlannerRowtimeAttribute(_) => 2 - } - }.toArray - - val (pythonUdafInputOffsets, pythonFunctionInfos) = - extractPythonAggregateFunctionInfosFromAggregateCall(aggCalls) - - val pythonOperator = getPythonGroupWindowAggregateFunctionOperator( - config, - inputRowType, - outputRowType, - inputTimeFieldIndex, - maxLimitSize, - windowSize, - slideSize, - namePropertyTypeArray, - pythonUdafInputOffsets, - pythonFunctionInfos) - - new OneInputTransformation( - inputTransform, - "BatchExecPythonGroupWindowAggregate", - pythonOperator, - InternalTypeInfo.of(outputRowType), - inputTransform.getParallelism) - } - - private[this] def getPythonGroupWindowAggregateFunctionOperator( - config: Configuration, - inputRowType: RowType, - outputRowType: RowType, - inputTimeFieldIndex: Int, - maxLimitSize: Int, - windowSize: Long, - slideSize: Long, - namedWindowProperties: Array[Int], - udafInputOffsets: Array[Int], - pythonFunctionInfos: Array[PythonFunctionInfo]): OneInputStreamOperator[RowData, RowData] = { - val clazz = loadClass(ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME) - - val ctor = clazz.getConstructor( - classOf[Configuration], - classOf[Array[PythonFunctionInfo]], - classOf[RowType], - classOf[RowType], - classOf[Int], - classOf[Int], - classOf[Long], - classOf[Long], - classOf[Array[Int]], - classOf[Array[Int]], - classOf[Array[Int]], - classOf[Array[Int]]) - - ctor.newInstance( - config, - pythonFunctionInfos, - inputRowType, - outputRowType, - Integer.valueOf(inputTimeFieldIndex), - Integer.valueOf(maxLimitSize), - java.lang.Long.valueOf(windowSize), - java.lang.Long.valueOf(slideSize), - namedWindowProperties, - grouping, - grouping ++ 
auxGrouping, - udafInputOffsets) - .asInstanceOf[OneInputStreamOperator[RowData, RowData]] - } -} - -object BatchExecPythonGroupWindowAggregate { - val ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME: String = - "org.apache.flink.table.runtime.operators.python.aggregate.arrow.batch." + - "BatchArrowPythonGroupWindowAggregateFunctionOperator" -} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.scala deleted file mode 100644 index 9f08aaa..0000000 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.scala +++ /dev/null @@ -1,270 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream - -import org.apache.flink.api.dag.Transformation -import org.apache.flink.configuration.Configuration -import org.apache.flink.core.memory.ManagedMemoryUseCase -import org.apache.flink.streaming.api.operators.OneInputStreamOperator -import org.apache.flink.streaming.api.transformations.OneInputTransformation -import org.apache.flink.table.api.TableException -import org.apache.flink.table.data.RowData -import org.apache.flink.table.functions.python.PythonFunctionInfo -import org.apache.flink.table.planner.calcite.FlinkRelBuilder.PlannerNamedWindowProperty -import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.delegation.PlannerBase -import org.apache.flink.table.planner.expressions.{PlannerProctimeAttribute, PlannerRowtimeAttribute, PlannerWindowEnd, PlannerWindowStart} -import org.apache.flink.table.planner.plan.logical.{LogicalWindow, SlidingGroupWindow, TumblingGroupWindow} -import org.apache.flink.table.planner.plan.nodes.exec.common.CommonPythonAggregate -import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecPythonGroupWindowAggregate.ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME -import org.apache.flink.table.planner.plan.nodes.exec.{ExecEdge, ExecNode, ExecNodeBase} -import org.apache.flink.table.planner.plan.utils.AggregateUtil.{hasRowIntervalType, hasTimeIntervalType, isProctimeAttribute, isRowtimeAttribute, timeFieldIndex, toDuration, toLong} -import org.apache.flink.table.planner.plan.utils.{KeySelectorUtil, WindowEmitStrategy} -import org.apache.flink.table.planner.utils.Logging -import org.apache.flink.table.runtime.operators.window.assigners.{CountSlidingWindowAssigner, CountTumblingWindowAssigner, SlidingWindowAssigner, TumblingWindowAssigner, WindowAssigner} -import org.apache.flink.table.runtime.operators.window.triggers.{ElementTriggers, EventTimeTriggers, ProcessingTimeTriggers, Trigger} 
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo -import org.apache.flink.table.types.logical.RowType - -import org.apache.calcite.rel.core.AggregateCall - -import java.util.Collections - -/** - * Stream [[ExecNode]] for group widow aggregate (Python user defined aggregate function). - * - * <p>Note: This class can't be ported to Java, - * because java class can't extend scala interface with default implementation. - * FLINK-20858 will port this class to Java. - */ -class StreamExecPythonGroupWindowAggregate( - grouping: Array[Int], - aggCalls: Array[AggregateCall], - window: LogicalWindow, - namedWindowProperties: Array[PlannerNamedWindowProperty], - emitStrategy: WindowEmitStrategy, - inputEdge: ExecEdge, - outputType: RowType, - description: String) - extends ExecNodeBase[RowData](Collections.singletonList(inputEdge), outputType, description) - with StreamExecNode[RowData] - with CommonPythonAggregate - with Logging { - - override def translateToPlanInternal(planner: PlannerBase): Transformation[RowData] = { - val isCountWindow = window match { - case TumblingGroupWindow(_, _, size) if hasRowIntervalType(size) => true - case SlidingGroupWindow(_, _, size, _) if hasRowIntervalType(size) => true - case _ => false - } - - val config = planner.getTableConfig - if (isCountWindow && grouping.length > 0 && config.getMinIdleStateRetentionTime < 0) { - LOG.warn( - "No state retention interval configured for a query which accumulates state. " + - "Please provide a query configuration with valid retention interval to prevent " + - "excessive state size. 
You may specify a retention time of 0 to not clean up the state.") - } - - val inputNode = getInputNodes.get(0).asInstanceOf[ExecNode[RowData]] - val inputRowType = inputNode.getOutputType.asInstanceOf[RowType] - - val inputTimeFieldIndex = if (isRowtimeAttribute(window.timeAttribute)) { - val timeIndex = timeFieldIndex( - FlinkTypeFactory.INSTANCE.buildRelNodeRowType(inputRowType), - planner.getRelBuilder, - window.timeAttribute) - if (timeIndex < 0) { - throw new TableException( - s"Group window PythonAggregate must defined on a time attribute, " + - "but the time attribute can't be found.\n" + - "This should never happen. Please file an issue.") - } - timeIndex - } else { - -1 - } - - val inputTransform = inputNode.translateToPlan(planner) - val (windowAssigner, trigger) = generateWindowAssignerAndTrigger() - val mergedConfig = getConfig(planner.getExecEnv, planner.getTableConfig) - val transform = createPythonStreamWindowGroupOneInputTransformation( - inputTransform, - inputNode.getOutputType.asInstanceOf[RowType], - outputType, - inputTimeFieldIndex, - windowAssigner, - trigger, - emitStrategy.getAllowLateness, - mergedConfig) - - if (inputsContainSingleton()) { - transform.setParallelism(1) - transform.setMaxParallelism(1) - } - - // set KeyType and Selector for state - val inputRowTypeInfo = inputTransform.getOutputType.asInstanceOf[InternalTypeInfo[RowData]] - val selector = KeySelectorUtil.getRowDataSelector(grouping, inputRowTypeInfo) - transform.setStateKeySelector(selector) - transform.setStateKeyType(selector.getProducedType) - - if (isPythonWorkerUsingManagedMemory(mergedConfig)) { - transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON) - } - transform - } - - private[this] def generateWindowAssignerAndTrigger(): (WindowAssigner[_], Trigger[_]) = { - window match { - case TumblingGroupWindow(_, timeField, size) - if isProctimeAttribute(timeField) && hasTimeIntervalType(size) => - 
(TumblingWindowAssigner.of(toDuration(size)).withProcessingTime(), - ProcessingTimeTriggers.afterEndOfWindow()) - - case TumblingGroupWindow(_, timeField, size) - if isRowtimeAttribute(timeField) && hasTimeIntervalType(size) => - (TumblingWindowAssigner.of(toDuration(size)).withEventTime(), - EventTimeTriggers.afterEndOfWindow()) - - case TumblingGroupWindow(_, timeField, size) - if isProctimeAttribute(timeField) && hasRowIntervalType(size) => - (CountTumblingWindowAssigner.of(toLong(size)), - ElementTriggers.count(toLong(size))) - - case TumblingGroupWindow(_, _, _) => - // TODO: EventTimeTumblingGroupWindow should sort the stream on event time - // before applying the windowing logic. Otherwise, this would be the same as a - // ProcessingTimeTumblingGroupWindow - throw new UnsupportedOperationException( - "Event-time grouping windows on row intervals are currently not supported.") - - case SlidingGroupWindow(_, timeField, size, slide) - if isProctimeAttribute(timeField) && hasTimeIntervalType(size) => - (SlidingWindowAssigner.of(toDuration(size), toDuration(slide)), - ProcessingTimeTriggers.afterEndOfWindow()) - - case SlidingGroupWindow(_, timeField, size, slide) - if isRowtimeAttribute(timeField) && hasTimeIntervalType(size) => - (SlidingWindowAssigner.of(toDuration(size), toDuration(slide)), - EventTimeTriggers.afterEndOfWindow()) - - case SlidingGroupWindow(_, timeField, size, slide) - if isProctimeAttribute(timeField) && hasRowIntervalType(size) => - (CountSlidingWindowAssigner.of(toLong(size), toLong(slide)), - ElementTriggers.count(toLong(size))) - - case SlidingGroupWindow(_, _, _, _) => - // TODO: EventTimeTumblingGroupWindow should sort the stream on event time - // before applying the windowing logic. 
Otherwise, this would be the same as a - // ProcessingTimeTumblingGroupWindow - throw new UnsupportedOperationException( - "Event-time grouping windows on row intervals are currently not supported.") - } - } - - private[this] def createPythonStreamWindowGroupOneInputTransformation( - inputTransform: Transformation[RowData], - inputRowType: RowType, - outputRowType: RowType, - inputTimeFieldIndex: Int, - windowAssigner: WindowAssigner[_], - trigger: Trigger[_], - allowance: Long, - config: Configuration): OneInputTransformation[RowData, RowData] = { - - val namePropertyTypeArray = namedWindowProperties - .map { - case PlannerNamedWindowProperty(_, p) => p match { - case PlannerWindowStart(_) => 0 - case PlannerWindowEnd(_) => 1 - case PlannerRowtimeAttribute(_) => 2 - case PlannerProctimeAttribute(_) => 3 - } - }.toArray - - val (pythonUdafInputOffsets, pythonFunctionInfos) = - extractPythonAggregateFunctionInfosFromAggregateCall(aggCalls) - val pythonOperator = getPythonStreamGroupWindowAggregateFunctionOperator( - config, - inputRowType, - outputRowType, - windowAssigner, - trigger, - allowance, - inputTimeFieldIndex, - namePropertyTypeArray, - pythonUdafInputOffsets, - pythonFunctionInfos) - - new OneInputTransformation( - inputTransform, - "StreamExecPythonGroupWindowAggregate", - pythonOperator, - InternalTypeInfo.of(outputRowType), - inputTransform.getParallelism) - } - - private[this] def getPythonStreamGroupWindowAggregateFunctionOperator( - config: Configuration, - inputRowType: RowType, - outputRowType: RowType, - windowAssigner: WindowAssigner[_], - trigger: Trigger[_], - allowance: Long, - inputTimeFieldIndex: Int, - namedProperties: Array[Int], - udafInputOffsets: Array[Int], - pythonFunctionInfos: Array[PythonFunctionInfo]): OneInputStreamOperator[RowData, RowData] = { - val clazz = loadClass(ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME) - - val ctor = clazz.getConstructor( - classOf[Configuration], - 
classOf[Array[PythonFunctionInfo]], - classOf[RowType], - classOf[RowType], - classOf[Int], - classOf[WindowAssigner[_]], - classOf[Trigger[_]], - classOf[Long], - classOf[Array[Int]], - classOf[Array[Int]], - classOf[Array[Int]]) - - ctor.newInstance( - config, - pythonFunctionInfos, - inputRowType, - outputRowType, - Integer.valueOf(inputTimeFieldIndex), - windowAssigner, - trigger, - java.lang.Long.valueOf(allowance), - namedProperties, - grouping, - udafInputOffsets) - .asInstanceOf[OneInputStreamOperator[RowData, RowData]] - } -} - -object StreamExecPythonGroupWindowAggregate { - val ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME: String = - "org.apache.flink.table.runtime.operators.python.aggregate.arrow.stream." + - "StreamArrowPythonGroupWindowAggregateFunctionOperator" -} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonGroupWindowAggregate.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonGroupWindowAggregate.scala index a6e8c21..2ae4e47 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonGroupWindowAggregate.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalPythonGroupWindowAggregate.scala @@ -100,7 +100,7 @@ class BatchPhysicalPythonGroupWindowAggregate( override def translateToExecNode(): ExecNode[_] = { new BatchExecPythonGroupWindowAggregate( grouping, - auxGrouping, + grouping ++ auxGrouping, aggCalls.toArray, window, inputTimeFieldIndex,