tsreaper commented on code in PR #21393:
URL: https://github.com/apache/flink/pull/21393#discussion_r1066708360


##########
flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementGrouper.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.codesplit.JavaParser.BlockStatementContext;
+import org.apache.flink.table.codesplit.JavaParser.StatementContext;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.Token;
+import org.antlr.v4.runtime.TokenStreamRewriter;
+import org.antlr.v4.runtime.atn.PredictionMode;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Groups end extract single line statements such as operations on 
fields/local variables and
+ * extract new method for each group making them smaller.
+ *
+ * <p><i>Before</i>
+ *
+ * <pre><code>
+ * {
+ *     a[0] += b[1];
+ *     b[1] += a[1];
+ *     while (counter > 0) {
+ *         myFun_whileBody0_0(a, b);
+ *         if (a[0] > 0) {
+ *             myFun_whileBody0_0_ifBody0(a, b);
+ *         } else {
+ *             myFun_whileBody0_0_ifBody1(a, b);
+ *         }
+ *         counter--;
+ *     }
+ *
+ *     a[2] += b[2];
+ *     b[3] += a[3];
+ * }
+ *
+ * </code></pre>
+ *
+ * <p><i>After</i>
+ *
+ * <pre><code>
+ * {
+ *      myFun_rewriteGroup1(a, b);
+ *      myFun_rewriteGroup2(a, b);
+ * }
+ * </code></pre>
+ *
+ * <p>Where bodies of extracted "methods" are:
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup1 ->
+ *         a[0] += b[1];
+ *         b[1] += a[1];
+ *         while (counter > 0) {
+ *             myFun_rewriteGroup0_rewriteGroup1(a, b);
+ *             counter--;
+ *         }
+ * </code></pre>
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup2 ->
+ *         a[2] += b[2];
+ *         b[3] += a[3];
+ * </code></pre>
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup0_rewriteGroup1 ->
+ *         myFun_whileBody0_0(a, b);
+ *         if (a[0] > 0) {
+ *             myFun_whileBody0_0_ifBody0(a, b);
+ *         } else {
+ *             myFun_whileBody0_0_ifBody1(a, b);
+ *         }
+ * </code></pre>
+ */
+@Internal
+public class BlockStatementGrouper {
+
+    private final String code;
+
+    private final long maxMethodLength;
+
+    private final String parameters;
+
+    /**
+     * Initialize new BlockStatementGrouper.
+     *
+     * @param code code block that should be rewritten for statement grouping.
+     * @param maxMethodLength maximal length of the extracted code block.
+     * @param parameters parameters definition that should be used for 
extracted methods.
+     */
+    public BlockStatementGrouper(String code, long maxMethodLength, String 
parameters) {
+        this.code = code;
+        this.maxMethodLength = maxMethodLength;
+        this.parameters = parameters;
+    }
+
+    /**
+     * Rewrite code block used for initialization of this object. The code 
block is grouped into new
+     * methods.
+     *
+     * @param context prefix used for extracted group names.
+     * @return {@link RewriteGroupedCode} representing rewritten code block 
and containing extracted
+     *     groups with their names and content.
+     */
+    public RewriteGroupedCode rewrite(String context) {
+
+        BlockStatementGrouperVisitor visitor =
+                new BlockStatementGrouperVisitor(code, context, 
maxMethodLength, parameters);
+        JavaParser javaParser = new JavaParser(visitor.tokenStream);
+        javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+        visitor.visitStatement(javaParser.statement());
+
+        visitor.rewriteToGroups();
+        List<Pair<String, List<LocalGroupElement>>> groups = 
visitor.getGroups();
+
+        List<Pair<String, List<String>>> groupStrings = new 
ArrayList<>(groups.size());
+        for (Pair<String, List<LocalGroupElement>> group : groups) {
+            List<String> collectedStringGroups =
+                    group.getValue().stream()
+                            .map(LocalGroupElement::getBody)
+                            .collect(Collectors.toList());
+            groupStrings.add(Pair.of(group.getKey(), collectedStringGroups));
+        }
+
+        return new RewriteGroupedCode(visitor.rewriter.getText(), 
groupStrings);
+    }
+
+    private static class BlockStatementGrouperVisitor extends 
JavaParserBaseVisitor<Void> {
+
+        private final List<BlockStatementGrouperVisitor> children = new 
ArrayList<>();
+
+        private final String context;
+
+        private final CommonTokenStream tokenStream;
+
+        private final TokenStreamRewriter rewriter;
+
+        private final long maxMethodLength;
+
+        private final List<Pair<String, List<LocalGroupElement>>> groups = new 
ArrayList<>();
+
+        private final String parameters;
+
+        private int counter = 0;
+
+        private BlockStatementGrouperVisitor(
+                String code, String context, long maxMethodLength, String 
parameters) {
+            this.tokenStream = new CommonTokenStream(new 
JavaLexer(CharStreams.fromString(code)));
+            this.rewriter = new TokenStreamRewriter(tokenStream);
+            this.context = context;
+            this.maxMethodLength = maxMethodLength;
+            this.parameters = parameters;
+        }
+
+        @Override
+        public Void visitStatement(StatementContext ctx) {
+
+            if (ctx.getChildCount() == 0) {
+                return null;
+            }
+
+            if (ctx.WHILE() != null || ctx.IF() != null || ctx.ELSE() != null) 
{
+                for (StatementContext statement : ctx.statement()) {
+                    if (shouldExtract(statement)) {
+                        groupBlock(statement);
+                    }
+                }
+            } else {
+                if (shouldExtract(ctx)) {
+                    groupBlock(ctx);
+                }
+            }
+
+            return null;
+        }
+
+        public List<Pair<String, List<LocalGroupElement>>> getGroups() {
+            List<Pair<String, List<LocalGroupElement>>> groupsTmp = new 
ArrayList<>();
+            for (BlockStatementGrouperVisitor child : children) {
+                groupsTmp.addAll(child.getGroups());
+            }
+
+            groupsTmp.addAll(this.groups);
+
+            return groupsTmp;
+        }
+
+        private void addGroups(List<Pair<String, List<LocalGroupElement>>> 
groups) {
+            this.groups.addAll(groups);
+        }
+
+        private void groupBlock(StatementContext ctx) {
+            int localCounter = this.counter++;
+            int localGroupCodeLength = 0;
+            List<LocalGroupElement> localGroup = new ArrayList<>();
+            for (BlockStatementContext bsc : ctx.block().blockStatement()) {
+
+                StatementContext statement = bsc.statement();
+                if (statement.IF() != null
+                        || statement.ELSE() != null
+                        || statement.WHILE() != null) {
+                    String localContext = context + "_rewriteGroup" + 
localCounter;
+                    BlockStatementGrouperVisitor visitor =
+                            new BlockStatementGrouperVisitor(
+                                    CodeSplitUtil.getContextString(statement),
+                                    localContext,
+                                    maxMethodLength,
+                                    parameters);
+                    JavaParser javaParser = new 
JavaParser(visitor.tokenStream);
+                    
javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+                    visitor.visitStatement(javaParser.statement());
+
+                    localGroup.add(new RewriteContextGroupElement(statement, 
visitor.rewriter));
+                    children.add(visitor);
+                    localCounter = this.counter++;
+                } else {
+
+                    if (localGroupCodeLength + 1 + bsc.getText().length() <= 
maxMethodLength) {
+                        localGroup.add(new ContextGroupElement(bsc));
+                        localGroupCodeLength += bsc.getText().length();
+                    } else {
+                        if (localGroup.size() > 1
+                                || (localGroup.size() == 1
+                                        && canGroupAsSingleStatement(
+                                                
localGroup.get(0).getContext()))) {
+                            String localContext = context + "_rewriteGroup" + 
localCounter;
+                            groups.add(Pair.of(localContext, localGroup));
+                            localCounter = this.counter++;
+                            localGroup = new ArrayList<>();
+                        }
+                        localGroupCodeLength = bsc.getText().length();
+                        localGroup.add(new ContextGroupElement(bsc));
+                    }
+                }
+            }
+
+            if (localGroup.size() > 1
+                    || (localGroup.size() == 1
+                            && 
canGroupAsSingleStatement(localGroup.get(0).getContext()))) {
+                String localContext = context + "_rewriteGroup" + localCounter;
+                groups.add(Pair.of(localContext, localGroup));
+            }
+        }
+
+        private boolean canGroupAsSingleStatement(ParserRuleContext context) {
+
+            StatementContext statement;
+
+            if (context instanceof StatementContext) {
+                statement = (StatementContext) context;
+            } else if (context instanceof BlockStatementContext) {
+                statement = ((BlockStatementContext) context).statement();
+            } else {
+                return false;
+            }
+
+            return statement != null
+                    && (statement.IF() != null
+                            || statement.ELSE() != null
+                            || statement.WHILE() != null);
+        }
+
+        private boolean shouldExtract(StatementContext ctx) {
+            return ctx != null
+                    && ctx.block() != null
+                    && ctx.block().blockStatement() != null
+                    // if there is only one statement in the block it's 
useless to extract
+                    // it into a separate function
+                    && ctx.block().blockStatement().size() > 1
+                    // should not extract blocks with return statements
+                    && getNumReturnsInContext(ctx.block()) == 0;
+        }
+
+        private int getNumReturnsInContext(ParserRuleContext ctx) {
+            ReturnCounter counter = new ReturnCounter();
+            counter.visit(ctx);
+            return counter.returnCount;
+        }
+
+        public List<Pair<String, List<LocalGroupElement>>> rewriteToGroups() {
+            for (BlockStatementGrouperVisitor child : children) {
+                child.rewriteToGroups();
+            }
+
+            for (Pair<String, List<LocalGroupElement>> group : groups) {
+                List<LocalGroupElement> value = group.getValue();
+                rewriter.replace(
+                        value.get(0).getStart(),
+                        value.get(value.size() - 1).getStop(),
+                        group.getKey() + "(" + this.parameters + ");");
+            }
+
+            return groups;
+        }
+    }
+
+    private interface LocalGroupElement {
+
+        Token getStart();
+
+        Token getStop();
+
+        String getBody();
+
+        ParserRuleContext getContext();
+    }
+
+    private static class ContextGroupElement implements LocalGroupElement {
+
+        private final ParserRuleContext parserRuleContext;
+
+        private ContextGroupElement(ParserRuleContext parserRuleContext) {
+            this.parserRuleContext = parserRuleContext;
+        }
+
+        @Override
+        public Token getStart() {
+            return this.parserRuleContext.start;
+        }
+
+        @Override
+        public Token getStop() {
+            return this.parserRuleContext.stop;
+        }
+
+        @Override
+        public String getBody() {
+            return CodeSplitUtil.getContextString(this.parserRuleContext);
+        }
+
+        @Override
+        public ParserRuleContext getContext() {
+            return this.parserRuleContext;
+        }
+    }
+
+    private static class RewriteContextGroupElement implements 
LocalGroupElement {
+
+        private final ParserRuleContext parserRuleContext;
+
+        private final TokenStreamRewriter rewriter;
+
+        private RewriteContextGroupElement(
+                ParserRuleContext parserRuleContext, TokenStreamRewriter 
rewriter) {
+            this.parserRuleContext = parserRuleContext;
+            this.rewriter = rewriter;
+        }
+
+        @Override
+        public Token getStart() {
+            return this.parserRuleContext.start;
+        }
+
+        @Override
+        public Token getStop() {
+            return this.parserRuleContext.stop;
+        }
+
+        @Override
+        public String getBody() {
+            return this.rewriter.getText();
+        }
+
+        @Override
+        public ParserRuleContext getContext() {
+            return this.parserRuleContext;
+        }
+    }
+
+    private static class ReturnCounter extends JavaParserBaseVisitor<Void> {
+
+        private int returnCount = 0;
+
+        @Override
+        public Void visitStatement(StatementContext ctx) {
+            if (ctx.RETURN() != null) {
+                returnCount++;
+            }
+            return visitChildren(ctx);
+        }
+    }
+
+    /**
+     * This object represents a rewritten code block. It contains its new form 
along with all
+     * extracted groups and their names.
+     */
+    public static class RewriteGroupedCode {
+
+        /** Rewritten code block containing calls to extracted methods. */
+        private final String rewriteCode;
+
+        /** All extracted groups with their names. */
+        private final List<Pair<String, List<String>>> groups;

Review Comment:
   Please use `Tuple2` instead of `Pair` here. `Tuple2` is a Flink built-in class.



##########
flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementGrouper.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.codesplit.JavaParser.BlockStatementContext;
+import org.apache.flink.table.codesplit.JavaParser.StatementContext;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.Token;
+import org.antlr.v4.runtime.TokenStreamRewriter;
+import org.antlr.v4.runtime.atn.PredictionMode;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Groups end extract single line statements such as operations on fields/local variables and

Review Comment:
   What are "end extract single line statements"? Could you explain more?



##########
flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementGrouper.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.codesplit.JavaParser.BlockStatementContext;
+import org.apache.flink.table.codesplit.JavaParser.StatementContext;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.Token;
+import org.antlr.v4.runtime.TokenStreamRewriter;
+import org.antlr.v4.runtime.atn.PredictionMode;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Groups end extract single line statements such as operations on 
fields/local variables and
+ * extract new method for each group making them smaller.
+ *
+ * <p><i>Before</i>
+ *
+ * <pre><code>
+ * {
+ *     a[0] += b[1];
+ *     b[1] += a[1];
+ *     while (counter > 0) {
+ *         myFun_whileBody0_0(a, b);
+ *         if (a[0] > 0) {
+ *             myFun_whileBody0_0_ifBody0(a, b);
+ *         } else {
+ *             myFun_whileBody0_0_ifBody1(a, b);
+ *         }
+ *         counter--;
+ *     }
+ *
+ *     a[2] += b[2];
+ *     b[3] += a[3];
+ * }
+ *
+ * </code></pre>
+ *
+ * <p><i>After</i>
+ *
+ * <pre><code>
+ * {
+ *      myFun_rewriteGroup1(a, b);
+ *      myFun_rewriteGroup2(a, b);
+ * }
+ * </code></pre>
+ *
+ * <p>Where bodies of extracted "methods" are:
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup1 ->
+ *         a[0] += b[1];
+ *         b[1] += a[1];
+ *         while (counter > 0) {
+ *             myFun_rewriteGroup0_rewriteGroup1(a, b);
+ *             counter--;
+ *         }
+ * </code></pre>
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup2 ->
+ *         a[2] += b[2];
+ *         b[3] += a[3];
+ * </code></pre>
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup0_rewriteGroup1 ->
+ *         myFun_whileBody0_0(a, b);
+ *         if (a[0] > 0) {
+ *             myFun_whileBody0_0_ifBody0(a, b);
+ *         } else {
+ *             myFun_whileBody0_0_ifBody1(a, b);
+ *         }
+ * </code></pre>
+ */
+@Internal
+public class BlockStatementGrouper {
+
+    private final String code;
+
+    private final long maxMethodLength;
+
+    private final String parameters;
+
+    /**
+     * Initialize new BlockStatementGrouper.
+     *
+     * @param code code block that should be rewritten for statement grouping.
+     * @param maxMethodLength maximal length of the extracted code block.
+     * @param parameters parameters definition that should be used for 
extracted methods.
+     */
+    public BlockStatementGrouper(String code, long maxMethodLength, String 
parameters) {
+        this.code = code;
+        this.maxMethodLength = maxMethodLength;
+        this.parameters = parameters;
+    }
+
+    /**
+     * Rewrite code block used for initialization of this object. The code 
block is grouped into new
+     * methods.
+     *
+     * @param context prefix used for extracted group names.
+     * @return {@link RewriteGroupedCode} representing rewritten code block 
and containing extracted
+     *     groups with their names and content.
+     */
+    public RewriteGroupedCode rewrite(String context) {
+
+        BlockStatementGrouperVisitor visitor =
+                new BlockStatementGrouperVisitor(code, context, 
maxMethodLength, parameters);
+        JavaParser javaParser = new JavaParser(visitor.tokenStream);
+        javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+        visitor.visitStatement(javaParser.statement());
+
+        visitor.rewriteToGroups();
+        List<Pair<String, List<LocalGroupElement>>> groups = 
visitor.getGroups();
+
+        List<Pair<String, List<String>>> groupStrings = new 
ArrayList<>(groups.size());
+        for (Pair<String, List<LocalGroupElement>> group : groups) {
+            List<String> collectedStringGroups =
+                    group.getValue().stream()
+                            .map(LocalGroupElement::getBody)
+                            .collect(Collectors.toList());
+            groupStrings.add(Pair.of(group.getKey(), collectedStringGroups));
+        }
+
+        return new RewriteGroupedCode(visitor.rewriter.getText(), 
groupStrings);
+    }
+
+    private static class BlockStatementGrouperVisitor extends 
JavaParserBaseVisitor<Void> {
+
+        private final List<BlockStatementGrouperVisitor> children = new 
ArrayList<>();
+
+        private final String context;
+
+        private final CommonTokenStream tokenStream;
+
+        private final TokenStreamRewriter rewriter;
+
+        private final long maxMethodLength;
+
+        private final List<Pair<String, List<LocalGroupElement>>> groups = new 
ArrayList<>();
+
+        private final String parameters;
+
+        private int counter = 0;
+
+        private BlockStatementGrouperVisitor(
+                String code, String context, long maxMethodLength, String 
parameters) {
+            this.tokenStream = new CommonTokenStream(new 
JavaLexer(CharStreams.fromString(code)));
+            this.rewriter = new TokenStreamRewriter(tokenStream);
+            this.context = context;
+            this.maxMethodLength = maxMethodLength;
+            this.parameters = parameters;
+        }
+
+        @Override
+        public Void visitStatement(StatementContext ctx) {
+
+            if (ctx.getChildCount() == 0) {
+                return null;
+            }
+
+            if (ctx.WHILE() != null || ctx.IF() != null || ctx.ELSE() != null) 
{
+                for (StatementContext statement : ctx.statement()) {
+                    if (shouldExtract(statement)) {
+                        groupBlock(statement);
+                    }
+                }
+            } else {
+                if (shouldExtract(ctx)) {
+                    groupBlock(ctx);
+                }
+            }
+
+            return null;
+        }
+
+        public List<Pair<String, List<LocalGroupElement>>> getGroups() {
+            List<Pair<String, List<LocalGroupElement>>> groupsTmp = new 
ArrayList<>();
+            for (BlockStatementGrouperVisitor child : children) {
+                groupsTmp.addAll(child.getGroups());
+            }
+
+            groupsTmp.addAll(this.groups);
+
+            return groupsTmp;
+        }
+
+        private void addGroups(List<Pair<String, List<LocalGroupElement>>> 
groups) {
+            this.groups.addAll(groups);
+        }

Review Comment:
   What is this method used for? Nothing calls it.



##########
flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementGrouper.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.codesplit.JavaParser.BlockStatementContext;
+import org.apache.flink.table.codesplit.JavaParser.StatementContext;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.Token;
+import org.antlr.v4.runtime.TokenStreamRewriter;
+import org.antlr.v4.runtime.atn.PredictionMode;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Groups end extract single line statements such as operations on 
fields/local variables and
+ * extract new method for each group making them smaller.
+ *
+ * <p><i>Before</i>
+ *
+ * <pre><code>
+ * {
+ *     a[0] += b[1];
+ *     b[1] += a[1];
+ *     while (counter > 0) {
+ *         myFun_whileBody0_0(a, b);
+ *         if (a[0] > 0) {
+ *             myFun_whileBody0_0_ifBody0(a, b);
+ *         } else {
+ *             myFun_whileBody0_0_ifBody1(a, b);
+ *         }
+ *         counter--;
+ *     }
+ *
+ *     a[2] += b[2];
+ *     b[3] += a[3];
+ * }
+ *
+ * </code></pre>
+ *
+ * <p><i>After</i>
+ *
+ * <pre><code>
+ * {
+ *      myFun_rewriteGroup1(a, b);
+ *      myFun_rewriteGroup2(a, b);
+ * }
+ * </code></pre>
+ *
+ * <p>Where bodies of extracted "methods" are:
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup1 ->
+ *         a[0] += b[1];
+ *         b[1] += a[1];
+ *         while (counter > 0) {
+ *             myFun_rewriteGroup0_rewriteGroup1(a, b);
+ *             counter--;
+ *         }
+ * </code></pre>
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup2 ->
+ *         a[2] += b[2];
+ *         b[3] += a[3];
+ * </code></pre>
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup0_rewriteGroup1 ->
+ *         myFun_whileBody0_0(a, b);
+ *         if (a[0] > 0) {
+ *             myFun_whileBody0_0_ifBody0(a, b);
+ *         } else {
+ *             myFun_whileBody0_0_ifBody1(a, b);
+ *         }
+ * </code></pre>
+ */
+@Internal
+public class BlockStatementGrouper {
+
+    private final String code;
+
+    private final long maxMethodLength;
+
+    private final String parameters;
+
+    /**
+     * Initialize new BlockStatementGrouper.
+     *
+     * @param code code block that should be rewritten for statement grouping.
+     * @param maxMethodLength maximal length of the extracted code block.
+     * @param parameters parameters definition that should be used for 
extracted methods.
+     */
+    public BlockStatementGrouper(String code, long maxMethodLength, String 
parameters) {
+        this.code = code;
+        this.maxMethodLength = maxMethodLength;
+        this.parameters = parameters;
+    }
+
+    /**
+     * Rewrite code block used for initialization of this object. The code 
block is grouped into new
+     * methods.
+     *
+     * @param context prefix used for extracted group names.
+     * @return {@link RewriteGroupedCode} representing rewritten code block 
and containing extracted
+     *     groups with their names and content.
+     */
+    public RewriteGroupedCode rewrite(String context) {
+
+        BlockStatementGrouperVisitor visitor =
+                new BlockStatementGrouperVisitor(code, context, 
maxMethodLength, parameters);
+        JavaParser javaParser = new JavaParser(visitor.tokenStream);
+        javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+        visitor.visitStatement(javaParser.statement());
+
+        visitor.rewriteToGroups();
+        List<Pair<String, List<LocalGroupElement>>> groups = 
visitor.getGroups();
+
+        List<Pair<String, List<String>>> groupStrings = new 
ArrayList<>(groups.size());
+        for (Pair<String, List<LocalGroupElement>> group : groups) {
+            List<String> collectedStringGroups =
+                    group.getValue().stream()
+                            .map(LocalGroupElement::getBody)
+                            .collect(Collectors.toList());
+            groupStrings.add(Pair.of(group.getKey(), collectedStringGroups));
+        }
+
+        return new RewriteGroupedCode(visitor.rewriter.getText(), 
groupStrings);
+    }
+
+    private static class BlockStatementGrouperVisitor extends 
JavaParserBaseVisitor<Void> {
+
+        private final List<BlockStatementGrouperVisitor> children = new 
ArrayList<>();
+
+        private final String context;
+
+        private final CommonTokenStream tokenStream;
+
+        private final TokenStreamRewriter rewriter;
+
+        private final long maxMethodLength;
+
+        private final List<Pair<String, List<LocalGroupElement>>> groups = new 
ArrayList<>();
+
+        private final String parameters;
+
+        private int counter = 0;
+
+        private BlockStatementGrouperVisitor(
+                String code, String context, long maxMethodLength, String 
parameters) {
+            this.tokenStream = new CommonTokenStream(new 
JavaLexer(CharStreams.fromString(code)));
+            this.rewriter = new TokenStreamRewriter(tokenStream);
+            this.context = context;
+            this.maxMethodLength = maxMethodLength;
+            this.parameters = parameters;
+        }
+
+        @Override
+        public Void visitStatement(StatementContext ctx) {
+
+            if (ctx.getChildCount() == 0) {
+                return null;
+            }
+
+            if (ctx.WHILE() != null || ctx.IF() != null || ctx.ELSE() != null) 
{
+                for (StatementContext statement : ctx.statement()) {
+                    if (shouldExtract(statement)) {
+                        groupBlock(statement);
+                    }
+                }
+            } else {
+                if (shouldExtract(ctx)) {
+                    groupBlock(ctx);
+                }
+            }
+
+            return null;
+        }
+
+        public List<Pair<String, List<LocalGroupElement>>> getGroups() {
+            List<Pair<String, List<LocalGroupElement>>> groupsTmp = new 
ArrayList<>();
+            for (BlockStatementGrouperVisitor child : children) {
+                groupsTmp.addAll(child.getGroups());
+            }
+
+            groupsTmp.addAll(this.groups);
+
+            return groupsTmp;
+        }
+
+        private void addGroups(List<Pair<String, List<LocalGroupElement>>> 
groups) {
+            this.groups.addAll(groups);
+        }
+
+        private void groupBlock(StatementContext ctx) {
+            int localCounter = this.counter++;
+            int localGroupCodeLength = 0;
+            List<LocalGroupElement> localGroup = new ArrayList<>();
+            for (BlockStatementContext bsc : ctx.block().blockStatement()) {
+
+                StatementContext statement = bsc.statement();
+                if (statement.IF() != null
+                        || statement.ELSE() != null
+                        || statement.WHILE() != null) {
+                    String localContext = context + "_rewriteGroup" + 
localCounter;
+                    BlockStatementGrouperVisitor visitor =
+                            new BlockStatementGrouperVisitor(
+                                    CodeSplitUtil.getContextString(statement),
+                                    localContext,
+                                    maxMethodLength,
+                                    parameters);
+                    JavaParser javaParser = new 
JavaParser(visitor.tokenStream);
+                    
javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+                    visitor.visitStatement(javaParser.statement());
+
+                    localGroup.add(new RewriteContextGroupElement(statement, 
visitor.rewriter));
+                    children.add(visitor);
+                    localCounter = this.counter++;
+                } else {
+
+                    if (localGroupCodeLength + 1 + bsc.getText().length() <= 
maxMethodLength) {
+                        localGroup.add(new ContextGroupElement(bsc));
+                        localGroupCodeLength += bsc.getText().length();
+                    } else {
+                        if (localGroup.size() > 1
+                                || (localGroup.size() == 1
+                                        && canGroupAsSingleStatement(
+                                                
localGroup.get(0).getContext()))) {
+                            String localContext = context + "_rewriteGroup" + 
localCounter;
+                            groups.add(Pair.of(localContext, localGroup));
+                            localCounter = this.counter++;
+                            localGroup = new ArrayList<>();
+                        }
+                        localGroupCodeLength = bsc.getText().length();

Review Comment:
   ```suggestion
                               localGroupCodeLength = 0;
                           }
                           localGroupCodeLength += bsc.getText().length();
   ```



##########
flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementGrouper.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.codesplit.JavaParser.BlockStatementContext;
+import org.apache.flink.table.codesplit.JavaParser.StatementContext;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.Token;
+import org.antlr.v4.runtime.TokenStreamRewriter;
+import org.antlr.v4.runtime.atn.PredictionMode;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Groups end extract single line statements such as operations on 
fields/local variables and
+ * extract new method for each group making them smaller.
+ *
+ * <p><i>Before</i>
+ *
+ * <pre><code>
+ * {
+ *     a[0] += b[1];
+ *     b[1] += a[1];
+ *     while (counter > 0) {
+ *         myFun_whileBody0_0(a, b);
+ *         if (a[0] > 0) {
+ *             myFun_whileBody0_0_ifBody0(a, b);
+ *         } else {
+ *             myFun_whileBody0_0_ifBody1(a, b);
+ *         }
+ *         counter--;
+ *     }
+ *
+ *     a[2] += b[2];
+ *     b[3] += a[3];
+ * }
+ *
+ * </code></pre>
+ *
+ * <p><i>After</i>
+ *
+ * <pre><code>
+ * {
+ *      myFun_rewriteGroup1(a, b);
+ *      myFun_rewriteGroup2(a, b);
+ * }
+ * </code></pre>
+ *
+ * <p>Where bodies of extracted "methods" are:
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup1 ->
+ *         a[0] += b[1];
+ *         b[1] += a[1];
+ *         while (counter > 0) {
+ *             myFun_rewriteGroup0_rewriteGroup1(a, b);
+ *             counter--;
+ *         }
+ * </code></pre>
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup2 ->
+ *         a[2] += b[2];
+ *         b[3] += a[3];
+ * </code></pre>
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup0_rewriteGroup1 ->
+ *         myFun_whileBody0_0(a, b);
+ *         if (a[0] > 0) {
+ *             myFun_whileBody0_0_ifBody0(a, b);
+ *         } else {
+ *             myFun_whileBody0_0_ifBody1(a, b);
+ *         }
+ * </code></pre>
+ */
+@Internal
+public class BlockStatementGrouper {
+
+    private final String code;
+
+    private final long maxMethodLength;
+
+    private final String parameters;
+
+    /**
+     * Initialize new BlockStatementGrouper.
+     *
+     * @param code code block that should be rewritten for statement grouping.
+     * @param maxMethodLength maximal length of the extracted code block.
+     * @param parameters parameters definition that should be used for 
extracted methods.
+     */
+    public BlockStatementGrouper(String code, long maxMethodLength, String 
parameters) {
+        this.code = code;
+        this.maxMethodLength = maxMethodLength;
+        this.parameters = parameters;
+    }
+
+    /**
+     * Rewrite code block used for initialization of this object. The code 
block is grouped into new
+     * methods.
+     *
+     * @param context prefix used for extracted group names.
+     * @return {@link RewriteGroupedCode} representing rewritten code block 
and containing extracted
+     *     groups with their names and content.
+     */
+    public RewriteGroupedCode rewrite(String context) {
+
+        BlockStatementGrouperVisitor visitor =
+                new BlockStatementGrouperVisitor(code, context, 
maxMethodLength, parameters);
+        JavaParser javaParser = new JavaParser(visitor.tokenStream);
+        javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+        visitor.visitStatement(javaParser.statement());
+
+        visitor.rewriteToGroups();
+        List<Pair<String, List<LocalGroupElement>>> groups = 
visitor.getGroups();
+
+        List<Pair<String, List<String>>> groupStrings = new 
ArrayList<>(groups.size());
+        for (Pair<String, List<LocalGroupElement>> group : groups) {
+            List<String> collectedStringGroups =
+                    group.getValue().stream()
+                            .map(LocalGroupElement::getBody)
+                            .collect(Collectors.toList());
+            groupStrings.add(Pair.of(group.getKey(), collectedStringGroups));
+        }
+
+        return new RewriteGroupedCode(visitor.rewriter.getText(), 
groupStrings);
+    }
+
+    private static class BlockStatementGrouperVisitor extends 
JavaParserBaseVisitor<Void> {
+
+        private final List<BlockStatementGrouperVisitor> children = new 
ArrayList<>();
+
+        private final String context;
+
+        private final CommonTokenStream tokenStream;
+
+        private final TokenStreamRewriter rewriter;
+
+        private final long maxMethodLength;
+
+        private final List<Pair<String, List<LocalGroupElement>>> groups = new 
ArrayList<>();
+
+        private final String parameters;
+
+        private int counter = 0;
+
+        private BlockStatementGrouperVisitor(
+                String code, String context, long maxMethodLength, String 
parameters) {
+            this.tokenStream = new CommonTokenStream(new 
JavaLexer(CharStreams.fromString(code)));
+            this.rewriter = new TokenStreamRewriter(tokenStream);
+            this.context = context;
+            this.maxMethodLength = maxMethodLength;
+            this.parameters = parameters;
+        }
+
+        @Override
+        public Void visitStatement(StatementContext ctx) {
+
+            if (ctx.getChildCount() == 0) {
+                return null;
+            }
+
+            if (ctx.WHILE() != null || ctx.IF() != null || ctx.ELSE() != null) 
{
+                for (StatementContext statement : ctx.statement()) {
+                    if (shouldExtract(statement)) {
+                        groupBlock(statement);
+                    }
+                }
+            } else {
+                if (shouldExtract(ctx)) {
+                    groupBlock(ctx);
+                }
+            }
+
+            return null;
+        }
+
+        public List<Pair<String, List<LocalGroupElement>>> getGroups() {
+            List<Pair<String, List<LocalGroupElement>>> groupsTmp = new 
ArrayList<>();
+            for (BlockStatementGrouperVisitor child : children) {
+                groupsTmp.addAll(child.getGroups());
+            }
+
+            groupsTmp.addAll(this.groups);
+
+            return groupsTmp;
+        }
+
+        private void addGroups(List<Pair<String, List<LocalGroupElement>>> 
groups) {
+            this.groups.addAll(groups);
+        }
+
+        private void groupBlock(StatementContext ctx) {
+            int localCounter = this.counter++;
+            int localGroupCodeLength = 0;
+            List<LocalGroupElement> localGroup = new ArrayList<>();
+            for (BlockStatementContext bsc : ctx.block().blockStatement()) {
+
+                StatementContext statement = bsc.statement();
+                if (statement.IF() != null
+                        || statement.ELSE() != null
+                        || statement.WHILE() != null) {
+                    String localContext = context + "_rewriteGroup" + 
localCounter;
+                    BlockStatementGrouperVisitor visitor =
+                            new BlockStatementGrouperVisitor(
+                                    CodeSplitUtil.getContextString(statement),
+                                    localContext,
+                                    maxMethodLength,
+                                    parameters);
+                    JavaParser javaParser = new 
JavaParser(visitor.tokenStream);
+                    
javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+                    visitor.visitStatement(javaParser.statement());
+
+                    localGroup.add(new RewriteContextGroupElement(statement, 
visitor.rewriter));
+                    children.add(visitor);
+                    localCounter = this.counter++;
+                } else {
+
+                    if (localGroupCodeLength + 1 + bsc.getText().length() <= 
maxMethodLength) {
+                        localGroup.add(new ContextGroupElement(bsc));
+                        localGroupCodeLength += bsc.getText().length();
+                    } else {
+                        if (localGroup.size() > 1
+                                || (localGroup.size() == 1
+                                        && canGroupAsSingleStatement(
+                                                
localGroup.get(0).getContext()))) {
+                            String localContext = context + "_rewriteGroup" + 
localCounter;
+                            groups.add(Pair.of(localContext, localGroup));
+                            localCounter = this.counter++;
+                            localGroup = new ArrayList<>();
+                        }
+                        localGroupCodeLength = bsc.getText().length();
+                        localGroup.add(new ContextGroupElement(bsc));
+                    }
+                }
+            }
+
+            if (localGroup.size() > 1
+                    || (localGroup.size() == 1
+                            && 
canGroupAsSingleStatement(localGroup.get(0).getContext()))) {
+                String localContext = context + "_rewriteGroup" + localCounter;
+                groups.add(Pair.of(localContext, localGroup));
+            }
+        }
+
+        private boolean canGroupAsSingleStatement(ParserRuleContext context) {
+
+            StatementContext statement;
+
+            if (context instanceof StatementContext) {
+                statement = (StatementContext) context;
+            } else if (context instanceof BlockStatementContext) {
+                statement = ((BlockStatementContext) context).statement();
+            } else {
+                return false;
+            }
+
+            return statement != null
+                    && (statement.IF() != null
+                            || statement.ELSE() != null
+                            || statement.WHILE() != null);
+        }
+
+        private boolean shouldExtract(StatementContext ctx) {
+            return ctx != null
+                    && ctx.block() != null
+                    && ctx.block().blockStatement() != null
+                    // if there is only one statement in the block it's 
useless to extract
+                    // it into a separate function
+                    && ctx.block().blockStatement().size() > 1
+                    // should not extract blocks with return statements
+                    && getNumReturnsInContext(ctx.block()) == 0;
+        }

Review Comment:
   What about `break` and `continue` statements inside a `while` loop? A loop body
that contains `break` or `continue` can't be partitioned into separate methods,
because those statements are only valid inside the enclosing loop.



##########
flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementGrouper.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.codesplit.JavaParser.BlockStatementContext;
+import org.apache.flink.table.codesplit.JavaParser.StatementContext;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.Token;
+import org.antlr.v4.runtime.TokenStreamRewriter;
+import org.antlr.v4.runtime.atn.PredictionMode;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Groups end extract single line statements such as operations on 
fields/local variables and
+ * extract new method for each group making them smaller.
+ *
+ * <p><i>Before</i>
+ *
+ * <pre><code>
+ * {
+ *     a[0] += b[1];
+ *     b[1] += a[1];
+ *     while (counter > 0) {
+ *         myFun_whileBody0_0(a, b);
+ *         if (a[0] > 0) {
+ *             myFun_whileBody0_0_ifBody0(a, b);
+ *         } else {
+ *             myFun_whileBody0_0_ifBody1(a, b);
+ *         }
+ *         counter--;
+ *     }
+ *
+ *     a[2] += b[2];
+ *     b[3] += a[3];
+ * }
+ *
+ * </code></pre>
+ *
+ * <p><i>After</i>
+ *
+ * <pre><code>
+ * {
+ *      myFun_rewriteGroup1(a, b);
+ *      myFun_rewriteGroup2(a, b);
+ * }
+ * </code></pre>
+ *
+ * <p>Where bodies of extracted "methods" are:
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup1 ->
+ *         a[0] += b[1];
+ *         b[1] += a[1];
+ *         while (counter > 0) {
+ *             myFun_rewriteGroup0_rewriteGroup1(a, b);
+ *             counter--;
+ *         }
+ * </code></pre>
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup2 ->
+ *         a[2] += b[2];
+ *         b[3] += a[3];
+ * </code></pre>
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup0_rewriteGroup1 ->
+ *         myFun_whileBody0_0(a, b);
+ *         if (a[0] > 0) {
+ *             myFun_whileBody0_0_ifBody0(a, b);
+ *         } else {
+ *             myFun_whileBody0_0_ifBody1(a, b);
+ *         }
+ * </code></pre>
+ */
+@Internal
+public class BlockStatementGrouper {
+
+    private final String code;
+
+    private final long maxMethodLength;
+
+    private final String parameters;
+
+    /**
+     * Initialize new BlockStatementGrouper.
+     *
+     * @param code code block that should be rewritten for statement grouping.
+     * @param maxMethodLength maximal length of the extracted code block.
+     * @param parameters parameters definition that should be used for 
extracted methods.
+     */
+    public BlockStatementGrouper(String code, long maxMethodLength, String 
parameters) {
+        this.code = code;
+        this.maxMethodLength = maxMethodLength;
+        this.parameters = parameters;
+    }
+
+    /**
+     * Rewrite code block used for initialization of this object. The code 
block is grouped into new
+     * methods.
+     *
+     * @param context prefix used for extracted group names.
+     * @return {@link RewriteGroupedCode} representing rewritten code block 
and containing extracted
+     *     groups with their names and content.
+     */
+    public RewriteGroupedCode rewrite(String context) {
+
+        BlockStatementGrouperVisitor visitor =
+                new BlockStatementGrouperVisitor(code, context, 
maxMethodLength, parameters);
+        JavaParser javaParser = new JavaParser(visitor.tokenStream);
+        javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+        visitor.visitStatement(javaParser.statement());
+
+        visitor.rewriteToGroups();
+        List<Pair<String, List<LocalGroupElement>>> groups = 
visitor.getGroups();
+
+        List<Pair<String, List<String>>> groupStrings = new 
ArrayList<>(groups.size());
+        for (Pair<String, List<LocalGroupElement>> group : groups) {
+            List<String> collectedStringGroups =
+                    group.getValue().stream()
+                            .map(LocalGroupElement::getBody)
+                            .collect(Collectors.toList());
+            groupStrings.add(Pair.of(group.getKey(), collectedStringGroups));
+        }
+
+        return new RewriteGroupedCode(visitor.rewriter.getText(), 
groupStrings);
+    }
+
+    private static class BlockStatementGrouperVisitor extends 
JavaParserBaseVisitor<Void> {
+
+        private final List<BlockStatementGrouperVisitor> children = new 
ArrayList<>();
+
+        private final String context;
+
+        private final CommonTokenStream tokenStream;
+
+        private final TokenStreamRewriter rewriter;
+
+        private final long maxMethodLength;
+
+        private final List<Pair<String, List<LocalGroupElement>>> groups = new 
ArrayList<>();
+
+        private final String parameters;
+
+        private int counter = 0;
+
+        private BlockStatementGrouperVisitor(
+                String code, String context, long maxMethodLength, String 
parameters) {
+            this.tokenStream = new CommonTokenStream(new 
JavaLexer(CharStreams.fromString(code)));
+            this.rewriter = new TokenStreamRewriter(tokenStream);
+            this.context = context;
+            this.maxMethodLength = maxMethodLength;
+            this.parameters = parameters;
+        }
+
+        @Override
+        public Void visitStatement(StatementContext ctx) {
+
+            if (ctx.getChildCount() == 0) {
+                return null;
+            }
+
+            if (ctx.WHILE() != null || ctx.IF() != null || ctx.ELSE() != null) 
{
+                for (StatementContext statement : ctx.statement()) {
+                    if (shouldExtract(statement)) {
+                        groupBlock(statement);
+                    }
+                }
+            } else {
+                if (shouldExtract(ctx)) {
+                    groupBlock(ctx);
+                }
+            }
+
+            return null;
+        }
+
+        public List<Pair<String, List<LocalGroupElement>>> getGroups() {
+            List<Pair<String, List<LocalGroupElement>>> groupsTmp = new 
ArrayList<>();
+            for (BlockStatementGrouperVisitor child : children) {
+                groupsTmp.addAll(child.getGroups());
+            }
+
+            groupsTmp.addAll(this.groups);
+
+            return groupsTmp;
+        }
+
+        private void addGroups(List<Pair<String, List<LocalGroupElement>>> 
groups) {
+            this.groups.addAll(groups);
+        }
+
+        private void groupBlock(StatementContext ctx) {
+            int localCounter = this.counter++;
+            int localGroupCodeLength = 0;
+            List<LocalGroupElement> localGroup = new ArrayList<>();
+            for (BlockStatementContext bsc : ctx.block().blockStatement()) {
+
+                StatementContext statement = bsc.statement();
+                if (statement.IF() != null
+                        || statement.ELSE() != null
+                        || statement.WHILE() != null) {
+                    String localContext = context + "_rewriteGroup" + 
localCounter;
+                    BlockStatementGrouperVisitor visitor =
+                            new BlockStatementGrouperVisitor(
+                                    CodeSplitUtil.getContextString(statement),
+                                    localContext,
+                                    maxMethodLength,
+                                    parameters);
+                    JavaParser javaParser = new 
JavaParser(visitor.tokenStream);
+                    
javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+                    visitor.visitStatement(javaParser.statement());
+
+                    localGroup.add(new RewriteContextGroupElement(statement, 
visitor.rewriter));
+                    children.add(visitor);
+                    localCounter = this.counter++;
+                } else {
+
+                    if (localGroupCodeLength + 1 + bsc.getText().length() <= 
maxMethodLength) {
+                        localGroup.add(new ContextGroupElement(bsc));
+                        localGroupCodeLength += bsc.getText().length();
+                    } else {
+                        if (localGroup.size() > 1
+                                || (localGroup.size() == 1
+                                        && canGroupAsSingleStatement(
+                                                
localGroup.get(0).getContext()))) {
+                            String localContext = context + "_rewriteGroup" + 
localCounter;
+                            groups.add(Pair.of(localContext, localGroup));
+                            localCounter = this.counter++;
+                            localGroup = new ArrayList<>();
+                        }
+                        localGroupCodeLength = bsc.getText().length();
+                        localGroup.add(new ContextGroupElement(bsc));
+                    }
+                }
+            }
+
+            if (localGroup.size() > 1
+                    || (localGroup.size() == 1
+                            && 
canGroupAsSingleStatement(localGroup.get(0).getContext()))) {
+                String localContext = context + "_rewriteGroup" + localCounter;
+                groups.add(Pair.of(localContext, localGroup));
+            }

Review Comment:
   Let's say each group can hold 5 statements. If there are 6 statements in
the current method, this `if` check will skip the last statement and it will
be lost.
   
   Add a test to cover the corner case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to