This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f0d96eb3f94bb41dd3dd8e35d0d6aaab7ca69670
Author: Yun Gao <gaoyunhen...@gmail.com>
AuthorDate: Sun May 28 10:28:32 2023 +0800

    Tmp save
---
 .../natives/StreamExecColumnToRowExchange.java     |   8 +-
 .../NativeRowColumnTranslationProcessor.java       | 348 ++++++++++++++++++++-
 .../stream/StreamExecGlobalWindowAggregate.java    | 219 ++++++++++++-
 .../exec/stream/StreamExecGroupAggregate.java      | 171 +++++++++-
 .../StreamExecIncrementalGroupAggregate.java       | 181 +++++++++--
 .../plan/nodes/exec/stream/StreamExecJoin.java     | 115 ++++++-
 .../exec/stream/StreamExecLocalGroupAggregate.java | 177 ++++++++++-
 .../stream/StreamExecLocalWindowAggregate.java     | 231 +++++++++++++-
 .../exec/stream/StreamExecWindowAggregate.java     | 231 +++++++++++++-
 .../AbstractExecNodeExactlyOnceVisitor.java        |   2 +-
 10 files changed, 1567 insertions(+), 116 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/natives/StreamExecColumnToRowExchange.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/natives/StreamExecColumnToRowExchange.java
index 3e4f253d786..a0ae1e2802e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/natives/StreamExecColumnToRowExchange.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/natives/StreamExecColumnToRowExchange.java
@@ -30,11 +30,7 @@ import 
org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
-import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.*;
 import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExchange;
 import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange;
 import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecNode;
@@ -57,7 +53,7 @@ import static 
org.apache.flink.runtime.state.KeyGroupRangeAssignment.DEFAULT_LOW
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
 public class StreamExecColumnToRowExchange extends CommonExecExchange
-        implements StreamExecNode<RowData> {
+        implements StreamExecNode<RowData>, NativeSupportedExec {
 
     public static final String EXCHANGE_TRANSFORMATION = "exchange";
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/NativeRowColumnTranslationProcessor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/NativeRowColumnTranslationProcessor.java
index 40f8b6e6f9f..e6f21b360d4 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/NativeRowColumnTranslationProcessor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/NativeRowColumnTranslationProcessor.java
@@ -19,26 +19,348 @@
 package org.apache.flink.table.planner.plan.nodes.exec.processor;
 
 import org.apache.flink.configuration.ReadableConfig;
-
 import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.*;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecUnion;
+import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecUnion;
+import 
org.apache.flink.table.planner.plan.nodes.exec.natives.StreamExecColumnToColumnExchange;
+import 
org.apache.flink.table.planner.plan.nodes.exec.natives.StreamExecColumnToRowExchange;
+import 
org.apache.flink.table.planner.plan.nodes.exec.natives.StreamExecRowToColumnExchange;
+import 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecExchange;
+import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecUnion;
+import 
org.apache.flink.table.planner.plan.nodes.exec.visitor.AbstractExecNodeExactlyOnceVisitor;
+import org.apache.flink.table.types.logical.RowType;
 
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
-import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
-import org.apache.flink.table.planner.plan.nodes.exec.NativeSupportedExec;
+public class NativeRowColumnTranslationProcessor implements 
ExecNodeGraphProcessor {
 
-import org.apache.flink.table. planner.plan.nodes.exec.batch.BatchExecUnion;
-import org.apache.flink.table.planner.plan. 
nodes.exec.natives.StreamExecColumnToColumnExchange;
-import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange;
-import 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecExchange;
-import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecUnion;
-import org.apache.flink.table.planner.plan.nodes.exec.natives. 
StreamExecColumnToColumnExchange;
+    /** Configuration handed to the constructor of every exec node this processor creates. */
+    private final ReadableConfig tableConfig;
+
+    /**
+     * Creates the processor.
+     *
+     * @param tableConfig configuration passed to the exchange and union nodes that this processor
+     *     instantiates while rewriting the graph
+     */
+    public NativeRowColumnTranslationProcessor(ReadableConfig tableConfig) {
+        this.tableConfig = tableConfig;
+    }
+
+    /**
+     * Rewrites the input edges of every node that is neither an exchange nor a union so that each
+     * input is delivered in the representation the node accepts (columnar when the node supports
+     * native translation, row otherwise).
+     *
+     * <p>Exchange and union nodes are not rewritten on their own; they are handled as part of the
+     * input region of the node that consumes them. The graph is modified in place.
+     *
+     * @param execGraph graph whose nodes are visited starting from its root nodes
+     * @param context processor context; the planner's table config decides native support
+     * @return the same {@code execGraph} instance, after its edges have been replaced
+     */
+    @Override
+    public ExecNodeGraph process(ExecNodeGraph execGraph, ProcessorContext context) {
+        ReadableConfig config = context.getPlanner().getTableConfig();
+
+        // Caches shared across all visited nodes so a source / connector is duplicated at most
+        // once per representation.
+        Map<ExecNode<?>, RowOrColumnarEdge> reusableRowOrColumnarEdges = new HashMap<>();
+        Map<ExecNode<?>, RowOrColumnarNode> reusableRowOrColumnarNodes = new HashMap<>();
+        AbstractExecNodeExactlyOnceVisitor visitor =
+                new AbstractExecNodeExactlyOnceVisitor() {
+                    @Override
+                    protected void visitNode(ExecNode<?> node) {
+                        if (node instanceof CommonExecExchange || node instanceof CommonExecUnion) {
+                            // Connectors are rewritten from the perspective of their consumer.
+                            visitInputs(node);
+                            return;
+                        }
+
+                        boolean acceptColumnar = supportsNative(node, config);
+                        // Register everything reachable through exchanges/unions before any edge
+                        // of this node is replaced.
+                        getInputEdgeRegion(
+                                node, reusableRowOrColumnarEdges, reusableRowOrColumnarNodes);
+                        for (int i = 0; i < node.getInputEdges().size(); i++) {
+                            ExecEdge edge = node.getInputEdges().get(i);
+                            ExecEdge newEdge =
+                                    duplicateAndConnectEdges(
+                                            acceptColumnar,
+                                            node,
+                                            edge,
+                                            reusableRowOrColumnarEdges,
+                                            reusableRowOrColumnarNodes,
+                                            config);
+
+                            node.replaceInputEdge(i, newEdge);
+                        }
+                        // Continues with the inputs as they are after the replacement above.
+                        visitInputs(node);
+                    }
+                };
+        execGraph.getRootNodes().forEach(visitor::visit);
+
+        return execGraph;
+    }
+
+    /**
+     * Builds the replacement for {@code edge} whose source produces the requested representation.
+     *
+     * <p>If the source of the edge is a registered connector (exchange/union), the row or columnar
+     * duplicate of that connector is used and its own inputs are rebuilt recursively. Otherwise
+     * the source is looked up among the registered nodes; an unregistered single-input exchange is
+     * skipped first so the lookup happens on the node behind it.
+     *
+     * @param isColumnar whether {@code target} consumes columnar input
+     * @param target node that will consume the returned edge
+     * @param edge original input edge; its shuffle and exchange mode are copied to the new edge
+     * @param edges connectors registered by {@link #getInputEdgeRegion}
+     * @param inputs non-connector nodes registered by {@link #getInputEdgeRegion}
+     * @param config configuration used to decide whether a node supports native translation
+     * @return a new edge from the selected source to {@code target}
+     * @throws IllegalArgumentException if the source is registered in neither map
+     */
+    private ExecEdge duplicateAndConnectEdges(
+            boolean isColumnar,
+            ExecNode<?> target,
+            ExecEdge edge,
+            Map<ExecNode<?>, RowOrColumnarEdge> edges,
+            Map<ExecNode<?>, RowOrColumnarNode> inputs,
+            ReadableConfig config) {
+        ExecNode<?> source = edge.getSource();
+        final ExecNode<?> duplicated;
+
+        RowOrColumnarEdge rowOrColEdge = edges.get(source);
+        if (rowOrColEdge != null) {
+            ExecNode<?> origin = rowOrColEdge.originEdge();
+            duplicated = isColumnar ? rowOrColEdge.asColumnarEdge() : rowOrColEdge.asRowEdge();
+
+            // Re-target every input of the original connector to the duplicate.
+            List<ExecEdge> newEdges = new ArrayList<>();
+            for (ExecEdge inputEdge : origin.getInputEdges()) {
+                newEdges.add(
+                        duplicateAndConnectEdges(
+                                isColumnar, duplicated, inputEdge, edges, inputs, config));
+            }
+            duplicated.setInputEdges(newEdges);
+        } else {
+            if (!inputs.containsKey(source)
+                    && source instanceof CommonExecExchange
+                    && source.getInputEdges().size() == 1) {
+                // The exchange was registered together with its input node; look that one up.
+                source = source.getInputEdges().get(0).getSource();
+            }
+
+            RowOrColumnarNode rowOrColNode = inputs.get(source);
+            if (rowOrColNode == null) {
+                throw new IllegalArgumentException(
+                        "No row/columnar candidate registered for the input '"
+                                + edge.getSource().getDescription()
+                                + "' of '"
+                                + target.getDescription()
+                                + "'.");
+            }
+            duplicated =
+                    isColumnar
+                            ? rowOrColNode.asColumnarNode(config)
+                            : rowOrColNode.asRowNode(config);
+        }
+
+        return new ExecEdge(duplicated, target, edge.getShuffle(), edge.getExchangeMode());
+    }
+
+    /**
+     * Collects the input region of {@code node}: walks upstream through exchange/union nodes and
+     * registers every connector passed on the way as well as the first non-connector node found
+     * behind each input.
+     *
+     * @param node node whose inputs are inspected (recursively for connector inputs)
+     * @param reusableRowOrColumnarEdges receives a wrapper for {@code node} when it is a connector
+     * @param reusableRowOrColumnarNodes receives a wrapper for every non-connector input
+     */
+    private void getInputEdgeRegion(
+            ExecNode<?> node,
+            Map<ExecNode<?>, RowOrColumnarEdge> reusableRowOrColumnarEdges,
+            Map<ExecNode<?>, RowOrColumnarNode> reusableRowOrColumnarNodes) {
+        for (ExecEdge edge : node.getInputEdges()) {
+            ExecNode<?> source = edge.getSource();
+            if (!isEdge(source)) {
+                if (node instanceof CommonExecExchange) {
+                    // exchange node is swallowed by the node: it is stored with the source and
+                    // therefore not registered as a connector below
+                    reusableRowOrColumnarNodes.putIfAbsent(
+                            source, new RowOrColumnarNode(source, node));
+                    continue;
+                } else {
+                    reusableRowOrColumnarNodes.putIfAbsent(
+                            source, new RowOrColumnarNode(source, null));
+                }
+            } else {
+                getInputEdgeRegion(source, reusableRowOrColumnarEdges, reusableRowOrColumnarNodes);
+            }
+
+            if (isEdge(node)) {
+                // NOTE(review): put() replaces an existing wrapper together with any duplicates
+                // it already cached; confirm whether putIfAbsent() is intended here.
+                reusableRowOrColumnarEdges.put(node, new RowOrColumnarEdge(node));
+            }
+        }
+    }
+
+    /** Tells whether this processor treats the node as an edge, i.e. an exchange or a union. */
+    private static boolean isEdge(ExecNode<?> node) {
+        if (node instanceof CommonExecExchange) {
+            return true;
+        }
+        return node instanceof CommonExecUnion;
+    }
+
+    /**
+     * Wraps an exchange or union node and lazily creates, then caches, one duplicate of it for
+     * row consumers and one for columnar consumers.
+     *
+     * <p>The duplicates are created with a copy of the original input edges (or none for the
+     * column-to-column exchange); the caller is expected to set the final input edges.
+     */
+    private class RowOrColumnarEdge {
+        private final ExecNode<?> edge;
+        private final boolean isExchange;
+
+        /** Cached duplicate handed to row consumers; {@code null} until first requested. */
+        private ExecNode<?> dupRowEdge;
+
+        /** Cached duplicate handed to columnar consumers; {@code null} until first requested. */
+        private ExecNode<?> dupColEdge;
+
+        /**
+         * @param edge the wrapped connector
+         * @throws IllegalArgumentException if {@code edge} is neither an exchange nor a union
+         */
+        RowOrColumnarEdge(ExecNode<?> edge) {
+            checkArgument(isEdge(edge));
+            this.edge = edge;
+            this.isExchange = edge instanceof CommonExecExchange;
+        }
+
+        /** Returns the wrapped, unmodified connector. */
+        ExecNode<?> originEdge() {
+            return edge;
+        }
+
+        /**
+         * Returns the connector used by columnar consumers: a plain duplicate for a union, a
+         * {@link StreamExecColumnToColumnExchange} built from the wrapped node's first input
+         * property and output type for an exchange.
+         */
+        ExecNode<?> asColumnarEdge() {
+            if (dupColEdge == null) {
+                dupColEdge =
+                        !isExchange
+                                ? duplicateEdge()
+                                : new StreamExecColumnToColumnExchange(
+                                        tableConfig,
+                                        edge.getInputProperty(0),
+                                        (RowType) edge.getOutputType(),
+                                        "ColumnToColumn" + edge.getDescription());
+            }
+            return dupColEdge;
+        }
+
+        /** Returns the connector used by row consumers, a plain duplicate of the wrapped node. */
+        ExecNode<?> asRowEdge() {
+            // Uses its own cache field so that the columnar duplicate is never overwritten.
+            if (dupRowEdge == null) {
+                dupRowEdge = duplicateEdge();
+            }
+            return dupRowEdge;
+        }
+
+        /**
+         * Creates a new node of the same concrete type as the wrapped connector, with the same
+         * input properties, output type and description, and a copy of its input edge list.
+         *
+         * @throws IllegalArgumentException if the concrete connector type is not handled
+         */
+        private ExecNode<?> duplicateEdge() {
+            if (edge instanceof BatchExecUnion) {
+                BatchExecUnion union = (BatchExecUnion) edge;
+                BatchExecUnion newEdge =
+                        new BatchExecUnion(
+                                tableConfig,
+                                union.getInputProperties(),
+                                (RowType) union.getOutputType(),
+                                union.getDescription());
+
+                newEdge.setInputEdges(new ArrayList<>(union.getInputEdges()));
+                return newEdge;
+            }
+            if (edge instanceof StreamExecUnion) {
+                StreamExecUnion union = (StreamExecUnion) edge;
+                StreamExecUnion newEdge =
+                        new StreamExecUnion(
+                                tableConfig,
+                                union.getInputProperties(),
+                                (RowType) union.getOutputType(),
+                                union.getDescription());
+
+                newEdge.setInputEdges(new ArrayList<>(union.getInputEdges()));
+                return newEdge;
+            }
+            if (edge instanceof BatchExecExchange) {
+                BatchExecExchange exchange = (BatchExecExchange) edge;
+                BatchExecExchange newEdge =
+                        new BatchExecExchange(
+                                tableConfig,
+                                exchange.getInputProperties().get(0),
+                                (RowType) exchange.getOutputType(),
+                                exchange.getDescription());
+
+                newEdge.setInputEdges(new ArrayList<>(exchange.getInputEdges()));
+                return newEdge;
+            }
+            if (edge instanceof StreamExecExchange) {
+                StreamExecExchange exchange = (StreamExecExchange) edge;
+                StreamExecExchange newEdge =
+                        new StreamExecExchange(
+                                tableConfig,
+                                exchange.getInputProperties().get(0),
+                                (RowType) exchange.getOutputType(),
+                                exchange.getDescription());
+
+                newEdge.setInputEdges(new ArrayList<>(exchange.getInputEdges()));
+                return newEdge;
+            }
+            throw new IllegalArgumentException(
+                    "Unsupported exchange/union node type: " + edge.getClass().getName());
+        }
+    }
+
+    /**
+     * Wraps a non-connector node, optionally together with the exchange that directly consumes
+     * it, and lazily creates, then caches, the node a row consumer respectively a columnar
+     * consumer has to read from.
+     */
+    private class RowOrColumnarNode {
+        private final ExecNode<?> node;
+        /** Exchange consuming {@link #node}, or {@code null} when there is none. */
+        private final ExecNode<?> exchange;
+
+        private ExecNode<?> rowNode;
+        private ExecNode<?> colNode;
+
+        /**
+         * @param node the wrapped node; must not be an exchange or union
+         * @param exchange {@code null} or the exchange consuming {@code node}
+         * @throws IllegalArgumentException if either argument violates the constraints above
+         */
+        RowOrColumnarNode(ExecNode<?> node, ExecNode<?> exchange) {
+            this.node = node;
+            this.exchange = exchange;
+            checkArgument(
+                    !isEdge(node) && (exchange == null || exchange instanceof CommonExecExchange));
+        }
+
+        /**
+         * Returns the node a columnar consumer reads from.
+         *
+         * <p>A natively supported node without exchange is returned as is. In every other case a
+         * new exchange is created and connected to the wrapped node with a forward, pipelined
+         * edge: column-to-column when the node is natively supported, row-to-column otherwise.
+         *
+         * @param config configuration used to decide whether the node supports native translation
+         */
+        ExecNode<?> asColumnarNode(ReadableConfig config) {
+            if (colNode == null) {
+                boolean nativeNode = supportsNative(node, config);
+                if (nativeNode && exchange == null) {
+                    colNode = node;
+                } else {
+                    if (nativeNode) {
+                        // The created exchange has to be kept; it becomes the columnar source.
+                        colNode =
+                                new StreamExecColumnToColumnExchange(
+                                        tableConfig,
+                                        exchange.getInputProperty(0),
+                                        (RowType) exchange.getOutputType(),
+                                        "ColumnToColumn" + exchange.getDescription());
+                    } else if (exchange == null) {
+                        colNode =
+                                new StreamExecRowToColumnExchange(
+                                        tableConfig,
+                                        InputProperty.builder()
+                                                .requiredDistribution(KEEP_INPUT_AS_IS_DISTRIBUTION)
+                                                .build(),
+                                        (RowType) node.getOutputType(),
+                                        "Inserted Row to Column");
+                    } else {
+                        colNode =
+                                new StreamExecRowToColumnExchange(
+                                        tableConfig,
+                                        exchange.getInputProperty(0),
+                                        (RowType) exchange.getOutputType(),
+                                        "RowToColumn" + exchange.getDescription());
+                    }
+                    colNode.setInputEdges(forwardEdgeTo(colNode));
+                }
+            }
+
+            return colNode;
+        }
+
+        /**
+         * Returns the node a row consumer reads from.
+         *
+         * <p>For a node without native support this is the original exchange if there is one,
+         * otherwise the node itself. For a natively supported node a new column-to-row exchange
+         * is created and connected to the wrapped node with a forward, pipelined edge.
+         *
+         * @param config configuration used to decide whether the node supports native translation
+         */
+        ExecNode<?> asRowNode(ReadableConfig config) {
+            if (rowNode == null) {
+                if (!supportsNative(node, config)) {
+                    rowNode = exchange == null ? node : exchange;
+                } else {
+                    if (exchange == null) {
+                        rowNode =
+                                new StreamExecColumnToRowExchange(
+                                        tableConfig,
+                                        InputProperty.builder()
+                                                .requiredDistribution(KEEP_INPUT_AS_IS_DISTRIBUTION)
+                                                .build(),
+                                        (RowType) node.getOutputType(),
+                                        "Inserted Column to Row");
+                    } else {
+                        rowNode =
+                                new StreamExecColumnToRowExchange(
+                                        tableConfig,
+                                        exchange.getInputProperty(0),
+                                        (RowType) exchange.getOutputType(),
+                                        "ColumnToRow" + exchange.getDescription());
+                    }
+                    rowNode.setInputEdges(forwardEdgeTo(rowNode));
+                }
+            }
+            return rowNode;
+        }
+
+        /** Creates the single forward, pipelined input edge from the wrapped node to consumer. */
+        private List<ExecEdge> forwardEdgeTo(ExecNode<?> consumer) {
+            return Collections.singletonList(
+                    new ExecEdge(
+                            node,
+                            consumer,
+                            ExecEdge.FORWARD_SHUFFLE,
+                            StreamExchangeMode.PIPELINED));
+        }
+    }
 
+    /**
+     * Tells whether the node implements {@link NativeSupportedExec} and reports that it can be
+     * translated natively under the given configuration.
+     */
+    private static boolean supportsNative(ExecNode<?> exec, ReadableConfig config) {
+        if (!(exec instanceof NativeSupportedExec)) {
+            return false;
+        }
+        NativeSupportedExec nativeExec = (NativeSupportedExec) exec;
+        return nativeExec.canTranslateNative(config);
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
index 9b23711724a..bc792099078 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGlobalWindowAggregate.java
@@ -21,6 +21,10 @@ package 
org.apache.flink.table.planner.plan.nodes.exec.stream;
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegion;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegionTypeInfo;
+import org.apache.flink.streaming.api.natives.types.NativeType;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -28,17 +32,16 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
+import org.apache.flink.table.planner.plan.logical.*;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonNativeUtil;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
-import org.apache.flink.table.planner.plan.utils.AggregateUtil;
-import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.*;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.TableConfigUtils;
 import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
@@ -46,6 +49,7 @@ import 
org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
 import org.apache.flink.table.runtime.groupwindow.WindowProperty;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import 
org.apache.flink.table.runtime.operators.aggregate.window.SlicingWindowAggOperatorBuilder;
+import org.apache.flink.table.runtime.operators.natives.*;
 import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
@@ -61,10 +65,15 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.calcite.tools.RelBuilder;
 
+import javax.annotation.Nullable;
+import javax.sound.sampled.UnsupportedAudioFileException;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -150,6 +159,11 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
     @Override
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
+        Transformation<RowData> nativePlan = translateToNativePlan(planner, 
config);
+        if (nativePlan != null) {
+            return nativePlan;
+        }
+
         final ExecEdge inputEdge = getInputEdges().get(0);
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
@@ -292,4 +306,201 @@ public class StreamExecGlobalWindowAggregate extends 
StreamExecWindowAggregateBa
                 sliceAssigner,
                 shifTimeZone);
     }
+
+    /**
+     * Tries to translate this node into a native (columnar) tumbling global window aggregation.
+     *
+     * @param planner planner used to translate the input edge
+     * @param config configuration providing the local time zone and the native operator sizes
+     * @return the native transformation, or {@code null} when the window is not a tumbling
+     *     window, or the input types, the aggregations or a distinct key type are not supported
+     *     natively, in which case the caller falls back to the regular translation
+     * @throws UnsupportedOperationException if the windowing strategy is of an unknown kind
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable
+    private Transformation<RowData> translateToNativePlan(
+            PlannerBase planner, ExecNodeConfig config) {
+        final ZoneId shiftTimeZone =
+                TimeWindowUtil.getShiftTimeZone(
+                        windowing.getTimeAttributeType(),
+                        TableConfigUtils.getLocalTimeZone(config));
+
+        // TODO support other window types
+        final WindowSpec windowSpec = windowing.getWindow();
+        if (!(windowSpec instanceof TumblingWindowSpec)) {
+            return null;
+        }
+        final TumblingWindowSpec tumblingSpec = (TumblingWindowSpec) windowSpec;
+
+        NativeWindowAggInfo.WindowStrategy windowStrategy;
+        int timeFieldIndex;
+
+        int sliceEndIndex = -1;
+        int windowEndIndex = -1;
+
+        if (windowing instanceof WindowAttachedWindowingStrategy) {
+            windowEndIndex = ((WindowAttachedWindowingStrategy) windowing).getWindowEnd();
+            timeFieldIndex = Integer.MAX_VALUE;
+            windowStrategy = NativeWindowAggInfo.WindowStrategy.WINDOW_ATTACHED;
+        } else if (windowing instanceof SliceAttachedWindowingStrategy) {
+            sliceEndIndex = ((SliceAttachedWindowingStrategy) windowing).getSliceEnd();
+            timeFieldIndex = Integer.MAX_VALUE;
+            windowStrategy = NativeWindowAggInfo.WindowStrategy.SLICE_ATTACHED;
+        } else if (windowing instanceof TimeAttributeWindowingStrategy) {
+            if (windowing.isRowtime()) {
+                timeFieldIndex =
+                        ((TimeAttributeWindowingStrategy) windowing).getTimeAttributeIndex();
+            } else {
+                timeFieldIndex = -1;
+            }
+            windowStrategy = NativeWindowAggInfo.WindowStrategy.TIME_ATTRIBUTE;
+        } else {
+            throw new UnsupportedOperationException(windowing + " is not supported yet.");
+        }
+
+        final ExecEdge inputEdge = getInputEdges().get(0);
+
+        // NOTE(review): the input is translated before the support checks below, so a fallback
+        // (null return) happens after the native input translation was already requested.
+        // Confirm that translating the input a second time on the fallback path is safe.
+        Transformation<NativeMemoryRegion> columnarInput =
+                (Transformation<NativeMemoryRegion>) inputEdge.translateToPlanNative(planner);
+
+        final RowType inputRowType = (RowType) inputEdge.getOutputType();
+
+        if (!CommonNativeUtil.isSupportedDataType(inputRowType.getChildren())) {
+            return null;
+        }
+
+        final AggregateInfoList aggInfoList = getStateBackedAggInfoList();
+
+        if (!CommonNativeUtil.isSupportedAggregation(aggInfoList)) {
+            return null;
+        }
+
+        int numDistinct = aggInfoList.distinctInfos().length;
+        int numAggregations = aggInfoList.aggInfos().length;
+
+        // Accumulator columns follow the grouping columns; every aggregate gets one consecutive
+        // index per external accumulator type.
+        int[][] aggInputIndex = new int[numAggregations][];
+        int baseOffset = grouping.length;
+        int index = 0;
+        for (AggregateInfo info : aggInfoList.aggInfos()) {
+            int[] inputIndex = new int[info.externalAccTypes().length];
+            for (int i = 0; i < inputIndex.length; ++i) {
+                inputIndex[i] = baseOffset++;
+            }
+            aggInputIndex[index++] = inputIndex;
+        }
+        // Distinct state columns start right after the last accumulator column.
+        int distinctStateOffset = baseOffset;
+
+        // Per aggregate: index of its distinct info and of its distinct filter, -1 when absent.
+        int[] distinctIndex = new int[numAggregations];
+        int[] filterIndex = new int[numAggregations];
+        Arrays.fill(distinctIndex, -1);
+        Arrays.fill(filterIndex, -1);
+
+        NativeDistinctInfo[] distinctInfos = new NativeDistinctInfo[numDistinct];
+        index = 0;
+        for (DistinctInfo info : aggInfoList.distinctInfos()) {
+            ArrayList<Integer> filters = new ArrayList<>(info.filterArgs().length());
+            info.filterArgs().foreach(filter -> filters.add((Integer) filter));
+
+            ArrayList<Integer> aggIds = new ArrayList<>(info.aggIndexes().length());
+            int finalIndex = index;
+
+            // The i-th aggregate of this distinct info is paired with the i-th filter argument.
+            AtomicInteger filterCursor = new AtomicInteger(0);
+            info.aggIndexes()
+                    .foreach(
+                            aggId -> {
+                                distinctIndex[(int) aggId] = finalIndex;
+                                filterIndex[(int) aggId] =
+                                        filters.get(filterCursor.getAndIncrement());
+                                // Added exactly once so aggIds stays aligned with filters.
+                                aggIds.add((Integer) aggId);
+                                return true;
+                            });
+
+            List<LogicalType> logicalKeyTypes =
+                    info.keyType().getChildren().isEmpty()
+                            ? Collections.singletonList(info.keyType().getLogicalType())
+                            : info.keyType().getChildren().stream()
+                                    .map(DataType::getLogicalType)
+                                    .collect(Collectors.toList());
+            if (!CommonNativeUtil.isSupportedDataType(logicalKeyTypes)) {
+                return null;
+            }
+
+            NativeType[] keyTypes = CommonNativeUtil.getNativeDataType(logicalKeyTypes);
+            NativeDistinctInfo distinctInfo =
+                    new NativeDistinctInfo(
+                            info.consumeRetraction(),
+                            distinctStateOffset++,
+                            keyTypes,
+                            info.argIndexes(),
+                            aggIds.stream().mapToInt(Integer::intValue).toArray(),
+                            filters.stream().mapToInt(Integer::intValue).toArray());
+            distinctInfos[index++] = distinctInfo;
+        }
+
+        NativeAggFunctionInfo[] aggFunctions = new NativeAggFunctionInfo[numAggregations];
+        index = 0;
+
+        for (AggregateInfo info : aggInfoList.aggInfos()) {
+            NativeAggFunctionInfo function =
+                    new NativeAggFunctionInfo(
+                            info.consumeRetraction(),
+                            distinctIndex[info.aggIndex()],
+                            info.aggIndex(),
+                            aggInputIndex[index++],
+                            // A distinct aggregate uses the filter recorded with its distinct
+                            // info, any other aggregate the filter of the aggregate call.
+                            distinctIndex[info.aggIndex()] >= 0
+                                    ? filterIndex[info.aggIndex()]
+                                    : info.agg().filterArg,
+                            CommonNativeUtil.getNativeAggFunctionType(info),
+                            NativeTypeUtils.toNativeType(
+                                    info.externalResultType().getLogicalType()));
+            aggFunctions[info.aggIndex()] = function;
+        }
+
+        NativeGroupAggInfo aggInfo =
+                new NativeGroupAggInfo(
+                        false,
+                        false,
+                        aggInfoList.countStarInserted(),
+                        aggInfoList.getIndexOfCountStar(),
+                        config.get(TaskManagerOptions.TASK_NATIVE_BATCH_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_FINAL_AGG_BUFFER_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_DISTINCT_TABLE_INITIAL_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_RESULT_TABLE_INITIAL_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_STRING_SET_INITIAL_SIZE),
+                        new int[0],
+                        grouping,
+                        CommonNativeUtil.getNativeDataType(inputRowType.getChildren()),
+                        aggFunctions,
+                        distinctInfos);
+        CommonNativeUtil.validateGroupAggInfo(aggInfo);
+
+        // Only tumbling windows reach this point (see the guard at the top of the method).
+        long size = tumblingSpec.getSize().toMillis();
+        long offset = tumblingSpec.getOffset() != null ? tumblingSpec.getOffset().toMillis() : 0;
+        NativeWindowAggInfo windowInfo =
+                new NativeWindowAggInfo(
+                        timeFieldIndex,
+                        sliceEndIndex,
+                        windowEndIndex,
+                        size,
+                        offset,
+                        windowStrategy,
+                        shiftTimeZone.getId(),
+                        aggInfo);
+        OneInputStreamOperator<NativeMemoryRegion, NativeMemoryRegion> operator =
+                new NativeTumbleGlobalWindowOperator(windowInfo);
+
+        final OneInputTransformation<NativeMemoryRegion, NativeMemoryRegion> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        columnarInput,
+                        createTransformationMeta(GLOBAL_WINDOW_AGGREGATE_TRANSFORMATION, config),
+                        operator,
+                        new NativeMemoryRegionTypeInfo(
+                                NativeTypeUtils.toNativeType(getOutputType())),
+                        columnarInput.getParallelism());
+        RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType));
+        // Only the key type is taken from the row selector; the key selector itself is left
+        // unset. NOTE(review): presumably because a row-based selector cannot be applied to
+        // NativeMemoryRegion input - confirm the operator extracts the keys on its own.
+        transform.setStateKeySelector(null);
+        transform.setStateKeyType(selector.getProducedType());
+        return (OneInputTransformation) transform;
+    }
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
index cc2b87976e1..85f530aac66 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java
@@ -18,9 +18,16 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.TaskManagerOptions;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegion;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegionTypeInfo;
+import org.apache.flink.streaming.api.natives.types.NativeType;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -30,16 +37,10 @@ import 
org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
-import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.*;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonNativeUtil;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
-import org.apache.flink.table.planner.plan.utils.AggregateUtil;
-import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.*;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
@@ -47,21 +48,22 @@ import 
org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction;
 import 
org.apache.flink.table.runtime.operators.aggregate.MiniBatchGroupAggFunction;
 import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
+import org.apache.flink.table.runtime.operators.natives.*;
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
-
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-import org.apache.calcite.rel.core.AggregateCall;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -78,7 +80,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
         producedTransformations = 
StreamExecGroupAggregate.GROUP_AGGREGATE_TRANSFORMATION,
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
-public class StreamExecGroupAggregate extends StreamExecAggregateBase {
+public class StreamExecGroupAggregate extends StreamExecAggregateBase
+        implements NativeSupportedExec {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(StreamExecGroupAggregate.class);
 
@@ -159,6 +162,11 @@ public class StreamExecGroupAggregate extends 
StreamExecAggregateBase {
                             + "state size. You may specify a retention time of 
0 to not clean up the state.");
         }
 
+        // Prefer the native plan when translateToNativePlan produces one; on null, continue
+        // with the regular translation below.
+        Transformation<RowData> nativePlan = translateToNativePlan(planner, 
config);
+        if (nativePlan != null) {
+            return nativePlan;
+        }
+
         final ExecEdge inputEdge = getInputEdges().get(0);
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
@@ -256,4 +264,137 @@ public class StreamExecGroupAggregate extends 
StreamExecAggregateBase {
 
         return transform;
     }
+
+    /**
+     * Tries to translate this node into a native (columnar) group aggregation.
+     *
+     * <p>Returns {@code null} when the input column types, the aggregate calls or the key types
+     * of a distinct aggregation are not supported natively.
+     *
+     * @param planner planner used to translate the input edge into its native form
+     * @param config node configuration the native batch and table sizing options are read from
+     * @return the native transformation, or {@code null} if this node cannot be run natively
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable
+    private Transformation<RowData> translateToNativePlan(
+            PlannerBase planner, ExecNodeConfig config) {
+        final ExecEdge inputEdge = getInputEdges().get(0);
+        Transformation<NativeMemoryRegion> columnarInput =
+                (Transformation<NativeMemoryRegion>) inputEdge.translateToPlanNative(planner);
+
+        final RowType inputRowType = (RowType) inputEdge.getOutputType();
+        // Give up unless every input column type is supported natively.
+        if (!CommonNativeUtil.isSupportedDataType(inputRowType.getChildren())) {
+            return null;
+        }
+
+        final AggregateInfoList aggInfoList =
+                AggregateUtil.transformToStreamAggregateInfoList(
+                        inputRowType,
+                        JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
+                        aggCallNeedRetractions,
+                        needRetraction,
+                        true,
+                        true);
+
+        if (!CommonNativeUtil.isSupportedAggregation(aggInfoList)) {
+            return null;
+        }
+
+        int numDistinctions = aggInfoList.distinctInfos().length;
+        int numAggregations = aggInfoList.aggInfos().length;
+
+        // Per aggregate: index of the distinct info it belongs to and the filter argument taken
+        // from that distinct info; -1 means the aggregate is not part of any distinct info.
+        int[] distinctIndex = new int[numAggregations];
+        int[] filterIndex = new int[numAggregations];
+        Arrays.fill(distinctIndex, -1);
+        Arrays.fill(filterIndex, -1);
+
+        NativeDistinctInfo[] distinctInfos = new NativeDistinctInfo[numDistinctions];
+        int index = 0;
+
+        for (DistinctInfo info : aggInfoList.distinctInfos()) {
+            ArrayList<Integer> filters = new ArrayList<>(info.filterArgs().length());
+            info.filterArgs().foreach(filter -> filters.add((Integer) filter));
+
+            ArrayList<Integer> aggIds = new ArrayList<>(info.aggIndexes().length());
+            int finalIndex = index;
+            // The i-th aggregate of this distinct info is paired with its i-th filter argument.
+            AtomicInteger filter = new AtomicInteger(0);
+            info.aggIndexes()
+                    .foreach(
+                            aggId -> {
+                                distinctIndex[(int) aggId] = finalIndex;
+                                filterIndex[(int) aggId] = filters.get(filter.getAndIncrement());
+
+                                aggIds.add((Integer) aggId);
+                                return true;
+                            });
+
+            // A key type without children is used as a single key column, otherwise each child
+            // becomes one key column.
+            List<LogicalType> logicalKeyTypes =
+                    info.keyType().getChildren().isEmpty()
+                            ? Collections.singletonList(info.keyType().getLogicalType())
+                            : info.keyType().getChildren().stream()
+                                    .map(DataType::getLogicalType)
+                                    .collect(Collectors.toList());
+            if (!CommonNativeUtil.isSupportedDataType(logicalKeyTypes)) {
+                return null;
+            }
+            NativeType[] keyTypes = CommonNativeUtil.getNativeDataType(logicalKeyTypes);
+            NativeDistinctInfo distinctInfo =
+                    new NativeDistinctInfo(
+                            info.consumeRetraction(),
+                            -1,
+                            keyTypes,
+                            info.argIndexes(),
+                            aggIds.stream().mapToInt(aggId -> aggId).toArray(),
+                            filters.stream().mapToInt(aggId -> aggId).toArray());
+            distinctInfos[index++] = distinctInfo;
+        }
+
+        NativeAggFunctionInfo[] aggFunctions = new NativeAggFunctionInfo[numAggregations];
+        for (AggregateInfo info : aggInfoList.aggInfos()) {
+            NativeAggFunctionInfo function =
+                    new NativeAggFunctionInfo(
+                            info.consumeRetraction(),
+                            distinctIndex[info.aggIndex()],
+                            info.aggIndex(),
+                            info.argIndexes(),
+                            // Distinct aggregates use the filter recorded for them above, all
+                            // others the filter argument of their aggregate call.
+                            distinctIndex[info.aggIndex()] >= 0
+                                    ? filterIndex[info.aggIndex()]
+                                    : info.agg().filterArg,
+                            CommonNativeUtil.getNativeAggFunctionType(info),
+                            NativeTypeUtils.toNativeType(
+                                    info.externalResultType().getLogicalType()));
+
+            aggFunctions[info.aggIndex()] = function;
+        }
+
+        NativeGroupAggInfo aggInfo =
+                new NativeGroupAggInfo(
+                        needRetraction,
+                        generateUpdateBefore,
+                        aggInfoList.countStarInserted(),
+                        aggInfoList.getIndexOfCountStar(),
+                        config.get(TaskManagerOptions.TASK_NATIVE_BATCH_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_FINAL_AGG_BUFFER_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_DISTINCT_TABLE_INITIAL_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_RESULT_TABLE_INITIAL_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_STRING_SET_INITIAL_SIZE),
+                        new int[0],
+                        grouping,
+                        CommonNativeUtil.getNativeDataType(inputRowType.getChildren()),
+                        aggFunctions,
+                        distinctInfos);
+        CommonNativeUtil.validateGroupAggInfo(aggInfo);
+
+        OneInputStreamOperator<NativeMemoryRegion, NativeMemoryRegion> operator =
+                new NativeGroupAggOperator(aggInfo);
+
+        // partitioned aggregation
+        final OneInputTransformation<NativeMemoryRegion, NativeMemoryRegion> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        columnarInput,
+                        createTransformationMeta(GROUP_AGGREGATE_TRANSFORMATION, config),
+                        operator,
+                        new NativeMemoryRegionTypeInfo(
+                                NativeTypeUtils.toNativeType((RowType) getOutputType())),
+                        columnarInput.getParallelism());
+
+        // Only the state key type is registered; the key selector is explicitly set to null.
+        // NOTE(review): presumably the native operator extracts the key itself - confirm.
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType));
+        transform.setStateKeySelector(null);
+        transform.setStateKeyType(selector.getProducedType());
+        return (OneInputTransformation) transform;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
index 3477010cd59..608ef4d2bc7 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecIncrementalGroupAggregate.java
@@ -18,43 +18,44 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.tools.RelBuilder;
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.TaskManagerOptions;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegion;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegionTypeInfo;
+import org.apache.flink.streaming.api.natives.types.NativeType;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
-import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.*;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonNativeUtil;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
-import org.apache.flink.table.planner.plan.utils.AggregateUtil;
-import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.*;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import 
org.apache.flink.table.runtime.operators.aggregate.MiniBatchIncrementalGroupAggFunction;
 import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
+import org.apache.flink.table.runtime.operators.natives.*;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.tools.RelBuilder;
-
+import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -68,7 +69,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
                 
StreamExecIncrementalGroupAggregate.INCREMENTAL_GROUP_AGGREGATE_TRANSFORMATION,
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
-public class StreamExecIncrementalGroupAggregate extends 
StreamExecAggregateBase {
+public class StreamExecIncrementalGroupAggregate extends 
StreamExecAggregateBase
+        implements NativeSupportedExec {
 
     public static final String INCREMENTAL_GROUP_AGGREGATE_TRANSFORMATION =
             "incremental-group-aggregate";
@@ -164,8 +166,6 @@ public class StreamExecIncrementalGroupAggregate extends 
StreamExecAggregateBase
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
         final ExecEdge inputEdge = getInputEdges().get(0);
-        final Transformation<RowData> inputTransform =
-                (Transformation<RowData>) inputEdge.translateToPlan(planner);
 
         final AggregateInfoList partialLocalAggInfoList =
                 AggregateUtil.createPartialAggInfoList(
@@ -176,6 +176,16 @@ public class StreamExecIncrementalGroupAggregate extends 
StreamExecAggregateBase
                         partialAggNeedRetraction,
                         false);
 
+        // Prefer the native plan when translateToNativePlan produces one; on null, continue
+        // with the regular translation below.
+        Transformation<RowData> nativePlan =
+                translateToNativePlan(
+                        planner, inputTypeInfo.toRowType(), partialLocalAggInfoList, config);
+        if (nativePlan != null) {
+            return nativePlan;
+        }
+
+        final Transformation<RowData> inputTransform =
+                (Transformation<RowData>) inputEdge.translateToPlan(planner);
+
         final GeneratedAggsHandleFunction partialAggsHandler =
                 generateAggsHandler(
                         "PartialGroupAggsHandler",
@@ -267,4 +277,139 @@ public class StreamExecIncrementalGroupAggregate extends 
StreamExecAggregateBase
                 .needMerge(mergedAccOffset, true, mergedAccExternalTypes)
                 .generateAggsHandler(name, aggInfoList);
     }
+
+    /**
+     * Tries to translate this node into a native (columnar) incremental group aggregation.
+     *
+     * <p>Returns {@code null} when the input column types or the aggregate calls are not
+     * supported natively, when the key type of a distinct aggregation has child types, or when
+     * that key type is not supported natively.
+     *
+     * @param planner planner used to translate the input edge into its native form
+     * @param inputRowType row type the native input column types are derived from
+     * @param aggInfoList aggregate information the native aggregation is built from
+     * @param config node configuration the native batch and table sizing options are read from
+     * @return the native transformation, or {@code null} if this node cannot be run natively
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable
+    private Transformation<RowData> translateToNativePlan(
+            PlannerBase planner,
+            RowType inputRowType,
+            AggregateInfoList aggInfoList,
+            ExecNodeConfig config) {
+        final ExecEdge inputEdge = getInputEdges().get(0);
+        Transformation<NativeMemoryRegion> columnarInput =
+                (Transformation<NativeMemoryRegion>) inputEdge.translateToPlanNative(planner);
+        // Give up unless both the input column types and the aggregate calls are supported.
+        if (!CommonNativeUtil.isSupportedDataType(inputRowType.getChildren())
+                || !CommonNativeUtil.isSupportedAggregation(aggInfoList)) {
+            return null;
+        }
+        int numDistinct = aggInfoList.distinctInfos().length;
+        int numAggregations = aggInfoList.aggInfos().length;
+
+        // Assign every aggregate consecutive input indexes, one per external accumulator type,
+        // starting right after the partial grouping keys.
+        int[][] aggInputIndex = new int[numAggregations][];
+        int baseOffset = partialAggGrouping.length;
+        int index = 0;
+        for (AggregateInfo info : aggInfoList.aggInfos()) {
+            int[] inputIndex = new int[info.externalAccTypes().length];
+            aggInputIndex[index++] = inputIndex;
+            for (int i = 0; i < info.externalAccTypes().length; ++i) {
+                inputIndex[i] = baseOffset++;
+            }
+        }
+        // The distinct infos are numbered with the indexes following the accumulator indexes.
+        int distinctStateOffset = baseOffset;
+
+        // Per aggregate: index of the distinct info it belongs to and the filter argument taken
+        // from that distinct info; -1 means the aggregate is not part of any distinct info.
+        int[] distinctIndex = new int[numAggregations];
+        Arrays.fill(distinctIndex, -1);
+        int[] filterIndex = new int[numAggregations];
+        Arrays.fill(filterIndex, -1);
+
+        NativeDistinctInfo[] distinctInfos = new NativeDistinctInfo[numDistinct];
+        index = 0;
+        for (DistinctInfo info : aggInfoList.distinctInfos()) {
+
+            ArrayList<Integer> filters = new ArrayList<>(info.filterArgs().length());
+            info.filterArgs().foreach(filter -> filters.add((Integer) filter));
+
+            ArrayList<Integer> aggIds = new ArrayList<>(info.aggIndexes().length());
+            int finalIndex = index;
+
+            // The i-th aggregate of this distinct info is paired with its i-th filter argument.
+            AtomicInteger filter = new AtomicInteger(0);
+            info.aggIndexes()
+                    .foreach(
+                            aggId -> {
+                                distinctIndex[(int) aggId] = finalIndex;
+                                filterIndex[(int) aggId] = filters.get(filter.getAndIncrement());
+                                aggIds.add((Integer) aggId);
+                                return true;
+                            });
+            // Key types with child types are not handled here.
+            if (!info.keyType().getChildren().isEmpty()) {
+                return null;
+            }
+            List<LogicalType> logicalKeyTypes =
+                    Collections.singletonList(info.keyType().getLogicalType());
+            if (!CommonNativeUtil.isSupportedDataType(logicalKeyTypes)) {
+                return null;
+            }
+            NativeType[] keyTypes = CommonNativeUtil.getNativeDataType(logicalKeyTypes);
+            NativeDistinctInfo distinctInfo =
+                    new NativeDistinctInfo(
+                            info.consumeRetraction(),
+                            distinctStateOffset++,
+                            keyTypes,
+                            info.argIndexes(),
+                            aggIds.stream().mapToInt(aggId -> aggId).toArray(),
+                            filters.stream().mapToInt(aggId -> aggId).toArray());
+            distinctInfos[index++] = distinctInfo;
+        }
+
+        // Describe every aggregate function before the aggregation info is assembled, so that
+        // no slot of aggFunctions is left unset.
+        NativeAggFunctionInfo[] aggFunctions = new NativeAggFunctionInfo[numAggregations];
+        index = 0;
+        for (AggregateInfo info : aggInfoList.aggInfos()) {
+            NativeAggFunctionInfo function =
+                    new NativeAggFunctionInfo(
+                            info.consumeRetraction(),
+                            distinctIndex[info.aggIndex()],
+                            info.aggIndex(),
+                            aggInputIndex[index++],
+                            // Distinct aggregates use the filter recorded for them above, all
+                            // others the filter argument of their aggregate call.
+                            distinctIndex[info.aggIndex()] >= 0
+                                    ? filterIndex[info.aggIndex()]
+                                    : info.agg().filterArg,
+                            CommonNativeUtil.getNativeAggFunctionType(info),
+                            NativeTypeUtils.toNativeType(
+                                    info.externalResultType().getLogicalType()));
+            aggFunctions[info.aggIndex()] = function;
+        }
+
+        NativeGroupAggInfo aggInfo =
+                new NativeGroupAggInfo(
+                        partialAggNeedRetraction,
+                        false,
+                        aggInfoList.countStarInserted(),
+                        aggInfoList.getIndexOfCountStar(),
+                        config.get(TaskManagerOptions.TASK_NATIVE_BATCH_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_INCREMENTAL_AGG_BUFFER_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_DISTINCT_TABLE_INITIAL_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_RESULT_TABLE_INITIAL_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_STRING_SET_INITIAL_SIZE),
+                        partialAggGrouping,
+                        finalAggGrouping,
+                        CommonNativeUtil.getNativeDataType(inputRowType.getChildren()),
+                        aggFunctions,
+                        distinctInfos);
+        CommonNativeUtil.validateGroupAggInfo(aggInfo);
+        OneInputStreamOperator<NativeMemoryRegion, NativeMemoryRegion> operator =
+                new NativeIncGroupAggOperator(aggInfo);
+
+        // partitioned aggregation
+        final OneInputTransformation<NativeMemoryRegion, NativeMemoryRegion> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        columnarInput,
+                        createTransformationMeta(
+                                INCREMENTAL_GROUP_AGGREGATE_TRANSFORMATION, config),
+                        operator,
+                        new NativeMemoryRegionTypeInfo(
+                                NativeTypeUtils.toNativeType((RowType) getOutputType())),
+                        columnarInput.getParallelism(),
+                        0);
+
+        // Only the state key type is registered; the key selector is explicitly set to null.
+        // NOTE(review): presumably the native operator extracts the key itself - confirm.
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(
+                        partialAggGrouping, InternalTypeInfo.of(inputEdge.getOutputType()));
+
+        transform.setStateKeySelector(null);
+        transform.setStateKeyType(selector.getProducedType());
+
+        return (OneInputTransformation) transform;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
index dbf399d8ae5..739581f0061 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecJoin.java
@@ -19,20 +19,24 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegion;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegionTypeInfo;
+import org.apache.flink.streaming.api.natives.types.NativeType;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
-import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
-import 
org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
+import org.apache.flink.table.planner.plan.nodes.exec.*;
 import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonNativeUtil;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.planner.plan.utils.JoinUtil;
 import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
@@ -43,6 +47,9 @@ import 
org.apache.flink.table.runtime.operators.join.stream.AbstractStreamingJoi
 import 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator;
 import 
org.apache.flink.table.runtime.operators.join.stream.StreamingSemiAntiJoinOperator;
 import 
org.apache.flink.table.runtime.operators.join.stream.state.JoinInputSideSpec;
+import org.apache.flink.table.runtime.operators.natives.NativeJoinInfo;
+import 
org.apache.flink.table.runtime.operators.natives.NativeStreamingJoinOperator;
+import org.apache.flink.table.runtime.operators.natives.NativeTypeUtils;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
@@ -51,7 +58,9 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCre
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -69,7 +78,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
 public class StreamExecJoin extends ExecNodeBase<RowData>
-        implements StreamExecNode<RowData>, 
SingleTransformationTranslator<RowData> {
+        implements StreamExecNode<RowData>,
+                SingleTransformationTranslator<RowData>,
+                NativeSupportedExec {
 
     public static final String JOIN_TRANSFORMATION = "join";
 
@@ -131,6 +142,12 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
     @SuppressWarnings("unchecked")
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
+
+        // Use the native plan when translateToNativePlan produces one; on null, continue with
+        // the regular translation below.
+        Transformation<RowData> nativePlan = translateToNativePlan(planner, 
config);
+        if (nativePlan != null) {
+            return nativePlan;
+        }
+
         final ExecEdge leftInputEdge = getInputEdges().get(0);
         final ExecEdge rightInputEdge = getInputEdges().get(1);
 
@@ -223,4 +240,86 @@ public class StreamExecJoin extends ExecNodeBase<RowData>
         transform.setStateKeyType(leftSelect.getProducedType());
         return transform;
     }
+
+    /**
+     * Tries to translate this join into a native (columnar) streaming join.
+     *
+     * <p>A non-equi condition, if present, is converted with {@link #rexToNativeExpr(RexNode)},
+     * which throws for expression nodes it cannot convert.
+     *
+     * @param planner planner used to translate both input edges into their native form
+     * @param config node configuration the native batch size is read from
+     * @return the native transformation, or {@code null} if a column type of either input is
+     *     not supported natively
+     */
+    @SuppressWarnings("unchecked")
+    private Transformation<RowData> translateToNativePlan(
+            PlannerBase planner, ExecNodeConfig config) {
+        final ExecEdge leftInputEdge = getInputEdges().get(0);
+        final ExecEdge rightInputEdge = getInputEdges().get(1);
+
+        final Transformation<NativeMemoryRegion> leftColumnarInput =
+                (Transformation<NativeMemoryRegion>) leftInputEdge.translateToPlanNative(planner);
+        final Transformation<NativeMemoryRegion> rightColumnarInput =
+                (Transformation<NativeMemoryRegion>) rightInputEdge.translateToPlanNative(planner);
+
+        final RowType leftType = (RowType) leftInputEdge.getOutputType();
+        final RowType rightType = (RowType) rightInputEdge.getOutputType();
+        JoinUtil.validateJoinSpec(joinSpec, leftType, rightType, true);
+
+        // Give up unless every column type of both inputs is supported natively; the caller
+        // treats null as "no native plan".
+        if (!CommonNativeUtil.isSupportedDataType(leftType.getChildren())
+                || !CommonNativeUtil.isSupportedDataType(rightType.getChildren())) {
+            return null;
+        }
+
+        final int[] leftJoinKey = joinSpec.getLeftKeys();
+        final int[] rightJoinKey = joinSpec.getRightKeys();
+
+        // Native types of ALL columns of each input, not only of the join keys.
+        NativeType[] leftInputTypes = CommonNativeUtil.getNativeDataType(leftType.getChildren());
+        NativeType[] rightInputTypes = CommonNativeUtil.getNativeDataType(rightType.getChildren());
+
+        Optional<RexNode> condition = joinSpec.getNonEquiCondition();
+        NativeJoinInfo joinInfo =
+                new NativeJoinInfo(
+                        joinSpec.getJoinType(),
+                        config.get(TaskManagerOptions.TASK_NATIVE_BATCH_SIZE),
+                        leftJoinKey,
+                        rightJoinKey,
+                        leftInputTypes,
+                        rightInputTypes,
+                        condition.map(this::rexToNativeExpr).orElse(null));
+
+        TwoInputStreamOperator<NativeMemoryRegion, NativeMemoryRegion, NativeMemoryRegion>
+                operator = new NativeStreamingJoinOperator(joinInfo);
+
+        final RowType returnType = (RowType) getOutputType();
+
+        final TwoInputTransformation<NativeMemoryRegion, NativeMemoryRegion, NativeMemoryRegion>
+                transform =
+                        ExecNodeUtil.createTwoInputTransformation(
+                                leftColumnarInput,
+                                rightColumnarInput,
+                                createTransformationMeta(JOIN_TRANSFORMATION, config),
+                                operator,
+                                new NativeMemoryRegionTypeInfo(
+                                        NativeTypeUtils.toNativeType(returnType)),
+                                leftColumnarInput.getParallelism());
+
+        final InternalTypeInfo<RowData> leftTypeInfo = InternalTypeInfo.of(leftType);
+
+        // Only the state key type (derived from the left join keys) is registered; both key
+        // selectors are explicitly set to null.
+        // NOTE(review): presumably the native operator extracts the keys itself - confirm.
+        RowDataKeySelector leftSelect =
+                KeySelectorUtil.getRowDataSelector(joinSpec.getLeftKeys(), leftTypeInfo);
+        transform.setStateKeySelectors(null, null);
+
+        transform.setStateKeyType(leftSelect.getProducedType());
+        return (TwoInputTransformation) transform;
+    }
+
+    /**
+     * Recursively converts a Calcite expression into the expression tree handed to the native
+     * join.
+     *
+     * <p>A {@link RexInputRef} becomes a field access carrying the native field type and the
+     * input index. A {@link RexCall} becomes a call carrying the native result type, the
+     * operator name and the converted operands.
+     *
+     * @param node expression to convert
+     * @return the converted native expression
+     * @throws UnsupportedOperationException if {@code node}, or one of its operands, is neither
+     *     a {@link RexInputRef} nor a {@link RexCall}
+     */
+    private NativeJoinInfo.Expr rexToNativeExpr(RexNode node) {
+        if (node instanceof RexInputRef) {
+            RexInputRef ref = (RexInputRef) node;
+            RelDataType fieldType = ref.getType();
+            return new NativeJoinInfo.FieldAccessExpr(
+                    CommonNativeUtil.toNativeType(fieldType), ref.getIndex());
+        }
+
+        if (node instanceof RexCall) {
+            RexCall call = (RexCall) node;
+            List<NativeJoinInfo.Expr> operandExprs = new ArrayList<>(call.getOperands().size());
+            for (RexNode operand : call.getOperands()) {
+                operandExprs.add(rexToNativeExpr(operand));
+            }
+
+            RelDataType resultType = call.getType();
+            return new NativeJoinInfo.CallExpr(
+                    CommonNativeUtil.toNativeType(resultType),
+                    call.getOperator().getName(),
+                    operandExprs);
+        }
+
+        // Any other node kind has no native counterpart here.
+        throw new UnsupportedOperationException("Unsupported RexNode Type " + node);
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
index 93067254cc7..07e76f3238e 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalGroupAggregate.java
@@ -18,39 +18,44 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.calcite.rel.core.AggregateCall;
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.TaskManagerOptions;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegion;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegionTypeInfo;
+import org.apache.flink.streaming.api.natives.types.NativeType;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
-import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.*;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonNativeUtil;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
-import org.apache.flink.table.planner.plan.utils.AggregateUtil;
-import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.*;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import 
org.apache.flink.table.runtime.operators.aggregate.MiniBatchLocalGroupAggFunction;
 import org.apache.flink.table.runtime.operators.bundle.MapBundleOperator;
+import org.apache.flink.table.runtime.operators.natives.*;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-import org.apache.calcite.rel.core.AggregateCall;
-
+import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -67,7 +72,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
                 
StreamExecLocalGroupAggregate.LOCAL_GROUP_AGGREGATE_TRANSFORMATION,
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
-public class StreamExecLocalGroupAggregate extends StreamExecAggregateBase {
+public class StreamExecLocalGroupAggregate extends StreamExecAggregateBase
+        implements NativeSupportedExec {
 
     public static final String LOCAL_GROUP_AGGREGATE_TRANSFORMATION = 
"local-group-aggregate";
 
@@ -133,8 +139,6 @@ public class StreamExecLocalGroupAggregate extends 
StreamExecAggregateBase {
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
         final ExecEdge inputEdge = getInputEdges().get(0);
-        final Transformation<RowData> inputTransform =
-                (Transformation<RowData>) inputEdge.translateToPlan(planner);
         final RowType inputRowType = (RowType) inputEdge.getOutputType();
 
         final AggsHandlerCodeGenerator generator =
@@ -159,6 +163,15 @@ public class StreamExecLocalGroupAggregate extends 
StreamExecAggregateBase {
                         needRetraction,
                         false, // isStateBackendDataViews
                         true); // needDistinctInfo
+
+        Transformation<RowData> nativePlan = translateToNativePlan(planner, 
config, aggInfoList);
+        if (nativePlan != null) {
+            return nativePlan;
+        }
+
+        final Transformation<RowData> inputTransform =
+                (Transformation<RowData>) inputEdge.translateToPlan(planner);
+
         final GeneratedAggsHandleFunction aggsHandler =
                 generator.generateAggsHandler("GroupAggsHandler", aggInfoList);
         final MiniBatchLocalGroupAggFunction aggFunction =
@@ -181,4 +194,134 @@ public class StreamExecLocalGroupAggregate extends 
StreamExecAggregateBase {
                 InternalTypeInfo.of(getOutputType()),
                 inputTransform.getParallelism());
     }
+
+    @Nullable
+    private Transformation<RowData> translateToNativePlan(
+            PlannerBase planner, ExecNodeConfig config, AggregateInfoList 
aggInfoList) {
+        final ExecEdge inputEdge = getInputEdges().get(0);
+        // Native variant of this node; each "return null" below makes the caller fall back.
+        Transformation<NativeMemoryRegion> columnarInput =
+                (Transformation<NativeMemoryRegion>) 
inputEdge.translateToPlanNative(planner);
+
+        // Bail out unless every input field type and every aggregate is supported natively.
+        final RowType inputRowType = (RowType) inputEdge.getOutputType();
+
+        if (!CommonNativeUtil.isSupportedDataType(inputRowType.getChildren())) 
{
+            return null;
+        }
+
+        if (!CommonNativeUtil.isSupportedAggregation(aggInfoList)) {
+            return null;
+        }
+
+        int numDistinctions = aggInfoList.distinctInfos().length;
+        int numAggregations = aggInfoList.aggInfos().length;
+        int[] distinctIndex = new int[numAggregations];
+
+        int[] filterIndex = new int[numAggregations];
+        Arrays.fill(distinctIndex, -1); // -1 = aggregate has no distinct info
+
+        Arrays.fill(filterIndex, -1); // only read below for distinct aggregates
+
+        NativeDistinctInfo[] distinctInfos = new 
NativeDistinctInfo[numDistinctions];
+        int index = 0;
+        // Per distinct info: record, for each aggregate using it, its slot and its filter.
+        for (DistinctInfo info : aggInfoList.distinctInfos()) {
+            ArrayList<Integer> filters = new 
ArrayList<>(info.filterArgs().length());
+            info.filterArgs().foreach(filter -> filters.add((Integer) filter));
+
+            ArrayList<Integer> aggIds = new 
ArrayList<>(info.aggIndexes().length());
+            int finalIndex = index;
+            AtomicInteger filter = new AtomicInteger(0); // i-th aggIndex pairs with i-th filter
+            info.aggIndexes()
+                    .foreach(
+                            aggId -> {
+                                distinctIndex[(int) aggId] = finalIndex;
+                                filterIndex[(int) aggId] = 
filters.get(filter.getAndIncrement());
+                                aggIds.add((Integer) aggId);
+                                return true;
+                            });
+
+            List<LogicalType> logicalKeyTypes =
+                    info.keyType().getChildren().isEmpty()
+                            ? 
Collections.singletonList(info.keyType().getLogicalType())
+                            : info.keyType().getChildren().stream()
+                                    .map(DataType::getLogicalType)
+                                    .collect(Collectors.toList());
+            if (!CommonNativeUtil.isSupportedDataType(logicalKeyTypes)) {
+                return null;
+            }
+
+            NativeType[] keyTypes = 
CommonNativeUtil.getNativeDataType(logicalKeyTypes);
+            NativeDistinctInfo distinctInfo =
+                    new NativeDistinctInfo(
+                            info.consumeRetraction(),
+                            -1,
+                            keyTypes,
+                            info.argIndexes(),
+                            aggIds.stream().mapToInt(aggId -> aggId).toArray(),
+                            filters.stream().mapToInt(aggId -> 
aggId).toArray());
+            distinctInfos[index++] = distinctInfo;
+        }
+        // One native function descriptor per aggregate, stored at its aggIndex.
+        NativeAggFunctionInfo[] aggFunctions = new 
NativeAggFunctionInfo[numAggregations];
+        for (AggregateInfo info : aggInfoList.aggInfos()) {
+            NativeAggFunctionInfo function =
+                    new NativeAggFunctionInfo(
+                            info.consumeRetraction(),
+                            distinctIndex[info.aggIndex()],
+                            info.aggIndex(),
+                            info.argIndexes(),
+                            distinctIndex[info.aggIndex()] >= 0
+                                    ? filterIndex[info.aggIndex()]
+                                    : info.agg().filterArg, // non-distinct: the call's own filter
+                            CommonNativeUtil.getNativeAggFunctionType(info),
+                            NativeTypeUtils.toNativeType(
+                                    
info.externalResultType().getLogicalType()));
+            aggFunctions[info.aggIndex()] = function;
+        }
+
+        NativeGroupAggInfo aggInfo =
+                new NativeGroupAggInfo(
+                        needRetraction,
+                        false,
+                        aggInfoList.countStarInserted(),
+                        aggInfoList.getIndexOfCountStar(),
+                        config.get(TaskManagerOptions.TASK_NATIVE_BATCH_SIZE),
+                        
config.get(TaskManagerOptions.TASK_NATIVE_LOCAL_AGG_BUFFER_SIZE),
+                        
config.get(TaskManagerOptions.TASK_NATIVE_AGG_DISTINCT_TABLE_INITIAL_SIZE),
+                        
config.get(TaskManagerOptions.TASK_NATIVE_AGG_RESULT_TABLE_INITIAL_SIZE),
+                        
config.get(TaskManagerOptions.TASK_NATIVE_AGG_STRING_SET_INITIAL_SIZE),
+                        new int[0],
+                        grouping,
+                        
CommonNativeUtil.getNativeDataType(inputRowType.getChildren()),
+                        aggFunctions,
+                        distinctInfos);
+        CommonNativeUtil.validateGroupAggInfo(aggInfo);
+
+        // NOTE(review): this operator consumes and produces NativeMemoryRegion, not RowData.
+        OneInputStreamOperator<NativeMemoryRegion, NativeMemoryRegion> 
operator =
+                new NativeLocalGroupAggOperator(aggInfo);
+
+        // Wrap the operator in a transformation that keeps the input's parallelism.
+        final OneInputTransformation<NativeMemoryRegion, NativeMemoryRegion> 
transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        columnarInput,
+                        
createTransformationMeta(LOCAL_GROUP_AGGREGATE_TRANSFORMATION, config),
+                        operator,
+                        new NativeMemoryRegionTypeInfo(
+                                NativeTypeUtils.toNativeType((RowType) 
getOutputType())),
+                        columnarInput.getParallelism(),
+                        0);
+
+        // The selector is only built to derive the state key type; no key selector is installed.
+        final RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(grouping, 
InternalTypeInfo.of(inputRowType));
+
+        transform.setStateKeySelector(null);
+        transform.setStateKeyType(selector.getProducedType());
+
+        // Luckily Java generics are only checked at compile time. Let's just 
skip the check...
+        return (OneInputTransformation) transform;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
index 69f7c77e580..e24949a5e92 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLocalWindowAggregate.java
@@ -18,26 +18,29 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.tools.RelBuilder;
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.TaskManagerOptions;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegion;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegionTypeInfo;
+import org.apache.flink.streaming.api.natives.types.NativeType;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
-import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.logical.*;
+import org.apache.flink.table.planner.plan.nodes.exec.*;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonNativeUtil;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
-import org.apache.flink.table.planner.plan.utils.AggregateUtil;
-import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.*;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.TableConfigUtils;
 import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
@@ -46,25 +49,25 @@ import 
org.apache.flink.table.runtime.operators.aggregate.window.LocalSlicingWin
 import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.RecordsWindowBuffer;
 import 
org.apache.flink.table.runtime.operators.aggregate.window.buffers.WindowBuffer;
 import 
org.apache.flink.table.runtime.operators.aggregate.window.combines.LocalAggCombiner;
+import org.apache.flink.table.runtime.operators.natives.*;
 import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
 import org.apache.flink.table.runtime.typeutils.AbstractRowDataSerializer;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.runtime.util.TimeWindowUtil;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.tools.RelBuilder;
-
+import javax.annotation.Nullable;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -77,7 +80,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
                 
StreamExecLocalWindowAggregate.LOCAL_WINDOW_AGGREGATE_TRANSFORMATION,
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
-public class StreamExecLocalWindowAggregate extends 
StreamExecWindowAggregateBase {
+public class StreamExecLocalWindowAggregate extends 
StreamExecWindowAggregateBase
+        implements NativeSupportedExec {
 
     public static final String LOCAL_WINDOW_AGGREGATE_TRANSFORMATION = 
"local-window-aggregate";
 
@@ -136,6 +140,11 @@ public class StreamExecLocalWindowAggregate extends 
StreamExecWindowAggregateBas
     @Override
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
+        Transformation<RowData> nativePlan = translateToNativePlan(planner, 
config);
+        if (nativePlan != null) {
+            return nativePlan;
+        }
+
         final ExecEdge inputEdge = getInputEdges().get(0);
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
@@ -216,4 +225,192 @@ public class StreamExecLocalWindowAggregate extends 
StreamExecWindowAggregateBas
                 sliceAssigner,
                 shiftTimeZone);
     }
+
+    /**
+     * Tries to translate this local window aggregate into a native (columnar) transformation.
+     *
+     * <p>Only tumbling windows are translated. The input field types, the aggregate calls and the
+     * distinct key types must all pass the {@code CommonNativeUtil} support checks.
+     *
+     * @return the native transformation, or {@code null} when the regular plan must be used
+     */
+    @SuppressWarnings("unchecked")
+    @Nullable
+    private Transformation<RowData> translateToNativePlan(
+            PlannerBase planner, ExecNodeConfig config) {
+        final ZoneId shiftTimeZone =
+                TimeWindowUtil.getShiftTimeZone(
+                        windowing.getTimeAttributeType(),
+                        TableConfigUtils.getLocalTimeZone(config));
+
+        // TODO support other window types; for now only tumbling windows go native.
+        if (!(windowing.getWindow() instanceof TumblingWindowSpec)) {
+            return null;
+        }
+
+        NativeWindowAggInfo.WindowStrategy windowStrategy;
+        int timeFieldIndex;
+        int sliceEndIndex = -1;
+        int windowEndIndex = -1;
+
+        WindowSpec windowSpec = windowing.getWindow();
+        if (windowing instanceof WindowAttachedWindowingStrategy) {
+            windowEndIndex = ((WindowAttachedWindowingStrategy) windowing).getWindowEnd();
+            timeFieldIndex = Integer.MAX_VALUE;
+            windowStrategy = NativeWindowAggInfo.WindowStrategy.WINDOW_ATTACHED;
+        } else if (windowing instanceof SliceAttachedWindowingStrategy) {
+            sliceEndIndex = ((SliceAttachedWindowingStrategy) windowing).getSliceEnd();
+            timeFieldIndex = Integer.MAX_VALUE;
+            windowStrategy = NativeWindowAggInfo.WindowStrategy.SLICE_ATTACHED;
+        } else if (windowing instanceof TimeAttributeWindowingStrategy) {
+            if (windowing.isRowtime()) {
+                timeFieldIndex =
+                        ((TimeAttributeWindowingStrategy) windowing).getTimeAttributeIndex();
+            } else {
+                timeFieldIndex = -1;
+            }
+            windowStrategy = NativeWindowAggInfo.WindowStrategy.TIME_ATTRIBUTE;
+        } else {
+            throw new UnsupportedOperationException(windowing + " is not supported yet.");
+        }
+
+        final ExecEdge inputEdge = getInputEdges().get(0);
+        Transformation<NativeMemoryRegion> columnarInput =
+                (Transformation<NativeMemoryRegion>) inputEdge.translateToPlanNative(planner);
+        final RowType inputRowType = (RowType) inputEdge.getOutputType();
+        // NOTE(review): the input was already translated natively; confirm a null fallback is safe.
+        if (!CommonNativeUtil.isSupportedDataType(inputRowType.getChildren())) {
+            return null;
+        }
+        final AggregateInfoList aggInfoList =
+                AggregateUtil.deriveStreamWindowAggregateInfoList(
+                        inputRowType,
+                        JavaScalaConversionUtil.toScala(Arrays.asList(aggCalls)),
+                        windowing.getWindow(),
+                        false);
+        // Fall back to the regular plan unless every aggregate is natively supported.
+        if (!CommonNativeUtil.isSupportedAggregation(aggInfoList)) {
+            return null;
+        }
+
+        int[] distinctIndex = new int[aggInfoList.aggInfos().length];
+        int[] filterIndex = new int[aggInfoList.aggInfos().length];
+        Arrays.fill(distinctIndex, -1); // -1 = aggregate has no distinct info
+        Arrays.fill(filterIndex, -1); // only read below for distinct aggregates
+        NativeDistinctInfo[] distinctInfos =
+                new NativeDistinctInfo[aggInfoList.distinctInfos().length];
+        int index = 0;
+        for (DistinctInfo info : aggInfoList.distinctInfos()) {
+            ArrayList<Integer> filters = new ArrayList<>(info.filterArgs().length());
+            info.filterArgs().foreach(filter -> filters.add((Integer) filter));
+
+            ArrayList<Integer> aggIds = new ArrayList<>(info.aggIndexes().length());
+            int finalIndex = index;
+            AtomicInteger filter = new AtomicInteger(0); // i-th aggIndex pairs with i-th filter
+            info.aggIndexes()
+                    .foreach(
+                            aggId -> {
+                                distinctIndex[(int) aggId] = finalIndex;
+                                filterIndex[(int) aggId] = filters.get(filter.getAndIncrement());
+                                aggIds.add((Integer) aggId);
+                                return true;
+                            });
+            List<LogicalType> logicalKeyTypes =
+                    info.keyType().getChildren().isEmpty()
+                            ? Collections.singletonList(info.keyType().getLogicalType())
+                            : info.keyType().getChildren().stream()
+                                    .map(DataType::getLogicalType)
+                                    .collect(Collectors.toList());
+
+            if (!CommonNativeUtil.isSupportedDataType(logicalKeyTypes)) {
+                return null;
+            }
+
+            NativeType[] keyTypes = CommonNativeUtil.getNativeDataType(logicalKeyTypes);
+            NativeDistinctInfo distinctInfo =
+                    new NativeDistinctInfo(
+                            info.consumeRetraction(),
+                            -1,
+                            keyTypes,
+                            info.argIndexes(),
+                            aggIds.stream().mapToInt(aggId -> aggId).toArray(),
+                            filters.stream().mapToInt(aggId -> aggId).toArray());
+            distinctInfos[index++] = distinctInfo;
+        }
+
+        NativeAggFunctionInfo[] aggFunctions =
+                new NativeAggFunctionInfo[aggInfoList.aggInfos().length];
+        for (AggregateInfo info : aggInfoList.aggInfos()) {
+            NativeAggFunctionInfo function =
+                    new NativeAggFunctionInfo(
+                            info.consumeRetraction(),
+                            distinctIndex[info.aggIndex()],
+                            info.aggIndex(),
+                            info.argIndexes(),
+                            distinctIndex[info.aggIndex()] >= 0
+                                    ? filterIndex[info.aggIndex()]
+                                    : info.agg().filterArg, // non-distinct: the call's own filter
+                            CommonNativeUtil.getNativeAggFunctionType(info),
+                            NativeTypeUtils.toNativeType(
+                                    info.externalResultType().getLogicalType()));
+            aggFunctions[info.aggIndex()] = function;
+        }
+
+        NativeGroupAggInfo aggInfo =
+                new NativeGroupAggInfo(
+                        false,
+                        false,
+                        aggInfoList.countStarInserted(),
+                        aggInfoList.getIndexOfCountStar(),
+                        config.get(TaskManagerOptions.TASK_NATIVE_BATCH_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_LOCAL_AGG_BUFFER_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_DISTINCT_TABLE_INITIAL_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_RESULT_TABLE_INITIAL_SIZE),
+                        config.get(TaskManagerOptions.TASK_NATIVE_AGG_STRING_SET_INITIAL_SIZE),
+                        new int[0],
+                        grouping,
+                        CommonNativeUtil.getNativeDataType(inputRowType.getChildren()),
+                        aggFunctions,
+                        distinctInfos);
+        CommonNativeUtil.validateGroupAggInfo(aggInfo);
+
+        OneInputStreamOperator<NativeMemoryRegion, NativeMemoryRegion> operator;
+        if (windowSpec instanceof TumblingWindowSpec) {
+            long offset = 0;
+            long size = ((TumblingWindowSpec) windowSpec).getSize().toMillis();
+            if (((TumblingWindowSpec) windowSpec).getOffset() != null) {
+                offset = ((TumblingWindowSpec) windowSpec).getOffset().toMillis();
+            }
+            NativeWindowAggInfo windowInfo =
+                    new NativeWindowAggInfo(
+                            timeFieldIndex,
+                            sliceEndIndex,
+                            windowEndIndex,
+                            size,
+                            offset,
+                            windowStrategy,
+                            shiftTimeZone.getId(),
+                            aggInfo);
+            operator = new NativeLocalWindowOperator(windowInfo);
+        } else {
+            throw new UnsupportedOperationException(windowSpec + " is not supported yet.");
+        }
+
+        // Wrap the operator in a transformation that keeps the input's parallelism.
+        final OneInputTransformation<NativeMemoryRegion, NativeMemoryRegion> transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        columnarInput,
+                        createTransformationMeta(LOCAL_WINDOW_AGGREGATE_TRANSFORMATION, config),
+                        operator,
+                        new NativeMemoryRegionTypeInfo(
+                                NativeTypeUtils.toNativeType(getOutputType())),
+                        columnarInput.getParallelism());
+
+        RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(grouping, InternalTypeInfo.of(inputRowType));
+        // Only the state key type is set; the key selector itself is left null.
+        transform.setStateKeySelector(null);
+        transform.setStateKeyType(selector.getProducedType());
+        return (OneInputTransformation) transform;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
index 238916ee96e..f3f35a298ab 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecWindowAggregate.java
@@ -18,9 +18,17 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.stream;
 
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.tools.RelBuilder;
 import org.apache.flink.FlinkVersion;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.configuration.TaskManagerOptions;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegion;
+import org.apache.flink.streaming.api.natives.types.NativeMemoryRegionTypeInfo;
+import org.apache.flink.streaming.api.natives.types.NativeType;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -28,17 +36,11 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator;
 import org.apache.flink.table.planner.delegation.PlannerBase;
-import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
-import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
-import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.logical.*;
+import org.apache.flink.table.planner.plan.nodes.exec.*;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonNativeUtil;
 import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
-import org.apache.flink.table.planner.plan.utils.AggregateInfoList;
-import org.apache.flink.table.planner.plan.utils.AggregateUtil;
-import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
+import org.apache.flink.table.planner.plan.utils.*;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
 import org.apache.flink.table.planner.utils.TableConfigUtils;
 import 
org.apache.flink.table.runtime.generated.GeneratedNamespaceAggsHandleFunction;
@@ -46,25 +48,25 @@ import 
org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
 import org.apache.flink.table.runtime.groupwindow.WindowProperty;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import 
org.apache.flink.table.runtime.operators.aggregate.window.SlicingWindowAggOperatorBuilder;
+import org.apache.flink.table.runtime.operators.natives.*;
 import org.apache.flink.table.runtime.operators.window.slicing.SliceAssigner;
 import 
org.apache.flink.table.runtime.operators.window.slicing.SliceSharedAssigner;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.runtime.typeutils.PagedTypeSerializer;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.runtime.util.TimeWindowUtil;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-
-import org.apache.calcite.rel.core.AggregateCall;
-import org.apache.calcite.tools.RelBuilder;
-
+import javax.annotation.Nullable;
 import java.time.ZoneId;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -83,7 +85,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
         producedTransformations = 
StreamExecWindowAggregate.WINDOW_AGGREGATE_TRANSFORMATION,
         minPlanVersion = FlinkVersion.v1_15,
         minStateVersion = FlinkVersion.v1_15)
-public class StreamExecWindowAggregate extends StreamExecWindowAggregateBase {
+public class StreamExecWindowAggregate extends StreamExecWindowAggregateBase
+        implements NativeSupportedExec {
 
     public static final String WINDOW_AGGREGATE_TRANSFORMATION = 
"window-aggregate";
 
@@ -150,6 +153,11 @@ public class StreamExecWindowAggregate extends 
StreamExecWindowAggregateBase {
     @Override
     protected Transformation<RowData> translateToPlanInternal(
             PlannerBase planner, ExecNodeConfig config) {
+        Transformation<RowData> nativePlan = translateToNativePlan(planner, 
config);
+        if (nativePlan != null) {
+            return nativePlan;
+        }
+
         final ExecEdge inputEdge = getInputEdges().get(0);
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
@@ -248,4 +256,193 @@ public class StreamExecWindowAggregate extends 
StreamExecWindowAggregateBase {
                 sliceAssigner,
                 shiftTimeZone);
     }
+
+    @SuppressWarnings("unchecked")
+    @Nullable
+    private Transformation<RowData> translateToNativePlan(
+            PlannerBase planner, ExecNodeConfig config) {
+        final ZoneId shiftTimeZone =
+                TimeWindowUtil.getShiftTimeZone(
+                        windowing.getTimeAttributeType(),
+                        TableConfigUtils.getLocalTimeZone(config));
+
+        // TODO support other window types
+        if (!(windowing.getWindow() instanceof TumblingWindowSpec)) {
+            return null;
+        }
+
+        NativeWindowAggInfo.WindowStrategy windowStrategy;
+        int timeFieldIndex;
+        int sliceEndIndex = -1;
+        int windowEndIndex = -1;
+
+        WindowSpec windowSpec = windowing.getWindow();
+        if (windowing instanceof WindowAttachedWindowingStrategy) {
+            windowEndIndex =
+                    ((WindowAttachedWindowingStrategy) 
windowing).getWindowEnd();
+            timeFieldIndex = Integer.MAX_VALUE;
+
+            windowStrategy = NativeWindowAggInfo.WindowStrategy. 
WINDOW_ATTACHED:
+        } else if (windowing instanceof SliceAttachedWindowingStrategy) {
+            sliceEndIndex = ((SliceAttachedWindowingStrategy) 
windowing).getSliceEnd();
+            timeFieldIndex = Integer.MAX_VALUE;
+            windowStrategy = NativeWindowAggInfo.WindowStrategy.SLICE_ATTACHED:
+        } else if (windowing instanceof TimeAttributeWindowingStrategy) {
+            if (windowing.isRowtime()) {
+                timeFieldIndex =
+                        ((TimeAttributeWindowingStrategy) 
windowing).getTimeAttributeIndex();
+            } else {
+                timeFieldIndex = -1;
+            }
+            windowStrategy = NativeWindowAggInfo.WindowStrategy.TIME_ATTRIBUTE;
+        } else {
+            throw new UnsupportedOperationException(windowing + " is not 
supported yet.");
+        }
+
+        final ExecEdge inputEdge = getInputEdges().get(0);
+        Transformation<NativeMemoryRegion> columnarInput =
+                (Transformation<NativeMemoryRegion>) 
inputEdge.translateToPlanNative(planner);
+        final RowType inputRowType = (RowType) inputEdge.getOutputType();
+
+        if (!CommonNativeUtil.isSupportedDataType(inputRowType.getChildren())) 
{
+            return null;
+        }
+
+
+    // Hopping window requires additional COUNT(*) to determine whether to 
register next timer
+    // through whether the current fired window is empty, see 
SliceSharedWindowAggProcessor.
+        final AggregateInfoList aggInfoList = getStateBackedaggInfoList();
+        if (CommonNativeUtil.isSupportedAggregation(aggInfoList)) {
+            return null;
+        }
+
+        int[] distinctIndex = new int[aggInfoList.aggInfos().length];
+        int[] filterIndex = new int[aggInfoList.aggInfos().length];
+        Arrays.fill(distinctIndex, -1);
+        Arrays.fill(filterIndex, -1);
+
+        NativeDistinctInfo[] distinctInfos =
+                new NativeDistinctInfo[aggInfoList.distinctInfos().length];
+        int index = 0;
+        for (DistinctInfo info : aggInfoList.distinctInfos()) {
+            ArrayList<Integer> filters = new 
ArrayList<>(info.filterArgs().length());
+            info.filterArgs().foreach(filter -> filters.add((Integer) filter));
+
+            ArrayList<Integer> aggIds = new 
ArrayList<>(info.aggIndexes().length());
+            int finalIndex = index;
+
+            AtomicInteger filter = new AtomicInteger(0);
+            info.aggIndexes()
+                    .foreach(
+                            aggId -> {
+                                distinctIndex[(int) aggId] = finalIndex;
+                                filterIndex[(int) aggId] = 
filters.get(filter.getAndIncrement());
+                                aggIds.add((Integer) aggId);
+                                return true;
+                            });
+
+            List<LogicalType> logicalKeyTypes =
+                    info.keyType().getChildren().isEmpty()
+                            ? 
Collections.singletonList(info.keyType().getLogicalType())
+                            : info.keyType().getChildren().stream()
+                            
.map(DataType::getLogicalType).collect(Collectors.toList());
+
+            if (!CommonNativeUtil.isSupportedDataType(logicalKeyTypes)) {
+                return null;
+            }
+
+            NativeType[] keyTypes = 
CommonNativeUtil.getNativeDataType(logicalKeyTypes);
+            NativeDistinctInfo distinctInfo =
+                    new NativeDistinctInfo(
+                            info.consumeRetraction(),
+                            -1,
+                            keyTypes,
+                            info.argIndexes(),
+                            aggIds.stream().mapToInt(aggId -> aggId).toArray(),
+                            filters.stream().mapToInt(aggId -> 
aggId).toArray());
+            distinctInfos[index++] = distinctInfo;
+        }
+
+        NativeAggFunctionInfo[] aggFunctions =
+                new NativeAggFunctionInfo[aggInfoList.aggInfos().length];
+        for (AggregateInfo info : aggInfoList.aggInfos()) {
+            NativeAggFunctionInfo function =
+                    new NativeAggFunctionInfo(
+                            info.consumeRetraction(),
+                            distinctIndex[info.aggIndex()],
+                            info.aggIndex(),
+                            info.argIndexes(),
+                            distinctIndex[info.aggIndex()] >= 0
+                                    ? filterIndex[info.aggIndex()]
+                                    : info.agg().filterArg,
+                            CommonNativeUtil.getNativeAggFunctionType(info),
+                            NativeTypeUtils.toNativeType(
+                                    
info.externalResultType().getLogicalType()));
+            aggFunctions[info.aggIndex()] = function;
+        }
+
+        NativeGroupAggInfo aggInfo =
+                new NativeGroupAggInfo(
+                        false,
+                        false,
+                        aggInfoList.countStarInserted(),
+                        aggInfoList.getIndexOfCountStar(),
+                        config.get(TaskManagerOptions.TASK_NATIVE_BATCH_SIZE),
+                        
config.get(TaskManagerOptions.TASK_NATIVE_FINAL_AGG_BUFFER_SIZE),
+                        
config.get(TaskManagerOptions.TASK_NATIVE_AGG_DISTINCT_TABLE_INITIAL_SIZE),
+                        
config.get(TaskManagerOptions.TASK_NATIVE_AGG_RESULT_TABLE_INITIAL_SIZE),
+                        
config.get(TaskManagerOptions.TASK_NATIVE_AGG_STRING_SET_INITIAL_SIZE),
+                        new int[0],
+                        grouping,
+                        
CommonNativeUtil.getNativeDataType(inputRowType.getChildren()),
+                        aggFunctions,
+                        distinctInfos);
+        CommonNativeUtil.validateGroupAggInfo(aggInfo);
+
+        OneInputStreamOperator<NativeMemoryRegion, NativeMemoryRegion> 
operator;
+        if (windowSpec instanceof TumblingWindowSpec) {
+            long offset = 0;
+            long size = ((TumblingWindowSpec) windowSpec).getSize().toMillis();
+            if (((TumblingWindowSpec) windowSpec).getOffset() != null) {
+                offset = ((TumblingWindowSpec) 
windowSpec).getOffset().toMillis();
+            }
+
+            NativeWindowAggInfo windowInfo = new NativeWindowAggInfo(
+                    timeFieldIndex,
+                    sliceEndIndex,
+                    windowEndIndex,
+                    size,
+                    offset,
+                    windowStrategy,
+                    shiftTimeZone.getId(),
+                    aggInfo);
+            operator = new NativeTumbleWindowOperator(windowInfo);
+        } else {
+            throw new UnsupportedOperationException(windowSpec + " is not 
supported yet.");
+        }
+
+
+// partitioned aggregation
+
+        final OneInputTransformation<NativeMemoryRegion, NativeMemoryRegion> 
transform =
+                ExecNodeUtil.createOneInputTransformation(
+                        columnarInput,
+
+                        
createTransformationMeta(WINDOW_AGGREGATE_TRANSFORMATION, config),
+                        operator,
+
+                        new NativeMemoryRegionTypeInfo(
+
+                                NativeTypeUtils.toNativeType(getOutputType())),
+                        columnarInput.getParallelism());
+
+        RowDataKeySelector selector =
+                KeySelectorUtil.getRowDataSelector(grouping, 
InternalTypeInfo.of(inputRowType));
+// set KeyType and Selector for state
+
+        transform.setStateKeySelector(null);
+
+        transform.setStateKeyType(selector.getProducedType());
+        return (OneInputTransformation)transform;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/visitor/AbstractExecNodeExactlyOnceVisitor.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/visitor/AbstractExecNodeExactlyOnceVisitor.java
index 1b8cad82b96..ccee5f5c535 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/visitor/AbstractExecNodeExactlyOnceVisitor.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/visitor/AbstractExecNodeExactlyOnceVisitor.java
@@ -26,7 +26,7 @@ import java.util.Set;
 /** Implement of {@link ExecNodeVisitor}. All nodes are visited exactly once. 
*/
 public abstract class AbstractExecNodeExactlyOnceVisitor implements 
ExecNodeVisitor {
 
-    private final Set<ExecNode<?>> visited;
+    protected final Set<ExecNode<?>> visited;
 
     public AbstractExecNodeExactlyOnceVisitor() {
         this.visited = new HashSet<>();

Reply via email to