This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch native in repository https://gitbox.apache.org/repos/asf/flink.git
commit f0d96eb3f94bb41dd3dd8e35d0d6aaab7ca69670 Author: Yun Gao <gaoyunhen...@gmail.com> AuthorDate: Sun May 28 10:28:32 2023 +0800 Tmp save --- .../natives/StreamExecColumnToRowExchange.java | 8 +- .../NativeRowColumnTranslationProcessor.java | 348 ++++++++++++++++++++- .../stream/StreamExecGlobalWindowAggregate.java | 219 ++++++++++++- .../exec/stream/StreamExecGroupAggregate.java | 171 +++++++++- .../StreamExecIncrementalGroupAggregate.java | 181 +++++++++-- .../plan/nodes/exec/stream/StreamExecJoin.java | 115 ++++++- .../exec/stream/StreamExecLocalGroupAggregate.java | 177 ++++++++++- .../stream/StreamExecLocalWindowAggregate.java | 231 +++++++++++++- .../exec/stream/StreamExecWindowAggregate.java | 231 +++++++++++++- .../AbstractExecNodeExactlyOnceVisitor.java | 2 +- 10 files changed, 1567 insertions(+), 116 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/natives/StreamExecColumnToRowExchange.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/natives/StreamExecColumnToRowExchange.java index 3e4f253d786..a0ae1e2802e 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/natives/StreamExecColumnToRowExchange.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/natives/StreamExecColumnToRowExchange.java @@ -30,11 +30,7 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.table.api.TableException; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.delegation.PlannerBase; -import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; -import 
org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.*; import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExchange; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode; @@ -57,7 +53,7 @@ import static org.apache.flink.runtime.state.KeyGroupRangeAssignment.DEFAULT_LOW minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15) public class StreamExecColumnToRowExchange extends CommonExecExchange - implements StreamExecNode<RowData> { + implements StreamExecNode<RowData>, NativeSupportedExec { public static final String EXCHANGE_TRANSFORMATION = "exchange"; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/NativeRowColumnTranslationProcessor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/NativeRowColumnTranslationProcessor.java index 40f8b6e6f9f..e6f21b360d4 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/NativeRowColumnTranslationProcessor.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/NativeRowColumnTranslationProcessor.java @@ -19,26 +19,348 @@ package org.apache.flink.table.planner.plan.nodes.exec.processor; import org.apache.flink.configuration.ReadableConfig; - import org.apache.flink.streaming.api.transformations.StreamExchangeMode; -import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; +import org.apache.flink.table.planner.plan.nodes.exec.*; +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange; +import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecUnion; +import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExchange; +import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecUnion; +import org.apache.flink.table.planner.plan.nodes.exec.natives.StreamExecColumnToColumnExchange; +import org.apache.flink.table.planner.plan.nodes.exec.natives.StreamExecColumnToRowExchange; +import org.apache.flink.table.planner.plan.nodes.exec.natives.StreamExecRowToColumnExchange; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange; +import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion; +import org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor; +import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph; -import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import static org.apache.flink.util.Preconditions.checkArgument; -import org.apache.flink.table.planner.plan.nodes.exec.NativeSupportedExec; +public class NativeRowColumnTranslationProcessor implements ExecNodeGraphProcessor { -import org.apache.flink.table. planner.plan.nodes.exec.batch.BatchExecUnion; -import org.apache.flink.table.planner.plan. nodes.exec.natives.StreamExecColumnToColumnExchange; -import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange; -import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExchange; -import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecUnion; -import org.apache.flink.table.planner.plan.nodes.exec.natives. 
StreamExecColumnToColumnExchange; + private final ReadableConfig tableConfig; + + public NativeRowColumnTranslationProcessor(ReadableConfig tableConfig) { + this.tableConfig = tableConfig; + } + + @Override + public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) { + System.out.println(execGraph.getRootNodes()); + ReadableConfig config = context.getPlanner().getTableConfig(); + + Map<ExecNode<?>, RowOrColumnarEdge> reusableRowOrColumnarEdges = new HashMap<>(); + Map<ExecNode<?>, RowOrColumnarNode> reusableRowOrColumnarNodes = new HashMap<>(); + AbstractExecNodeExactlyOnceVisitor visitor = + new AbstractExecNodeExactlyOnceVisitor() { + @Override + protected void visitNode(ExecNode<?> node) { + if (node instanceof CommonExecExchange || node instanceof CommonExecUnion) { + visitInputs(node); + return; + } + + boolean acceptColumnar = supportsNative(node, config); + getInputEdgeRegion( + node, reusableRowOrColumnarEdges, reusableRowOrColumnarNodes); + for (int i = 0; i < node.getInputEdges().size(); i++) { + ExecEdge edge = node.getInputEdges().get(i); + ExecEdge newEdge = + duplicateAndConnectEdges( + acceptColumnar, + node, + edge, + reusableRowOrColumnarEdges, + reusableRowOrColumnarNodes, + config); + + node.replaceInputEdge(i, newEdge); + } + visitInputs(node); + } + }; + execGraph.getRootNodes().forEach(visitor::visit); + + return execGraph; + } + + private ExecEdge duplicateAndConnectEdges( + boolean isColumnar, + ExecNode<?> target, + ExecEdge edge, + Map<ExecNode<?>, RowOrColumnarEdge> edges, + Map<ExecNode<?>, RowOrColumnarNode> inputs, + ReadableConfig config) { + + ExecNode<?> source = edge.getSource(); + + boolean found = false; + ExecNode<?> duplicated = null; + + if (edges.containsKey(source)) { + RowOrColumnarEdge rowOrCol = edges.get(source); + ExecNode<?> origin = rowOrCol.originEdge(); + + duplicated = isColumnar ? 
rowOrCol.asColumnarEdge() : rowOrCol.asRowEdge(); + List<ExecEdge> newEdges = new ArrayList<>(); + for (ExecEdge inputEdge : origin.getInputEdges()) { + newEdges.add( + duplicateAndConnectEdges( + isColumnar, duplicated, inputEdge, edges, inputs, config)); + } + + duplicated.setInputEdges(newEdges); + found = true; + } + if (!found + && !inputs.containsKey(source) + && source instanceof CommonExecExchange + && source.getInputEdges().size() == 1) { + source = source.getInputEdges().get(0).getSource(); + } + + if (!found && inputs.containsKey(source)) { + RowOrColumnarNode rowOrCol = inputs.get(source); + + duplicated = isColumnar ? rowOrCol.asColumnarNode(config) : rowOrCol.asRowNode(config); + found = true; + } + if (!found) { + + throw new IllegalArgumentException(); + } + + return new ExecEdge(duplicated, target, edge.getShuffle(), edge.getExchangeMode()); + } + + private void getInputEdgeRegion( + ExecNode<?> node, + Map<ExecNode<?>, RowOrColumnarEdge> reusableRowOrColumnarEdges, + Map<ExecNode<?>, RowOrColumnarNode> reusableRowOrColumnarNodes) { + for (ExecEdge edge : node.getInputEdges()) { + ExecNode<?> source = edge.getSource(); + if (!isEdge(source)) { + if (node instanceof CommonExecExchange) { + // exchange node is swollen by the node + reusableRowOrColumnarNodes.putIfAbsent( + source, new RowOrColumnarNode(source, node)); + continue; + + } else { + reusableRowOrColumnarNodes.putIfAbsent( + source, new RowOrColumnarNode(source, null)); + } + + } else { + getInputEdgeRegion(source, reusableRowOrColumnarEdges, reusableRowOrColumnarNodes); + } + + if (isEdge(node)) { + reusableRowOrColumnarEdges.put(node, new RowOrColumnarEdge(node)); + } + } + } + + private static boolean isEdge(ExecNode<?> node) { + return node instanceof CommonExecExchange || node instanceof CommonExecUnion; + } + + private class RowOrColumnarEdge { + private final ExecNode<?> edge; + private final boolean isExchange; + + private ExecNode<?> dupRowEdge; + + private ExecNode<?> dupColEdge;
+ + RowOrColumnarEdge(ExecNode<?> edge) { + checkArgument(isEdge(edge)); + this.edge = edge; + this.isExchange = edge instanceof CommonExecExchange; + } + + ExecNode<?> originEdge() { + return edge; + } + + ExecNode<?> asColumnarEdge() { + if (dupColEdge == null) { + dupColEdge = + !isExchange + ? duplicateEdge() + : new StreamExecColumnToColumnExchange( + tableConfig, + edge.getInputProperty(0), + (RowType) edge.getOutputType(), + "ColumnToColumn" + edge.getDescription()); + } + return dupColEdge; + } + + ExecNode<?> asRowEdge() { + if (dupRowEdge == null) { + dupRowEdge = duplicateEdge(); + } + return dupRowEdge; + } + + private ExecNode<?> duplicateEdge() { + if (edge instanceof BatchExecUnion) { + + BatchExecUnion union = (BatchExecUnion) edge; + BatchExecUnion newEdge = + new BatchExecUnion( + tableConfig, + union.getInputProperties(), + (RowType) union.getOutputType(), + union.getDescription()); + + newEdge.setInputEdges(new ArrayList<>(union.getInputEdges())); + return newEdge; + } + if (edge instanceof StreamExecUnion) { + StreamExecUnion union = (StreamExecUnion) edge; + StreamExecUnion newEdge = + new StreamExecUnion( + tableConfig, + union.getInputProperties(), + (RowType) union.getOutputType(), + union.getDescription()); + + newEdge.setInputEdges(new ArrayList<>(union.getInputEdges())); + return newEdge; + } + if (edge instanceof BatchExecExchange) { + BatchExecExchange exchange = (BatchExecExchange) edge; + BatchExecExchange newEdge = + new BatchExecExchange( + tableConfig, + exchange.getInputProperties().get(0), + (RowType) exchange.getOutputType(), + exchange.getDescription()); + + newEdge.setInputEdges(new ArrayList<>(exchange.getInputEdges())); + return newEdge; + } + + if (edge instanceof StreamExecExchange) { + StreamExecExchange exchange = (StreamExecExchange) edge; + StreamExecExchange newEdge = + new StreamExecExchange( + tableConfig, + exchange.getInputProperties().get(0), + (RowType) exchange.getOutputType(), + exchange.getDescription()); +
+ newEdge.setInputEdges(new ArrayList<>(exchange.getInputEdges())); + return newEdge; + } + throw new IllegalArgumentException(); + } + } + + private class RowOrColumnarNode { + private final ExecNode<?> node; + private final ExecNode<?> exchange; + + private ExecNode<?> rowNode; + private ExecNode<?> colNode; + + RowOrColumnarNode(ExecNode<?> node, ExecNode<?> exchange) { + this.node = node; + this.exchange = exchange; + checkArgument( + !isEdge(node) && (exchange == null || exchange instanceof CommonExecExchange)); + } + + ExecNode<?> asColumnarNode(ReadableConfig config) { + if (colNode == null) { + if (supportsNative(node, config)) { + if (exchange == null) { + colNode = node; + } else { + colNode = + new StreamExecColumnToColumnExchange( + tableConfig, + exchange.getInputProperty(0), + (RowType) exchange.getOutputType(), + "ColumnToColumn" + exchange.getDescription()); + colNode.setInputEdges( + Collections.singletonList( + new ExecEdge( + node, + colNode, + ExecEdge.FORWARD_SHUFFLE, + StreamExchangeMode.PIPELINED))); + } + } else { + colNode = + exchange == null + ? new StreamExecRowToColumnExchange( + tableConfig, + InputProperty.builder() + .requiredDistribution( + KEEP_INPUT_AS_IS_DISTRIBUTION) + .build(), + (RowType) node.getOutputType(), + "Inserted Row to Column") + : new StreamExecRowToColumnExchange( + tableConfig, + exchange.getInputProperty(0), + (RowType) exchange.getOutputType(), + "RowToColumn" + exchange.getDescription()); -public class NativeRowColumnTranslationProcessor { + colNode.setInputEdges( + Collections.singletonList( + new ExecEdge( + node, + colNode, + ExecEdge.FORWARD_SHUFFLE, + StreamExchangeMode.PIPELINED))); + } + } + return colNode; + } + ExecNode<?> asRowNode(ReadableConfig config) { + if (rowNode == null) { + if (!supportsNative(node, config)) { + rowNode = exchange == null ? node : exchange; + } else { + rowNode = + exchange == null + ?
new StreamExecColumnToRowExchange( + tableConfig, + InputProperty.builder() + .requiredDistribution( + KEEP_INPUT_AS_IS_DISTRIBUTION) + .build(), + (RowType) node.getOutputType(), + "Inserted Column to Row") + : new StreamExecColumnToRowExchange( + tableConfig, + exchange.getInputProperty(0), + (RowType) exchange.getOutputType(), + "ColumnToRow" + exchange.getDescription()); + rowNode.setInputEdges( + Collections.singletonList( + new ExecEdge( + node, + rowNode, + ExecEdge.FORWARD_SHUFFLE, + StreamExchangeMode.PIPELINED))); + } + } + return rowNode; + } + } + private static boolean supportsNative(ExecNode<?> exec, ReadableConfig config) { + return exec instanceof NativeSupportedExec + && ((NativeSupportedExec) exec).canTranslateNative(config); + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java index 9b23711724a..bc792099078 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java @@ -21,6 +21,10 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream; import org.apache.flink.FlinkVersion; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.streaming.api.natives.types.NativeMemoryRegion; +import org.apache.flink.streaming.api.natives.types.NativeMemoryRegionTypeInfo; +import org.apache.flink.streaming.api.natives.types.NativeType; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import 
org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.transformations.OneInputTransformation; @@ -28,17 +32,16 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.codegen.CodeGeneratorContext; import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator; import org.apache.flink.table.planner.delegation.PlannerBase; -import org.apache.flink.table.planner.plan.logical.WindowingStrategy; +import org.apache.flink.table.planner.plan.logical.*; import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonNativeUtil; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; -import org.apache.flink.table.planner.plan.utils.AggregateInfoList; -import org.apache.flink.table.planner.plan.utils.AggregateUtil; -import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; +import org.apache.flink.table.planner.plan.utils.*; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; @@ -46,6 +49,7 @@ import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty; import org.apache.flink.table.runtime.groupwindow.WindowProperty; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.aggregate.window.SlicingWindowAggOperatorBuilder; +import org.apache.flink.table.runtime.operators.natives.*; import 
org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; @@ -61,10 +65,15 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.tools.RelBuilder; +import javax.annotation.Nullable; +import javax.sound.sampled.UnsupportedAudioFileException; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -150,6 +159,11 @@ public class StreamExecGlobalWindowAggregate extends StreamExecWindowAggregateBa @Override protected Transformation<RowData> translateToPlanInternal( PlannerBase planner, ExecNodeConfig config) { + Transformation<RowData> nativePlan = translateToNativePlan(planner, config); + if (nativePlan != null) { + return nativePlan; + } + final ExecEdge inputEdge = getInputEdges().get(0); final Transformation<RowData> inputTransform = (Transformation<RowData>) inputEdge.translateToPlan(planner); @@ -292,4 +306,201 @@ public class StreamExecGlobalWindowAggregate extends StreamExecWindowAggregateBa sliceAssigner, shifTimeZone); } + + @SuppressWarnings("unchecked") + @Nullable + private Transformation<RowData> translateToNativePlan( + PlannerBase planner, ExecNodeConfig config) { + final ZoneId shiftTimeZone = + TimeWindowUtil.getShiftTimeZone( + windowing.getTimeAttributeType(), + TableConfigUtils.getLocalTimeZone(config)); + + // TODO support other window types + if (!(windowing.getWindow() instanceof TumblingWindowSpec)) { + return null; + } + + NativeWindowAggInfo.WindowStrategy windowStrategy; + int timeFieldIndex; + + int sliceEndIndex = -1; + int windowEndIndex = -1; + + WindowSpec 
windowSpec = windowing.getWindow(); + + if (windowing instanceof WindowAttachedWindowingStrategy) { + windowEndIndex = ((WindowAttachedWindowingStrategy) windowing).getWindowEnd(); + timeFieldIndex = Integer.MAX_VALUE; + windowStrategy = NativeWindowAggInfo.WindowStrategy.WINDOW_ATTACHED; + } else if (windowing instanceof SliceAttachedWindowingStrategy) { + sliceEndIndex = ((SliceAttachedWindowingStrategy) windowing).getSliceEnd(); + timeFieldIndex = Integer.MAX_VALUE; + windowStrategy = NativeWindowAggInfo.WindowStrategy.SLICE_ATTACHED; + } else if (windowing instanceof TimeAttributeWindowingStrategy) { + if (windowing.isRowtime()) { + timeFieldIndex = + ((TimeAttributeWindowingStrategy) windowing).getTimeAttributeIndex(); + } else { + + timeFieldIndex = -1; + } + windowStrategy = NativeWindowAggInfo.WindowStrategy.TIME_ATTRIBUTE; + } else { + throw new UnsupportedOperationException(windowing + " is not supported yet."); + } + + final ExecEdge inputEdge = getInputEdges().get(0); + + Transformation<NativeMemoryRegion> columnarInput = + (Transformation<NativeMemoryRegion>) inputEdge.translateToPlanNative(planner); + + final RowType inputRowType = (RowType) inputEdge.getOutputType(); + + if (!CommonNativeUtil.isSupportedDataType(inputRowType.getChildren())) { + return null; + } + + final AggregateInfoList aggInfoList = getStateBackedAggInfoList(); + + if (!CommonNativeUtil.isSupportedAggregation(aggInfoList)) { + return null; + } + + int numDistinct = aggInfoList.distinctInfos().length; + int numAggregations = aggInfoList.aggInfos().length; + + int[][] aggInputIndex = new int[numAggregations][]; + int baseOffset = grouping.length; + int index = 0; + for (AggregateInfo info : aggInfoList.aggInfos()) { + int[] inputIndex = new int[info.externalAccTypes().length]; + aggInputIndex[index++] = inputIndex; + for (int i = 0; i < info.externalAccTypes().length; ++i) { + inputIndex[i] = baseOffset++; + } + } + int distinctStateOffset = baseOffset; + + int[] distinctIndex = new
int[numAggregations]; + int[] filterIndex = new int[numAggregations]; + Arrays.fill(distinctIndex, -1); + Arrays.fill(filterIndex, -1); + + NativeDistinctInfo[] distinctInfos = new NativeDistinctInfo[numDistinct]; + index = 0; + for (DistinctInfo info : aggInfoList.distinctInfos()) { + ArrayList<Integer> filters = new ArrayList<>(info.filterArgs().length()); + info.filterArgs().foreach(filter -> filters.add((Integer) filter)); + + ArrayList<Integer> aggIds = new ArrayList<>(info.aggIndexes().length()); + int finalIndex = index; + + AtomicInteger filter = new AtomicInteger(0); + info.aggIndexes() + .foreach( + aggId -> { + distinctIndex[(int) aggId] = finalIndex; + filterIndex[(int) aggId] = filters.get(filter.getAndIncrement()); + aggIds.add((Integer) aggId); + return true; + }); + + List<LogicalType> logicalKeyTypes = + info.keyType().getChildren().isEmpty() + ? Collections.singletonList(info.keyType().getLogicalType()) + : info.keyType().getChildren().stream() + .map(DataType::getLogicalType) + .collect(Collectors.toList()); + if (!CommonNativeUtil.isSupportedDataType(logicalKeyTypes)) { + return null; + } + + NativeType[] keyTypes = CommonNativeUtil.getNativeDataType(logicalKeyTypes); + NativeDistinctInfo distinctInfo = + new NativeDistinctInfo( + info.consumeRetraction(), + distinctStateOffset++, + keyTypes, + info.argIndexes(), + aggIds.stream().mapToInt(aggId -> aggId).toArray(), + filters.stream().mapToInt(aggId -> aggId).toArray()); + distinctInfos[index++] = distinctInfo; + } + + NativeAggFunctionInfo[] aggFunctions = new NativeAggFunctionInfo[numAggregations]; + index = 0; + + for (AggregateInfo info : aggInfoList.aggInfos()) { + NativeAggFunctionInfo function = + new NativeAggFunctionInfo( + info.consumeRetraction(), + distinctIndex[info.aggIndex()], + info.aggIndex(), + aggInputIndex[index++], + distinctIndex[info.aggIndex()] >= 0 + ?
filterIndex[info.aggIndex()] + : info.agg().filterArg, + CommonNativeUtil.getNativeAggFunctionType(info), + NativeTypeUtils.toNativeType( + info.externalResultType().getLogicalType())); + aggFunctions[info.aggIndex()] = function; + } + + NativeGroupAggInfo aggInfo = + new NativeGroupAggInfo( + false, + false, + aggInfoList.countStarInserted(), + aggInfoList.getIndexOfCountStar(), + config.get(TaskManagerOptions.TASK_NATIVE_BATCH_SIZE), + config.get(TaskManagerOptions.TASK_NATIVE_FINAL_AGG_BUFFER_SIZE), + config.get(TaskManagerOptions.TASK_NATIVE_AGG_DISTINCT_TABLE_INITIAL_SIZE), + config.get(TaskManagerOptions.TASK_NATIVE_AGG_RESULT_TABLE_INITIAL_SIZE), + config.get(TaskManagerOptions.TASK_NATIVE_AGG_STRING_SET_INITIAL_SIZE), + new int[0], + grouping, + CommonNativeUtil.getNativeDataType(inputRowType.getChildren()), + aggFunctions, + distinctInfos); + CommonNativeUtil.validateGroupAggInfo(aggInfo); + + OneInputStreamOperator<NativeMemoryRegion, NativeMemoryRegion> operator; + if (windowSpec instanceof TumblingWindowSpec) { + long offset = 0; + long size = ((TumblingWindowSpec) windowSpec).getSize().toMillis(); + if (((TumblingWindowSpec) windowSpec).getOffset() != null) { + offset = ((TumblingWindowSpec) windowSpec).getOffset().toMillis(); + } + NativeWindowAggInfo windowInfo = + new NativeWindowAggInfo( + timeFieldIndex, + sliceEndIndex, + windowEndIndex, + size, + offset, + windowStrategy, + shiftTimeZone.getId(), + aggInfo); + operator = new NativeTumbleGlobalWindowOperator(windowInfo); + } else { + throw new UnsupportedOperationException(windowSpec + " is not supported yet"); + } + + final OneInputTransformation<NativeMemoryRegion, NativeMemoryRegion> transform = + ExecNodeUtil.createOneInputTransformation( + columnarInput, + createTransformationMeta(GLOBAL_WINDOW_AGGREGATE_TRANSFORMATION, config), + operator, + new NativeMemoryRegionTypeInfo( + NativeTypeUtils.toNativeType(getOutputType())), + columnarInput.getParallelism()); + RowDataKeySelector selector = + 
KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType)); + // set KeyType and Selector for state + transform.setStateKeySelector(null); + transform.setStateKeyType(selector.getProducedType()); + return (OneInputTransformation) transform; + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java index cc2b87976e1..85f530aac66 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java @@ -18,9 +18,16 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream; +import org.apache.calcite.rel.core.AggregateCall; import org.apache.flink.FlinkVersion; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.streaming.api.natives.types.NativeMemoryRegion; +import org.apache.flink.streaming.api.natives.types.NativeMemoryRegionTypeInfo; +import org.apache.flink.streaming.api.natives.types.NativeType; import org.apache.flink.streaming.api.operators.KeyedProcessOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; @@ -30,16 +37,10 @@ import org.apache.flink.table.planner.codegen.CodeGeneratorContext; import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator; import 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator; import org.apache.flink.table.planner.delegation.PlannerBase; -import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; -import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.*; +import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonNativeUtil; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; -import org.apache.flink.table.planner.plan.utils.AggregateInfoList; -import org.apache.flink.table.planner.plan.utils.AggregateUtil; -import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; +import org.apache.flink.table.planner.plan.utils.*; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction; import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; @@ -47,21 +48,22 @@ import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction; import org.apache.flink.table.runtime.operators.aggregate.MiniBatchGroupAggFunction; import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator; +import org.apache.flink.table.runtime.operators.natives.*; import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; - -import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; - -import org.apache.calcite.rel.core.AggregateCall; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -78,7 +80,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; producedTransformations = StreamExecGroupAggregate.GROUP_AGGREGATE_TRANSFORMATION, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15) -public class StreamExecGroupAggregate extends StreamExecAggregateBase { +public class StreamExecGroupAggregate extends StreamExecAggregateBase + implements NativeSupportedExec { private static final Logger LOG = LoggerFactory.getLogger(StreamExecGroupAggregate.class); @@ -159,6 +162,11 @@ public class StreamExecGroupAggregate extends StreamExecAggregateBase { + "state size. 
You may specify a retention time of 0 to not clean up the state."); } + Transformation<RowData> nativePlan = translateToNativePlan(planner, config); + if (nativePlan != null) { + return nativePlan; + } + final ExecEdge inputEdge = getInputEdges().get(0); final Transformation<RowData> inputTransform = (Transformation<RowData>) inputEdge.translateToPlan(planner); @@ -256,4 +264,137 @@ public class StreamExecGroupAggregate extends StreamExecAggregateBase { return transform; } + + @SuppressWarnings("unchecked") + @Nullable + private Transformation<RowData> translateToNativePlan( + PlannerBase planner, ExecNodeConfig config) { + final ExecEdge inputEdge = getInputEdges().get(0); + Transformation<NativeMemoryRegion> columnarInput = + (Transformation<NativeMemoryRegion>) inputEdge.translateToPlanNative(planner); + + final RowType inputRowType = (RowType) inputEdge.getOutputType(); + if (!CommonNativeUtil.isSupportedDataType(inputRowType.getChildren())) { + return null; + } + + final AggregateInfoList aggInfoList = + AggregateUtil.transformToStreamAggregateInfoList( + inputRowType, + JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)), + aggCallNeedRetractions, + needRetraction, + true, + true); + + if (!CommonNativeUtil.isSupportedAggregation(aggInfoList)) { + return null; + } + + int numDistinctions = aggInfoList.distinctInfos().length; + int numAggregations = aggInfoList.aggInfos().length; + int[] distinctIndex = new int[numAggregations]; + + int[] filterIndex = new int[numAggregations]; + + Arrays.fill(distinctIndex, -1); + Arrays.fill(filterIndex, -1); + + NativeDistinctInfo[] distinctInfos = new NativeDistinctInfo[numDistinctions]; + int index = 0; + + for (DistinctInfo info : aggInfoList.distinctInfos()) { + ArrayList<Integer> filters = new ArrayList<>(info.filterArgs().length()); + info.filterArgs().foreach(filter -> filters.add((Integer) filter)); + + ArrayList<Integer> aggIds = new ArrayList<>(info.aggIndexes().length()); + int finalIndex = index; +
AtomicInteger filter = new AtomicInteger(0); + info.aggIndexes() + .foreach( + aggId -> { + distinctIndex[(int) aggId] = finalIndex; + filterIndex[(int) aggId] = filters.get(filter.getAndIncrement()); + + aggIds.add((Integer) aggId); + return true; + }); + + List<LogicalType> logicalKeyTypes = + info.keyType().getChildren().isEmpty() + ? Collections.singletonList(info.keyType().getLogicalType()) + : info.keyType().getChildren().stream() + .map(DataType::getLogicalType) + .collect(Collectors.toList()); + if (!CommonNativeUtil.isSupportedDataType(logicalKeyTypes)) { + return null; + } + NativeType[] keyTypes = CommonNativeUtil.getNativeDataType(logicalKeyTypes); + NativeDistinctInfo distinctInfo = + new NativeDistinctInfo( + info.consumeRetraction(), + -1, + keyTypes, + info.argIndexes(), + aggIds.stream().mapToInt(aggId -> aggId).toArray(), + filters.stream().mapToInt(aggId -> aggId).toArray()); + distinctInfos[index++] = distinctInfo; + } + + NativeAggFunctionInfo[] aggFunctions = new NativeAggFunctionInfo[numAggregations]; + for (AggregateInfo info : aggInfoList.aggInfos()) { + NativeAggFunctionInfo function = + new NativeAggFunctionInfo( + info.consumeRetraction(), + distinctIndex[info.aggIndex()], + info.aggIndex(), + info.argIndexes(), + distinctIndex[info.aggIndex()] >= 0 + ? 
filterIndex[info.aggIndex()] + : info.agg().filterArg, + CommonNativeUtil.getNativeAggFunctionType(info), + NativeTypeUtils.toNativeType( + info.externalResultType().getLogicalType())); + + aggFunctions[info.aggIndex()] = function; + } + + NativeGroupAggInfo aggInfo = + new NativeGroupAggInfo( + needRetraction, + generateUpdateBefore, + aggInfoList.countStarInserted(), + aggInfoList.getIndexOfCountStar(), + config.get(TaskManagerOptions.TASK_NATIVE_BATCH_SIZE), + config.get(TaskManagerOptions.TASK_NATIVE_FINAL_AGG_BUFFER_SIZE), + config.get(TaskManagerOptions.TASK_NATIVE_AGG_DISTINCT_TABLE_INITIAL_SIZE), + config.get(TaskManagerOptions.TASK_NATIVE_AGG_RESULT_TABLE_INITIAL_SIZE), + config.get(TaskManagerOptions.TASK_NATIVE_AGG_STRING_SET_INITIAL_SIZE), + new int[0], + grouping, + CommonNativeUtil.getNativeDataType(inputRowType.getChildren()), + aggFunctions, + distinctInfos); + CommonNativeUtil.validateGroupAggInfo(aggInfo); + + OneInputStreamOperator<NativeMemoryRegion, NativeMemoryRegion> operator = + new NativeGroupAggOperator(aggInfo); + + // partitioned aggregation + final OneInputTransformation<NativeMemoryRegion, NativeMemoryRegion> transform = + ExecNodeUtil.createOneInputTransformation( + columnarInput, + createTransformationMeta(GROUP_AGGREGATE_TRANSFORMATION, config), + operator, + new NativeMemoryRegionTypeInfo( + NativeTypeUtils.toNativeType((RowType) getOutputType())), + columnarInput.getParallelism()); + + // set KeyType and Selector for state + final RowDataKeySelector selector = + KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType)); + transform.setStateKeySelector(null); + transform.setStateKeyType(selector.getProducedType()); + return (OneInputTransformation) transform; + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java index 3477010cd59..608ef4d2bc7 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java @@ -18,43 +18,44 @@ package org.apache.flink.table.planner.plan.nodes.exec.stream; +import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.tools.RelBuilder; import org.apache.flink.FlinkVersion; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.streaming.api.natives.types.NativeMemoryRegion; +import org.apache.flink.streaming.api.natives.types.NativeMemoryRegionTypeInfo; +import org.apache.flink.streaming.api.natives.types.NativeType; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.codegen.CodeGeneratorContext; import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator; import org.apache.flink.table.planner.delegation.PlannerBase; -import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; 
-import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.*; +import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonNativeUtil; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; -import org.apache.flink.table.planner.plan.utils.AggregateInfoList; -import org.apache.flink.table.planner.plan.utils.AggregateUtil; -import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; +import org.apache.flink.table.planner.plan.utils.*; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.aggregate.MiniBatchIncrementalGroupAggFunction; import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator; +import org.apache.flink.table.runtime.operators.natives.*; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; - -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.tools.RelBuilder; - +import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -68,7 +69,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; StreamExecIncrementalGroupAggregate.INCREMENTAL_GROUP_AGGREGATE_TRANSFORMATION, minPlanVersion = 
FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
-public class StreamExecIncrementalGroupAggregate extends StreamExecAggregateBase {
+public class StreamExecIncrementalGroupAggregate extends StreamExecAggregateBase
+        implements NativeSupportedExec {
 
     public static final String INCREMENTAL_GROUP_AGGREGATE_TRANSFORMATION =
             "incremental-group-aggregate";
@@ -164,8 +166,6 @@ public class StreamExecIncrementalGroupAggregate extends StreamExecAggregateBase
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
         final ExecEdge inputEdge = getInputEdges().get(0);
-        final Transformation<RowData> inputTransform =
-                (Transformation<RowData>) inputEdge.translateToPlan(planner);
 
         final AggregateInfoList partialLocalAggInfoList =
                 AggregateUtil.createPartialAggInfoList(
@@ -176,6 +176,16 @@ public class StreamExecIncrementalGroupAggregate extends StreamExecAggregateBase
                         partialAggNeedRetraction,
                         false);
 
+        Transformation<RowData> nativePlan =
+                translateToNativePlan(
+                        planner, inputTypeInfo.toRowType(), partialLocalAggInfoList, config);
+        if (nativePlan != null) {
+            return nativePlan;
+        }
+
+        final Transformation<RowData> inputTransform =
+                (Transformation<RowData>) inputEdge.translateToPlan(planner);
+
         final GeneratedAggsHandleFunction partialAggsHandler =
                 generateAggsHandler(
                         "PartialGroupAggsHandler",
@@ -267,4 +277,159 @@ public class StreamExecIncrementalGroupAggregate extends StreamExecAggregateBase
                 .needMerge(mergedAccOffset, true, mergedAccExternalTypes)
                 .generateAggsHandler(name, aggInfoList);
     }
+
+    /**
+     * Tries to translate this node into a native plan backed by {@link NativeIncGroupAggOperator}.
+     *
+     * <p>Returns {@code null} when {@link CommonNativeUtil} rejects the input field types or the
+     * aggregations, or when a distinct key is composite or of an unsupported type, so that {@code
+     * translateToPlanInternal} falls back to the regular {@link RowData} plan.
+     *
+     * @param planner planner used to translate the input edge
+     * @param inputRowType row type the native input field types are derived from
+     * @param aggInfoList aggregate information; the caller passes the partial local aggregate list
+     * @param config configuration the native batch/buffer/table sizes are read from
+     * @return the native transformation, or {@code null} if the node is not natively supported
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable
+    private Transformation<RowData> translateToNativePlan(
+            PlannerBase planner,
+            RowType inputRowType,
+            AggregateInfoList aggInfoList,
+            ExecNodeConfig config) {
+        final ExecEdge inputEdge = getInputEdges().get(0);
+        Transformation<NativeMemoryRegion> columnarInput =
+                (Transformation<NativeMemoryRegion>) inputEdge.translateToPlanNative(planner);
+        if (!CommonNativeUtil.isSupportedDataType(inputRowType.getChildren())
+                || !CommonNativeUtil.isSupportedAggregation(aggInfoList)) {
+            return null;
+        }
+        int numDistinct = aggInfoList.distinctInfos().length;
+        int numAggregations = aggInfoList.aggInfos().length;
+
+        // Input indexes are handed out consecutively, starting at partialAggGrouping.length:
+        // one per external accumulator type of each aggregate.
+        int[][] aggInputIndex = new int[numAggregations][];
+        int baseOffset = partialAggGrouping.length;
+        int index = 0;
+        for (AggregateInfo info : aggInfoList.aggInfos()) {
+            int[] inputIndex = new int[info.externalAccTypes().length];
+            aggInputIndex[index++] = inputIndex;
+            for (int i = 0; i < info.externalAccTypes().length; ++i) {
+                inputIndex[i] = baseOffset++;
+            }
+        }
+        // Distinct state offsets continue right after the last aggregate input index.
+        int distinctStateOffset = baseOffset;
+
+        int[] distinctIndex = new int[numAggregations];
+        Arrays.fill(distinctIndex, -1);
+        int[] filterIndex = new int[numAggregations];
+        Arrays.fill(filterIndex, -1);
+
+        NativeDistinctInfo[] distinctInfos = new NativeDistinctInfo[numDistinct];
+        index = 0;
+        for (DistinctInfo info : aggInfoList.distinctInfos()) {
+
+            ArrayList<Integer> filters = new ArrayList<>(info.filterArgs().length());
+            info.filterArgs().foreach(filter -> filters.add((Integer) filter));
+
+            ArrayList<Integer> aggIds = new ArrayList<>(info.aggIndexes().length());
+            int finalIndex = index;
+
+            AtomicInteger filter = new AtomicInteger(0);
+            info.aggIndexes()
+                    .foreach(
+                            aggId -> {
+                                distinctIndex[(int) aggId] = finalIndex;
+                                filterIndex[(int) aggId] = filters.get(filter.getAndIncrement());
+                                aggIds.add((Integer) aggId);
+                                return true;
+                            });
+            if (!info.keyType().getChildren().isEmpty()) {
+                return null;
+            }
+            List<LogicalType> logicalKeyTypes =
+                    Collections.singletonList(info.keyType().getLogicalType());
+            if (!CommonNativeUtil.isSupportedDataType(logicalKeyTypes)) {
+                return null;
+            }
+            NativeType[] keyTypes = CommonNativeUtil.getNativeDataType(logicalKeyTypes);
+            NativeDistinctInfo distinctInfo =
+                    new NativeDistinctInfo(
+                            info.consumeRetraction(),
+                            distinctStateOffset++,
+                            keyTypes,
+                            info.argIndexes(),
+                            aggIds.stream().mapToInt(aggId -> aggId).toArray(),
+                            filters.stream().mapToInt(aggId -> aggId).toArray());
+            distinctInfos[index++] = distinctInfo;
+        }
+
+        NativeAggFunctionInfo[] aggFunctions = new NativeAggFunctionInfo[numAggregations];
+        index = 0;
+
+        for (AggregateInfo info : aggInfoList.aggInfos()) {
+            NativeAggFunctionInfo function =
+                    new NativeAggFunctionInfo(
+                            info.consumeRetraction(),
+                            distinctIndex[info.aggIndex()],
+                            info.aggIndex(),
+                            aggInputIndex[index++],
+                            distinctIndex[info.aggIndex()] >= 0
+                                    ? filterIndex[info.aggIndex()]
+                                    : info.agg().filterArg,
+                            CommonNativeUtil.getNativeAggFunctionType(info),
+                            NativeTypeUtils.toNativeType(
+                                    info.externalResultType().getLogicalType()));
+            aggFunctions[info.aggIndex()] = function;
+        }
+        // Built only after the loop so that it sees every aggregate function.
+        NativeGroupAggInfo aggInfo =
+                new NativeGroupAggInfo(
+                        partialAggNeedRetraction,
+                        false,
+                        aggInfoList.countStarInserted(),
+                        aggInfoList.getIndexOfCountStar(),
+                        config.get(TaskManagerOptions.TASK_NATIVE_BATCH_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_INCREMENTAL_AGG_BUFFER_SIZE),
+                        config.get(
+                                TaskManagerOptions.TASK_NATIVE_AGG_DISTINCT_TABLE_INITIAL_SIZE),
+                        config.get(
+                                TaskManagerOptions.TASK_NATIVE_AGG_RESULT_TABLE_INITIAL_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_STRING_SET_INITIAL_SIZE),
+                        partialAggGrouping,
+                        finalAggGrouping,
+                        CommonNativeUtil.getNativeDataType(inputRowType.getChildren()),
+                        aggFunctions,
+                        distinctInfos);
+        CommonNativeUtil.validateGroupAggInfo(aggInfo);
+        OneInputStreamOperator<NativeMemoryRegion, NativeMemoryRegion> operator =
+                new NativeIncGroupAggOperator(aggInfo);
+
+        // partitioned aggregation
+        final OneInputTransformation<NativeMemoryRegion, NativeMemoryRegion> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        columnarInput,
+                        createTransformationMeta(
+                                INCREMENTAL_GROUP_AGGREGATE_TRANSFORMATION, config),
+                        operator,
+                        new NativeMemoryRegionTypeInfo(
+                                NativeTypeUtils.toNativeType((RowType) getOutputType())),
+                        columnarInput.getParallelism(),
+                        0);
+
+        // set KeyType and Selector for state
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(
+                        partialAggGrouping, InternalTypeInfo.of(inputEdge.getOutputType()));
+
+        // NOTE(review): only the key type is registered; the key selector itself is set to null.
+        transform.setStateKeySelector(null);
+        transform.setStateKeyType(selector.getProducedType());
+
+        // Raw cast: the transformation is typed on NativeMemoryRegion, not RowData.
+        return (OneInputTransformation) transform;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
index dbf399d8ae5..739581f0061 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
@@ -19,20 +19,24 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegion;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegionTypeInfo;
+import org.apache.flink.streaming.api.natives.types.NativeType;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
-import
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; -import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; -import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator; +import org.apache.flink.table.planner.plan.nodes.exec.*; import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec; +import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonNativeUtil; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; import org.apache.flink.table.planner.plan.utils.JoinUtil; import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; @@ -43,6 +47,9 @@ import org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoi import org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator; import org.apache.flink.table.runtime.operators.join.stream.StreamingSemiAntiJoinOperator; import org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec; +import org.apache.flink.table.runtime.operators.natives.NativeJoinInfo; +import org.apache.flink.table.runtime.operators.natives.NativeStreamingJoinOperator; +import org.apache.flink.table.runtime.operators.natives.NativeTypeUtils; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.types.logical.RowType; @@ -51,7 +58,9 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import java.util.ArrayList; import java.util.List; +import java.util.Optional; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -69,7 +78,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
 public class StreamExecJoin extends ExecNodeBase<RowData>
-        implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {
+        implements StreamExecNode<RowData>,
+                SingleTransformationTranslator<RowData>,
+                NativeSupportedExec {
 
     public static final String JOIN_TRANSFORMATION = "join";
 
@@ -131,6 +142,12 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
     @SuppressWarnings("unchecked")
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
+
+        Transformation<RowData> nativePlan = translateToNativePlan(planner, config);
+        if (nativePlan != null) {
+            return nativePlan;
+        }
+
         final ExecEdge leftInputEdge = getInputEdges().get(0);
         final ExecEdge rightInputEdge = getInputEdges().get(1);
 
@@ -223,4 +240,112 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
         transform.setStateKeyType(leftSelect.getProducedType());
         return transform;
     }
+
+    /**
+     * Translates this join into a native plan backed by {@link NativeStreamingJoinOperator}.
+     *
+     * <p>NOTE(review): this method performs no "is supported" check and never returns {@code
+     * null}, so the {@link RowData} code path of {@code translateToPlanInternal} is only reached
+     * if that changes; an unsupported non-equi condition surfaces as a {@link RuntimeException}
+     * thrown by {@link #rexToNativeExpr(RexNode)}.
+     *
+     * @param planner planner used to translate both input edges
+     * @param config configuration the native batch size is read from
+     * @return the native join transformation
+     */
+    @SuppressWarnings("unchecked")
+    private Transformation<RowData> translateToNativePlan(
+            PlannerBase planner, ExecNodeConfig config) {
+        final ExecEdge leftInputEdge = getInputEdges().get(0);
+        final ExecEdge rightInputEdge = getInputEdges().get(1);
+
+        final Transformation<NativeMemoryRegion> leftColumnarInput =
+                (Transformation<NativeMemoryRegion>) leftInputEdge.translateToPlanNative(planner);
+        final Transformation<NativeMemoryRegion> rightColumnarInput =
+                (Transformation<NativeMemoryRegion>) rightInputEdge.translateToPlanNative(planner);
+
+        final RowType leftType = (RowType) leftInputEdge.getOutputType();
+        final RowType rightType = (RowType) rightInputEdge.getOutputType();
+        JoinUtil.validateJoinSpec(joinSpec, leftType, rightType, true);
+
+        final int[] leftJoinKey = joinSpec.getLeftKeys();
+        final int[] rightJoinKey = joinSpec.getRightKeys();
+
+        // NOTE(review): despite the names, these hold the native types of all input fields, not
+        // only of the join keys -- confirm this is what NativeJoinInfo expects.
+        NativeType[] leftKeyTypes = CommonNativeUtil.getNativeDataType(leftType.getChildren());
+        NativeType[] rightKeyTypes = CommonNativeUtil.getNativeDataType(rightType.getChildren());
+
+        Optional<RexNode> condition = joinSpec.getNonEquiCondition();
+        NativeJoinInfo joinInfo =
+                new NativeJoinInfo(
+                        joinSpec.getJoinType(),
+                        config.get(TaskManagerOptions.TASK_NATIVE_BATCH_SIZE),
+                        leftJoinKey,
+                        rightJoinKey,
+                        leftKeyTypes,
+                        rightKeyTypes,
+                        condition.map(this::rexToNativeExpr).orElse(null));
+
+        TwoInputStreamOperator<NativeMemoryRegion, NativeMemoryRegion, NativeMemoryRegion>
+                operator = new NativeStreamingJoinOperator(joinInfo);
+
+        final RowType returnType = (RowType) getOutputType();
+
+        final TwoInputTransformation<NativeMemoryRegion, NativeMemoryRegion, NativeMemoryRegion>
+                transform =
+                        ExecNodeUtil.createTwoInputTransformation(
+                                leftColumnarInput,
+                                rightColumnarInput,
+                                createTransformationMeta(JOIN_TRANSFORMATION, config),
+                                operator,
+                                new NativeMemoryRegionTypeInfo(
+                                        NativeTypeUtils.toNativeType(returnType)),
+                                leftColumnarInput.getParallelism());
+
+        final InternalTypeInfo<RowData> leftTypeInfo = InternalTypeInfo.of(leftType);
+
+        // set KeyType and Selector for state
+        RowDataKeySelector leftSelect =
+                KeySelectorUtil.getRowDataSelector(joinSpec.getLeftKeys(), leftTypeInfo);
+        // NOTE(review): only the key type is registered; both key selectors are set to null.
+        transform.setStateKeySelectors(null, null);
+
+        transform.setStateKeyType(leftSelect.getProducedType());
+        // Raw cast: the transformation is typed on NativeMemoryRegion, not RowData.
+        return (TwoInputTransformation) transform;
+    }
+
+    /**
+     * Converts a Calcite expression into a {@link NativeJoinInfo.Expr} tree.
+     *
+     * <p>Only {@link RexInputRef} (field access) and {@link RexCall} (operator applied to
+     * recursively converted operands) are handled.
+     *
+     * @param node expression to convert
+     * @return the equivalent native expression
+     * @throws RuntimeException if {@code node} is of any other type
+     */
+    private NativeJoinInfo.Expr rexToNativeExpr(RexNode node) {
+        if (node instanceof RexInputRef) {
+            RexInputRef ref = (RexInputRef) node;
+            RelDataType fieldType = ref.getType();
+            return new NativeJoinInfo.FieldAccessExpr(
+                    CommonNativeUtil.toNativeType(fieldType), ref.getIndex());
+        } else if (node instanceof RexCall) {
+            RexCall call = (RexCall) node;
+            List<NativeJoinInfo.Expr> operandExprs = new ArrayList<>();
+            for (RexNode operand : call.getOperands()) {
+                operandExprs.add(rexToNativeExpr(operand));
+            }
+
+            RelDataType resultType = call.getType();
+            return new NativeJoinInfo.CallExpr(
+                    CommonNativeUtil.toNativeType(resultType),
+                    call.getOperator().getName(),
+                    operandExprs);
+        }
+
+        throw new RuntimeException("Unsupported RexNode Type " + node);
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
index 93067254cc7..07e76f3238e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
@@ -18,39 +18,44 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegion;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegionTypeInfo;
+import org.apache.flink.streaming.api.natives.types.NativeType;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
-import
org.apache.flink.table.planner.plan.nodes.exec.ExecNode; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; -import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.nodes.exec.*; +import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonNativeUtil; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; -import org.apache.flink.table.planner.plan.utils.AggregateInfoList; -import org.apache.flink.table.planner.plan.utils.AggregateUtil; -import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; +import org.apache.flink.table.planner.plan.utils.*; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.aggregate.MiniBatchLocalGroupAggFunction; import org.apache.flink.table.runtime.operators.bundle.MapBundleOperator; +import org.apache.flink.table.runtime.operators.natives.*; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; - -import org.apache.calcite.rel.core.AggregateCall; - +import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; 
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -67,7 +72,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
                 StreamExecLocalGroupAggregate.LOCAL_GROUP_AGGREGATE_TRANSFORMATION,
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
-public class StreamExecLocalGroupAggregate extends StreamExecAggregateBase {
+public class StreamExecLocalGroupAggregate extends StreamExecAggregateBase
+        implements NativeSupportedExec {
 
     public static final String LOCAL_GROUP_AGGREGATE_TRANSFORMATION = "local-group-aggregate";
 
@@ -133,8 +139,6 @@ public class StreamExecLocalGroupAggregate extends StreamExecAggregateBase {
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
         final ExecEdge inputEdge = getInputEdges().get(0);
-        final Transformation<RowData> inputTransform =
-                (Transformation<RowData>) inputEdge.translateToPlan(planner);
         final RowType inputRowType = (RowType) inputEdge.getOutputType();
 
         final AggsHandlerCodeGenerator generator =
@@ -159,6 +163,15 @@ public class StreamExecLocalGroupAggregate extends StreamExecAggregateBase {
                         needRetraction,
                         false, // isStateBackendDataViews
                         true); // needDistinctInfo
+
+        Transformation<RowData> nativePlan = translateToNativePlan(planner, config, aggInfoList);
+        if (nativePlan != null) {
+            return nativePlan;
+        }
+
+        final Transformation<RowData> inputTransform =
+                (Transformation<RowData>) inputEdge.translateToPlan(planner);
+
         final GeneratedAggsHandleFunction aggsHandler =
                 generator.generateAggsHandler("GroupAggsHandler", aggInfoList);
         final MiniBatchLocalGroupAggFunction aggFunction =
@@ -181,4 +194,152 @@ public class StreamExecLocalGroupAggregate extends StreamExecAggregateBase {
                 InternalTypeInfo.of(getOutputType()),
                 inputTransform.getParallelism());
     }
+
+    /**
+     * Tries to translate this node into a native plan backed by {@link
+     * NativeLocalGroupAggOperator}.
+     *
+     * <p>Returns {@code null} when {@link CommonNativeUtil} rejects the input field types, the
+     * aggregations or a distinct key type, so that {@code translateToPlanInternal} falls back to
+     * the regular {@link RowData} plan.
+     *
+     * @param planner planner used to translate the input edge
+     * @param config configuration the native batch/buffer/table sizes are read from
+     * @param aggInfoList aggregate information computed by the caller
+     * @return the native transformation, or {@code null} if the node is not natively supported
+     */
+    @Nullable
+    private Transformation<RowData> translateToNativePlan(
+            PlannerBase planner, ExecNodeConfig config, AggregateInfoList aggInfoList) {
+        final ExecEdge inputEdge = getInputEdges().get(0);
+
+        Transformation<NativeMemoryRegion> columnarInput =
+                (Transformation<NativeMemoryRegion>) inputEdge.translateToPlanNative(planner);
+
+        // Check if the types are all supported.
+        final RowType inputRowType = (RowType) inputEdge.getOutputType();
+
+        if (!CommonNativeUtil.isSupportedDataType(inputRowType.getChildren())) {
+            return null;
+        }
+
+        if (!CommonNativeUtil.isSupportedAggregation(aggInfoList)) {
+            return null;
+        }
+
+        int numDistinctions = aggInfoList.distinctInfos().length;
+        int numAggregations = aggInfoList.aggInfos().length;
+        // Per aggregate: index of its distinct info / distinct filter, -1 if it has none.
+        int[] distinctIndex = new int[numAggregations];
+
+        int[] filterIndex = new int[numAggregations];
+        Arrays.fill(distinctIndex, -1);
+
+        Arrays.fill(filterIndex, -1);
+
+        NativeDistinctInfo[] distinctInfos = new NativeDistinctInfo[numDistinctions];
+        int index = 0;
+
+        for (DistinctInfo info : aggInfoList.distinctInfos()) {
+            ArrayList<Integer> filters = new ArrayList<>(info.filterArgs().length());
+            info.filterArgs().foreach(filter -> filters.add((Integer) filter));
+
+            ArrayList<Integer> aggIds = new ArrayList<>(info.aggIndexes().length());
+            int finalIndex = index;
+            // filterArgs is read positionally: the i-th aggregate of this distinct uses filter i.
+            AtomicInteger filter = new AtomicInteger(0);
+            info.aggIndexes()
+                    .foreach(
+                            aggId -> {
+                                distinctIndex[(int) aggId] = finalIndex;
+                                filterIndex[(int) aggId] = filters.get(filter.getAndIncrement());
+                                aggIds.add((Integer) aggId);
+                                return true;
+                            });
+
+            List<LogicalType> logicalKeyTypes =
+                    info.keyType().getChildren().isEmpty()
+                            ? Collections.singletonList(info.keyType().getLogicalType())
+                            : info.keyType().getChildren().stream()
+                                    .map(DataType::getLogicalType)
+                                    .collect(Collectors.toList());
+            if (!CommonNativeUtil.isSupportedDataType(logicalKeyTypes)) {
+                return null;
+            }
+
+            NativeType[] keyTypes = CommonNativeUtil.getNativeDataType(logicalKeyTypes);
+            NativeDistinctInfo distinctInfo =
+                    new NativeDistinctInfo(
+                            info.consumeRetraction(),
+                            -1,
+                            keyTypes,
+                            info.argIndexes(),
+                            aggIds.stream().mapToInt(aggId -> aggId).toArray(),
+                            filters.stream().mapToInt(aggId -> aggId).toArray());
+            distinctInfos[index++] = distinctInfo;
+        }
+
+        NativeAggFunctionInfo[] aggFunctions = new NativeAggFunctionInfo[numAggregations];
+        for (AggregateInfo info : aggInfoList.aggInfos()) {
+            NativeAggFunctionInfo function =
+                    new NativeAggFunctionInfo(
+                            info.consumeRetraction(),
+                            distinctIndex[info.aggIndex()],
+                            info.aggIndex(),
+                            info.argIndexes(),
+                            distinctIndex[info.aggIndex()] >= 0
+                                    ? filterIndex[info.aggIndex()]
+                                    : info.agg().filterArg,
+                            CommonNativeUtil.getNativeAggFunctionType(info),
+                            NativeTypeUtils.toNativeType(
+                                    info.externalResultType().getLogicalType()));
+            aggFunctions[info.aggIndex()] = function;
+        }
+
+        NativeGroupAggInfo aggInfo =
+                new NativeGroupAggInfo(
+                        needRetraction,
+                        false,
+                        aggInfoList.countStarInserted(),
+                        aggInfoList.getIndexOfCountStar(),
+                        config.get(TaskManagerOptions.TASK_NATIVE_BATCH_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_LOCAL_AGG_BUFFER_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_DISTINCT_TABLE_INITIAL_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_RESULT_TABLE_INITIAL_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_STRING_SET_INITIAL_SIZE),
+                        new int[0],
+                        grouping,
+                        CommonNativeUtil.getNativeDataType(inputRowType.getChildren()),
+                        aggFunctions,
+                        distinctInfos);
+        CommonNativeUtil.validateGroupAggInfo(aggInfo);
+
+        // NOTE(review): this comment used to read "Let's first still return row-data...", but the
+        // operator below is typed to emit NativeMemoryRegion -- confirm which one is intended.
+        OneInputStreamOperator<NativeMemoryRegion, NativeMemoryRegion> operator =
+                new NativeLocalGroupAggOperator(aggInfo);
+
+        // partitioned aggregation
+        final OneInputTransformation<NativeMemoryRegion, NativeMemoryRegion> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        columnarInput,
+                        createTransformationMeta(LOCAL_GROUP_AGGREGATE_TRANSFORMATION, config),
+                        operator,
+                        new NativeMemoryRegionTypeInfo(
+                                NativeTypeUtils.toNativeType((RowType) getOutputType())),
+                        columnarInput.getParallelism(),
+                        0);
+
+        // set KeyType and Selector for state
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType));
+
+        // NOTE(review): only the key type is registered; the key selector itself is set to null.
+        transform.setStateKeySelector(null);
+        transform.setStateKeyType(selector.getProducedType());
+
+        // Generic type arguments are only checked at compile time, so the raw cast below skips
+        // that check: the transformation is typed on NativeMemoryRegion, not RowData.
+        return (OneInputTransformation) transform;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
index 69f7c77e580..e24949a5e92 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
@@ -18,26 +18,29 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.tools.RelBuilder;
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.streaming.api.natives.types.NativeMemoryRegion; +import org.apache.flink.streaming.api.natives.types.NativeMemoryRegionTypeInfo; +import org.apache.flink.streaming.api.natives.types.NativeType; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.codegen.CodeGeneratorContext; import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator; import org.apache.flink.table.planner.delegation.PlannerBase; -import org.apache.flink.table.planner.plan.logical.WindowingStrategy; -import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; -import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.logical.*; +import org.apache.flink.table.planner.plan.nodes.exec.*; +import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonNativeUtil; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; -import org.apache.flink.table.planner.plan.utils.AggregateInfoList; -import org.apache.flink.table.planner.plan.utils.AggregateUtil; -import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; +import org.apache.flink.table.planner.plan.utils.*; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.planner.utils.TableConfigUtils; import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; @@ -46,25 +49,25 @@ import org.apache.flink.table.runtime.operators.aggregate.window.LocalSlicingWin import org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer; import org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer; import org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggCombiner; +import org.apache.flink.table.runtime.operators.natives.*; import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner; import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.runtime.util.TimeWindowUtil; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; - -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.tools.RelBuilder; - +import javax.annotation.Nullable; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -77,7 +80,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull; StreamExecLocalWindowAggregate.LOCAL_WINDOW_AGGREGATE_TRANSFORMATION, minPlanVersion = FlinkVersion.v1_15, minStateVersion = FlinkVersion.v1_15) -public class StreamExecLocalWindowAggregate extends StreamExecWindowAggregateBase { +public 
class StreamExecLocalWindowAggregate extends StreamExecWindowAggregateBase
+        implements NativeSupportedExec {
 
     public static final String LOCAL_WINDOW_AGGREGATE_TRANSFORMATION = "local-window-aggregate";
 
@@ -136,6 +140,11 @@ public class StreamExecLocalWindowAggregate extends StreamExecWindowAggregateBas
     @Override
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
+        Transformation<RowData> nativePlan = translateToNativePlan(planner, config);
+        if (nativePlan != null) {
+            return nativePlan;
+        }
+
         final ExecEdge inputEdge = getInputEdges().get(0);
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
@@ -216,4 +225,201 @@ public class StreamExecLocalWindowAggregate extends StreamExecWindowAggregateBas
                 sliceAssigner,
                 shiftTimeZone);
     }
+
+    /**
+     * Tries to translate this local window aggregate into a native operator that consumes and
+     * produces {@link NativeMemoryRegion} batches.
+     *
+     * <p>Only tumbling windows are handled so far. If the window type, an input or distinct-key
+     * data type, or the aggregation is not supported natively, {@code null} is returned so that
+     * {@link #translateToPlanInternal} falls back to the regular translation.
+     *
+     * @param planner the planner used to translate the input edge
+     * @param config the node configuration the native batch, buffer and table sizes are read from
+     * @return the native transformation, or {@code null} if this node cannot be translated
+     * @throws UnsupportedOperationException if the windowing strategy is of an unknown kind
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable
+    private Transformation<RowData> translateToNativePlan(
+            PlannerBase planner, ExecNodeConfig config) {
+        final ZoneId shiftTimeZone =
+                TimeWindowUtil.getShiftTimeZone(
+                        windowing.getTimeAttributeType(),
+                        TableConfigUtils.getLocalTimeZone(config));
+
+        // TODO support other window types
+        final WindowSpec windowSpec = windowing.getWindow();
+        if (!(windowSpec instanceof TumblingWindowSpec)) {
+            return null;
+        }
+        final TumblingWindowSpec tumblingSpec = (TumblingWindowSpec) windowSpec;
+
+        final NativeWindowAggInfo.WindowStrategy windowStrategy;
+        final int timeFieldIndex;
+        int sliceEndIndex = -1;
+        int windowEndIndex = -1;
+        if (windowing instanceof WindowAttachedWindowingStrategy) {
+            windowEndIndex = ((WindowAttachedWindowingStrategy) windowing).getWindowEnd();
+            timeFieldIndex = Integer.MAX_VALUE;
+            windowStrategy = NativeWindowAggInfo.WindowStrategy.WINDOW_ATTACHED;
+        } else if (windowing instanceof SliceAttachedWindowingStrategy) {
+            sliceEndIndex = ((SliceAttachedWindowingStrategy) windowing).getSliceEnd();
+            timeFieldIndex = Integer.MAX_VALUE;
+            windowStrategy = NativeWindowAggInfo.WindowStrategy.SLICE_ATTACHED;
+        } else if (windowing instanceof TimeAttributeWindowingStrategy) {
+            // -1 is passed as the time field when the window is not based on row time.
+            timeFieldIndex =
+                    windowing.isRowtime()
+                            ? ((TimeAttributeWindowingStrategy) windowing).getTimeAttributeIndex()
+                            : -1;
+            windowStrategy = NativeWindowAggInfo.WindowStrategy.TIME_ATTRIBUTE;
+        } else {
+            throw new UnsupportedOperationException(windowing + " is not supported yet.");
+        }
+
+        final ExecEdge inputEdge = getInputEdges().get(0);
+        final Transformation<NativeMemoryRegion> columnarInput =
+                (Transformation<NativeMemoryRegion>) inputEdge.translateToPlanNative(planner);
+        final RowType inputRowType = (RowType) inputEdge.getOutputType();
+        if (!CommonNativeUtil.isSupportedDataType(inputRowType.getChildren())) {
+            return null;
+        }
+
+        final AggregateInfoList aggInfoList =
+                AggregateUtil.deriveStreamWindowAggregateInfoList(
+                        inputRowType,
+                        JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
+                        windowSpec,
+                        false);
+        // Fall back to the regular plan when the aggregation is not supported natively.
+        if (!CommonNativeUtil.isSupportedAggregation(aggInfoList)) {
+            return null;
+        }
+
+        final int numAggregations = aggInfoList.aggInfos().length;
+        // Per aggregate call: the distinct info it is attached to and the filter registered for
+        // it there. Both stay -1 for calls that no distinct info refers to.
+        final int[] distinctIndex = new int[numAggregations];
+        final int[] filterIndex = new int[numAggregations];
+        Arrays.fill(distinctIndex, -1);
+        Arrays.fill(filterIndex, -1);
+
+        final NativeDistinctInfo[] distinctInfos =
+                new NativeDistinctInfo[aggInfoList.distinctInfos().length];
+        int index = 0;
+        for (DistinctInfo info : aggInfoList.distinctInfos()) {
+            final List<Integer> filters = new ArrayList<>(info.filterArgs().length());
+            info.filterArgs().foreach(filter -> filters.add((Integer) filter));
+
+            final List<Integer> aggIds = new ArrayList<>(info.aggIndexes().length());
+            final int distinctId = index;
+            // The i-th aggregate index is paired with the i-th filter argument.
+            final AtomicInteger filterCursor = new AtomicInteger(0);
+            info.aggIndexes()
+                    .foreach(
+                            aggId -> {
+                                distinctIndex[(int) aggId] = distinctId;
+                                filterIndex[(int) aggId] =
+                                        filters.get(filterCursor.getAndIncrement());
+                                aggIds.add((Integer) aggId);
+                                return true;
+                            });
+
+            final List<LogicalType> logicalKeyTypes =
+                    info.keyType().getChildren().isEmpty()
+                            ? Collections.singletonList(info.keyType().getLogicalType())
+                            : info.keyType().getChildren().stream()
+                                    .map(DataType::getLogicalType)
+                                    .collect(Collectors.toList());
+            if (!CommonNativeUtil.isSupportedDataType(logicalKeyTypes)) {
+                return null;
+            }
+
+            final NativeType[] keyTypes = CommonNativeUtil.getNativeDataType(logicalKeyTypes);
+            distinctInfos[index++] =
+                    new NativeDistinctInfo(
+                            info.consumeRetraction(),
+                            -1,
+                            keyTypes,
+                            info.argIndexes(),
+                            aggIds.stream().mapToInt(Integer::intValue).toArray(),
+                            filters.stream().mapToInt(Integer::intValue).toArray());
+        }
+
+        final NativeAggFunctionInfo[] aggFunctions = new NativeAggFunctionInfo[numAggregations];
+        for (AggregateInfo info : aggInfoList.aggInfos()) {
+            final int aggIndex = info.aggIndex();
+            // Distinct aggregates take their filter from the distinct info, all others from the
+            // aggregate call itself.
+            final int filterArg =
+                    distinctIndex[aggIndex] >= 0
+                            ? filterIndex[aggIndex]
+                            : info.agg().filterArg;
+            aggFunctions[aggIndex] =
+                    new NativeAggFunctionInfo(
+                            info.consumeRetraction(),
+                            distinctIndex[aggIndex],
+                            aggIndex,
+                            info.argIndexes(),
+                            filterArg,
+                            CommonNativeUtil.getNativeAggFunctionType(info),
+                            NativeTypeUtils.toNativeType(
+                                    info.externalResultType().getLogicalType()));
+        }
+
+        final NativeGroupAggInfo aggInfo =
+                new NativeGroupAggInfo(
+                        false,
+                        false,
+                        aggInfoList.countStarInserted(),
+                        aggInfoList.getIndexOfCountStar(),
+                        config.get(TaskManagerOptions.TASK_NATIVE_BATCH_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_LOCAL_AGG_BUFFER_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_DISTINCT_TABLE_INITIAL_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_RESULT_TABLE_INITIAL_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_STRING_SET_INITIAL_SIZE),
+                        new int[0],
+                        grouping,
+                        CommonNativeUtil.getNativeDataType(inputRowType.getChildren()),
+                        aggFunctions,
+                        distinctInfos);
+        CommonNativeUtil.validateGroupAggInfo(aggInfo);
+
+        final long size = tumblingSpec.getSize().toMillis();
+        final long offset =
+                tumblingSpec.getOffset() != null ? tumblingSpec.getOffset().toMillis() : 0;
+        final NativeWindowAggInfo windowInfo =
+                new NativeWindowAggInfo(
+                        timeFieldIndex,
+                        sliceEndIndex,
+                        windowEndIndex,
+                        size,
+                        offset,
+                        windowStrategy,
+                        shiftTimeZone.getId(),
+                        aggInfo);
+        final OneInputStreamOperator<NativeMemoryRegion, NativeMemoryRegion> operator =
+                new NativeLocalWindowOperator(windowInfo);
+
+        // partitioned aggregation
+        final OneInputTransformation<NativeMemoryRegion, NativeMemoryRegion> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        columnarInput,
+                        createTransformationMeta(LOCAL_WINDOW_AGGREGATE_TRANSFORMATION, config),
+                        operator,
+                        new NativeMemoryRegionTypeInfo(
+                                NativeTypeUtils.toNativeType(getOutputType())),
+                        columnarInput.getParallelism());
+
+        // Only the key type is registered for state; the key selector is explicitly set to null.
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType));
+        transform.setStateKeySelector(null);
+        transform.setStateKeyType(selector.getProducedType());
+
+        // The operator emits NativeMemoryRegion, not RowData. Generic types are erased at
+        // runtime, so the transformation is returned under the signature callers expect.
+        return (Transformation<RowData>) (Transformation<?>) transform;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
index 238916ee96e..f3f35a298ab 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
@@ -18,9 +18,17 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import
org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.tools.RelBuilder; import org.apache.flink.FlinkVersion; import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.streaming.api.natives.types.NativeMemoryRegion; +import org.apache.flink.streaming.api.natives.types.NativeMemoryRegionTypeInfo; +import org.apache.flink.streaming.api.natives.types.NativeType; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.transformations.OneInputTransformation; @@ -28,17 +36,11 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.planner.codegen.CodeGeneratorContext; import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator; import org.apache.flink.table.planner.delegation.PlannerBase; -import org.apache.flink.table.planner.plan.logical.WindowingStrategy; -import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext; -import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata; -import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; +import org.apache.flink.table.planner.plan.logical.*; +import org.apache.flink.table.planner.plan.nodes.exec.*; +import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonNativeUtil; import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil; -import org.apache.flink.table.planner.plan.utils.AggregateInfoList; 
-import org.apache.flink.table.planner.plan.utils.AggregateUtil; -import org.apache.flink.table.planner.plan.utils.KeySelectorUtil; +import org.apache.flink.table.planner.plan.utils.*; import org.apache.flink.table.planner.utils.JavaScalaConversionUtil; import org.apache.flink.table.planner.utils.TableConfigUtils; import org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction; @@ -46,25 +48,25 @@ import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty; import org.apache.flink.table.runtime.groupwindow.WindowProperty; import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.aggregate.window.SlicingWindowAggOperatorBuilder; +import org.apache.flink.table.runtime.operators.natives.*; import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner; import org.apache.flink.table.runtime.operators.window.slicing.SliceSharedAssigner; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer; import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.runtime.util.TimeWindowUtil; +import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; - -import org.apache.calcite.rel.core.AggregateCall; -import org.apache.calcite.tools.RelBuilder; - +import javax.annotation.Nullable; import java.time.ZoneId; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -83,7 +85,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
         producedTransformations = StreamExecWindowAggregate.WINDOW_AGGREGATE_TRANSFORMATION,
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
-public class StreamExecWindowAggregate extends StreamExecWindowAggregateBase {
+public class StreamExecWindowAggregate extends StreamExecWindowAggregateBase
+        implements NativeSupportedExec {
 
     public static final String WINDOW_AGGREGATE_TRANSFORMATION = "window-aggregate";
 
@@ -150,6 +153,11 @@ public class StreamExecWindowAggregate extends StreamExecWindowAggregateBase {
     @Override
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
+        Transformation<RowData> nativePlan = translateToNativePlan(planner, config);
+        if (nativePlan != null) {
+            return nativePlan;
+        }
+
         final ExecEdge inputEdge = getInputEdges().get(0);
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
@@ -248,4 +256,200 @@ public class StreamExecWindowAggregate extends StreamExecWindowAggregateBase {
                 sliceAssigner,
                 shiftTimeZone);
     }
+
+    /**
+     * Tries to translate this window aggregate into a native operator that consumes and produces
+     * {@link NativeMemoryRegion} batches.
+     *
+     * <p>Only tumbling windows are handled so far. If the window type, an input or distinct-key
+     * data type, or the aggregation is not supported natively, {@code null} is returned so that
+     * {@link #translateToPlanInternal} falls back to the regular translation.
+     *
+     * @param planner the planner used to translate the input edge
+     * @param config the node configuration the native batch, buffer and table sizes are read from
+     * @return the native transformation, or {@code null} if this node cannot be translated
+     * @throws UnsupportedOperationException if the windowing strategy is of an unknown kind
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable
+    private Transformation<RowData> translateToNativePlan(
+            PlannerBase planner, ExecNodeConfig config) {
+        final ZoneId shiftTimeZone =
+                TimeWindowUtil.getShiftTimeZone(
+                        windowing.getTimeAttributeType(),
+                        TableConfigUtils.getLocalTimeZone(config));
+
+        // TODO support other window types
+        final WindowSpec windowSpec = windowing.getWindow();
+        if (!(windowSpec instanceof TumblingWindowSpec)) {
+            return null;
+        }
+        final TumblingWindowSpec tumblingSpec = (TumblingWindowSpec) windowSpec;
+
+        final NativeWindowAggInfo.WindowStrategy windowStrategy;
+        final int timeFieldIndex;
+        int sliceEndIndex = -1;
+        int windowEndIndex = -1;
+        if (windowing instanceof WindowAttachedWindowingStrategy) {
+            windowEndIndex = ((WindowAttachedWindowingStrategy) windowing).getWindowEnd();
+            timeFieldIndex = Integer.MAX_VALUE;
+            windowStrategy = NativeWindowAggInfo.WindowStrategy.WINDOW_ATTACHED;
+        } else if (windowing instanceof SliceAttachedWindowingStrategy) {
+            sliceEndIndex = ((SliceAttachedWindowingStrategy) windowing).getSliceEnd();
+            timeFieldIndex = Integer.MAX_VALUE;
+            windowStrategy = NativeWindowAggInfo.WindowStrategy.SLICE_ATTACHED;
+        } else if (windowing instanceof TimeAttributeWindowingStrategy) {
+            // -1 is passed as the time field when the window is not based on row time.
+            timeFieldIndex =
+                    windowing.isRowtime()
+                            ? ((TimeAttributeWindowingStrategy) windowing).getTimeAttributeIndex()
+                            : -1;
+            windowStrategy = NativeWindowAggInfo.WindowStrategy.TIME_ATTRIBUTE;
+        } else {
+            throw new UnsupportedOperationException(windowing + " is not supported yet.");
+        }
+
+        final ExecEdge inputEdge = getInputEdges().get(0);
+        final Transformation<NativeMemoryRegion> columnarInput =
+                (Transformation<NativeMemoryRegion>) inputEdge.translateToPlanNative(planner);
+        final RowType inputRowType = (RowType) inputEdge.getOutputType();
+        if (!CommonNativeUtil.isSupportedDataType(inputRowType.getChildren())) {
+            return null;
+        }
+
+        // Hopping window requires additional COUNT(*) to determine whether to register next timer
+        // through whether the current fired window is empty, see SliceSharedWindowAggProcessor.
+        // NOTE(review): getStateBackedaggInfoList() is not defined in the visible part of this
+        // patch; confirm that the helper exists.
+        final AggregateInfoList aggInfoList = getStateBackedaggInfoList();
+        // Fall back to the regular plan when the aggregation is not supported natively.
+        if (!CommonNativeUtil.isSupportedAggregation(aggInfoList)) {
+            return null;
+        }
+
+        final int numAggregations = aggInfoList.aggInfos().length;
+        // Per aggregate call: the distinct info it is attached to and the filter registered for
+        // it there. Both stay -1 for calls that no distinct info refers to.
+        final int[] distinctIndex = new int[numAggregations];
+        final int[] filterIndex = new int[numAggregations];
+        Arrays.fill(distinctIndex, -1);
+        Arrays.fill(filterIndex, -1);
+
+        final NativeDistinctInfo[] distinctInfos =
+                new NativeDistinctInfo[aggInfoList.distinctInfos().length];
+        int index = 0;
+        for (DistinctInfo info : aggInfoList.distinctInfos()) {
+            final List<Integer> filters = new ArrayList<>(info.filterArgs().length());
+            info.filterArgs().foreach(filter -> filters.add((Integer) filter));
+
+            final List<Integer> aggIds = new ArrayList<>(info.aggIndexes().length());
+            final int distinctId = index;
+            // The i-th aggregate index is paired with the i-th filter argument.
+            final AtomicInteger filterCursor = new AtomicInteger(0);
+            info.aggIndexes()
+                    .foreach(
+                            aggId -> {
+                                distinctIndex[(int) aggId] = distinctId;
+                                filterIndex[(int) aggId] =
+                                        filters.get(filterCursor.getAndIncrement());
+                                aggIds.add((Integer) aggId);
+                                return true;
+                            });
+
+            final List<LogicalType> logicalKeyTypes =
+                    info.keyType().getChildren().isEmpty()
+                            ? Collections.singletonList(info.keyType().getLogicalType())
+                            : info.keyType().getChildren().stream()
+                                    .map(DataType::getLogicalType)
+                                    .collect(Collectors.toList());
+            if (!CommonNativeUtil.isSupportedDataType(logicalKeyTypes)) {
+                return null;
+            }
+
+            final NativeType[] keyTypes = CommonNativeUtil.getNativeDataType(logicalKeyTypes);
+            distinctInfos[index++] =
+                    new NativeDistinctInfo(
+                            info.consumeRetraction(),
+                            -1,
+                            keyTypes,
+                            info.argIndexes(),
+                            aggIds.stream().mapToInt(Integer::intValue).toArray(),
+                            filters.stream().mapToInt(Integer::intValue).toArray());
+        }
+
+        final NativeAggFunctionInfo[] aggFunctions = new NativeAggFunctionInfo[numAggregations];
+        for (AggregateInfo info : aggInfoList.aggInfos()) {
+            final int aggIndex = info.aggIndex();
+            // Distinct aggregates take their filter from the distinct info, all others from the
+            // aggregate call itself.
+            final int filterArg =
+                    distinctIndex[aggIndex] >= 0
+                            ? filterIndex[aggIndex]
+                            : info.agg().filterArg;
+            aggFunctions[aggIndex] =
+                    new NativeAggFunctionInfo(
+                            info.consumeRetraction(),
+                            distinctIndex[aggIndex],
+                            aggIndex,
+                            info.argIndexes(),
+                            filterArg,
+                            CommonNativeUtil.getNativeAggFunctionType(info),
+                            NativeTypeUtils.toNativeType(
+                                    info.externalResultType().getLogicalType()));
+        }
+
+        final NativeGroupAggInfo aggInfo =
+                new NativeGroupAggInfo(
+                        false,
+                        false,
+                        aggInfoList.countStarInserted(),
+                        aggInfoList.getIndexOfCountStar(),
+                        config.get(TaskManagerOptions.TASK_NATIVE_BATCH_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_FINAL_AGG_BUFFER_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_DISTINCT_TABLE_INITIAL_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_RESULT_TABLE_INITIAL_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_STRING_SET_INITIAL_SIZE),
+                        new int[0],
+                        grouping,
+                        CommonNativeUtil.getNativeDataType(inputRowType.getChildren()),
+                        aggFunctions,
+                        distinctInfos);
+        CommonNativeUtil.validateGroupAggInfo(aggInfo);
+
+        final long size = tumblingSpec.getSize().toMillis();
+        final long offset =
+                tumblingSpec.getOffset() != null ? tumblingSpec.getOffset().toMillis() : 0;
+        final NativeWindowAggInfo windowInfo =
+                new NativeWindowAggInfo(
+                        timeFieldIndex,
+                        sliceEndIndex,
+                        windowEndIndex,
+                        size,
+                        offset,
+                        windowStrategy,
+                        shiftTimeZone.getId(),
+                        aggInfo);
+        final OneInputStreamOperator<NativeMemoryRegion, NativeMemoryRegion> operator =
+                new NativeTumbleWindowOperator(windowInfo);
+
+        // partitioned aggregation
+        final OneInputTransformation<NativeMemoryRegion, NativeMemoryRegion> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        columnarInput,
+                        createTransformationMeta(WINDOW_AGGREGATE_TRANSFORMATION, config),
+                        operator,
+                        new NativeMemoryRegionTypeInfo(
+                                NativeTypeUtils.toNativeType(getOutputType())),
+                        columnarInput.getParallelism());
+
+        // Only the key type is registered for state; the key selector is explicitly set to null.
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType));
+        transform.setStateKeySelector(null);
+        transform.setStateKeyType(selector.getProducedType());
+
+        // The operator emits NativeMemoryRegion, not RowData. Generic types are erased at
+        // runtime, so the transformation is returned under the signature callers expect.
+        return (Transformation<RowData>) (Transformation<?>) transform;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/visitor/AbstractExecNodeExactlyOnceVisitor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/visitor/AbstractExecNodeExactlyOnceVisitor.java
index 1b8cad82b96..ccee5f5c535 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/visitor/AbstractExecNodeExactlyOnceVisitor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/visitor/AbstractExecNodeExactlyOnceVisitor.java
@@ -26,7 +26,7 @@ import java.util.Set;
 
 /** Implement of {@link ExecNodeVisitor}.
All nodes are visited exactly once. */ public abstract class AbstractExecNodeExactlyOnceVisitor implements ExecNodeVisitor { - private final Set<ExecNode<?>> visited; + protected final Set<ExecNode<?>> visited; public AbstractExecNodeExactlyOnceVisitor() { this.visited = new HashSet<>();