This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new af9a1128f72 [FLINK-27246][TableSQL/Runtime] Include WHILE blocks in 
split generated java methods.
af9a1128f72 is described below

commit af9a1128f728c691b896bc9c591e9be1327601c4
Author: kristoffSC <krzysiek.chmielew...@gmail.com>
AuthorDate: Mon Feb 6 03:48:40 2023 +0100

    [FLINK-27246][TableSQL/Runtime] Include WHILE blocks in split generated 
java methods.
    
    This closes #21393.
---
 flink-table/flink-table-code-splitter/pom.xml      |  12 +
 .../table/codesplit/BlockStatementGrouper.java     | 477 +++++++++++++++++++++
 .../table/codesplit/BlockStatementRewriter.java    | 247 +++++++++++
 .../table/codesplit/BlockStatementSplitter.java    | 277 ++++++++++++
 .../flink/table/codesplit/IfStatementRewriter.java | 260 -----------
 .../flink/table/codesplit/JavaCodeSplitter.java    |   2 +-
 .../table/codesplit/ReturnAndJumpCounter.java}     |  29 +-
 .../table/codesplit/BlockStatementGrouperTest.java | 165 +++++++
 .../codesplit/BlockStatementRewriterTest.java      |  89 ++++
 .../codesplit/BlockStatementSplitterTest.java      |  77 ++++
 .../table/codesplit/CodeRewriterTestBase.java      |  12 +-
 .../flink/table/codesplit/CodeSplitTestUtil.java   | 121 ++++++
 .../table/codesplit/JavaCodeSplitterTest.java      |  21 +-
 .../block/code/TestIfInsideWhileLoopRewrite.java   | 100 +++++
 .../TestIfMultipleSingleLineStatementRewrite.java  |  22 +
 .../{if => block}/code/TestIfStatementRewrite.java |   0
 .../block/code/TestIfStatementRewrite1.java        |  34 ++
 .../block/code/TestIfStatementRewrite2.java        |  32 ++
 .../block/code/TestIfStatementRewrite3.java        |  21 +
 ...ewriteIfStatementInFunctionWithReturnValue.java |   0
 .../{if => block}/code/TestRewriteInnerClass.java  |   0
 .../block/code/TestWhileLoopInsideIfRewrite.java   |  46 ++
 .../resources/block/code/TestWhileLoopRewrite.java |  36 ++
 .../expected/TestIfInsideWhileLoopRewrite.java     | 134 ++++++
 .../TestIfMultipleSingleLineStatementRewrite.java  |  48 +++
 .../block/expected/TestIfStatementRewrite.java     |  68 +++
 .../block/expected/TestIfStatementRewrite1.java    |  80 ++++
 .../block/expected/TestIfStatementRewrite2.java    |  66 +++
 .../block/expected/TestIfStatementRewrite3.java    |  43 ++
 ...ewriteIfStatementInFunctionWithReturnValue.java |   0
 .../expected}/TestRewriteInnerClass.java           |  36 +-
 .../expected/TestWhileLoopInsideIfRewrite.java     |  80 ++++
 .../block/expected/TestWhileLoopRewrite.java       |  62 +++
 .../src/test/resources/groups/code/IfInWhile.txt   |  25 ++
 .../src/test/resources/groups/code/WhileInIf.txt   |  26 ++
 .../test/resources/groups/expected/IfInWhile.txt   |   5 +
 .../test/resources/groups/expected/WhileInIf.txt   |   5 +
 .../if/expected/TestIfStatementRewrite.java        |  79 ----
 .../if/expected/TestRewriteInnerClass.java         |  51 ---
 .../splitter/code/TestNotSplitJavaCode.java        |   2 +-
 .../splitter/expected/TestNotSplitJavaCode.java    |   2 +-
 .../splitter/expected/TestSplitJavaCode.java       | 180 ++++----
 .../runtime/batch/sql/CodeSplitITCase.scala        |  59 +++
 43 files changed, 2627 insertions(+), 504 deletions(-)

diff --git a/flink-table/flink-table-code-splitter/pom.xml 
b/flink-table/flink-table-code-splitter/pom.xml
index fe7e34dda9c..0da829d2c4d 100644
--- a/flink-table/flink-table-code-splitter/pom.xml
+++ b/flink-table/flink-table-code-splitter/pom.xml
@@ -61,6 +61,18 @@ under the License.
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
+
+               <!-- Java compiler -->
+               <dependency>
+                       <groupId>org.codehaus.janino</groupId>
+                       <artifactId>janino</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.codehaus.janino</groupId>
+                       <artifactId>commons-compiler</artifactId>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
diff --git 
a/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementGrouper.java
 
b/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementGrouper.java
new file mode 100644
index 00000000000..c2bfac1a10e
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementGrouper.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.codesplit.JavaParser.BlockStatementContext;
+import org.apache.flink.table.codesplit.JavaParser.StatementContext;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.Token;
+import org.antlr.v4.runtime.TokenStreamRewriter;
+import org.antlr.v4.runtime.atn.PredictionMode;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+/**
+ * Groups single line statements such as operations on 
fields/local variables, IF and
+ * WHILE statements, and extracts a new method for each group, making them smaller.
+ *
+ * <p>BlockStatementGrouper does not recognize whether a statement operates on a local 
or a class member
+ * variable. Because of that, code must be preprocessed by {@link 
DeclarationRewriter} which
+ * converts all local variables to member variables.
+ *
+ * <p><i>Before</i>
+ *
+ * <pre><code>
+ * {
+ *     a[0] += b[1];
+ *     b[1] += a[1];
+ *     while (counter > 0) {
+ *         myFun_whileBody0_0(a, b);
+ *         if (a[0] > 0) {
+ *             myFun_whileBody0_0_ifBody0(a, b);
+ *         } else {
+ *             myFun_whileBody0_0_ifBody1(a, b);
+ *         }
+ *         counter--;
+ *     }
+ *
+ *     a[2] += b[2];
+ *     b[3] += a[3];
+ * }
+ *
+ * </code></pre>
+ *
+ * <p><i>After</i>
+ *
+ * <pre><code>
+ * {
+ *     myFun_rewriteGroup4(a, b);
+ *     myFun_rewriteGroup5(a, b);
+ * }
+ * </code></pre>
+ *
+ * <p>Where bodies of extracted "methods" are:
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup4 ->
+ *         a[0] += b[1];
+ *         b[1] += a[1];
+ *         while (counter > 0) {
+ *             myFun_rewriteGroup0_1_rewriteGroup3(a, b);
+ *             counter--;
+ *         }
+ * </code></pre>
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup5 ->
+ *         a[2] += b[2];
+ *         b[3] += a[3];
+ * </code></pre>
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup0_1_rewriteGroup3 ->
+ *         myFun_whileBody0_0(a, b);
+ *         if (a[0] > 0) {
+ *             myFun_whileBody0_0_ifBody0(a, b);
+ *         } else {
+ *             myFun_whileBody0_0_ifBody1(a, b);
+ *         }
+ * </code></pre>
+ */
+@Internal
+public class BlockStatementGrouper {
+
+    private final String code;
+
+    private final long maxMethodLength;
+
+    private final String parameters;
+
+    /**
+     * Initialize new BlockStatementGrouper.
+     *
+     * @param code code block that should be rewritten for statement grouping.
+     * @param maxMethodLength maximal length of the extracted code block.
+     * @param parameters parameters definition that should be used for 
extracted methods.
+     */
+    public BlockStatementGrouper(String code, long maxMethodLength, String 
parameters) {
+        this.code = code;
+        this.maxMethodLength = maxMethodLength;
+        this.parameters = parameters;
+    }
+
+    /**
+     * Rewrite code block used for initialization of this object. The code 
block is grouped into new
+     * methods.
+     *
+     * @param context prefix used for extracted group names.
+     * @return {@link RewriteGroupedCode} representing rewritten code block 
and containing extracted
+     *     groups with their names and content.
+     */
+    public RewriteGroupedCode rewrite(String context) {
+
+        BlockStatementGrouperVisitor visitor =
+                new BlockStatementGrouperVisitor(maxMethodLength, parameters);
+        CommonTokenStream tokenStream =
+                new CommonTokenStream(new 
JavaLexer(CharStreams.fromString(code)));
+        JavaParser javaParser = new JavaParser(tokenStream);
+        javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+        TokenStreamRewriter rewriter = new TokenStreamRewriter(tokenStream);
+        visitor.visitStatement(javaParser.statement(), context, rewriter);
+
+        visitor.rewrite();
+        Map<String, Pair<TokenStreamRewriter, List<LocalGroupElement>>> groups 
= visitor.groups;
+
+        Map<String, List<String>> groupStrings = new HashMap<>(groups.size());
+        for (Entry<String, Pair<TokenStreamRewriter, List<LocalGroupElement>>> 
group :
+                groups.entrySet()) {
+            List<String> collectedStringGroups =
+                    group.getValue().getValue().stream()
+                            .map(LocalGroupElement::getBody)
+                            .collect(Collectors.toList());
+
+            groupStrings.put(group.getKey(), collectedStringGroups);
+        }
+
+        return new RewriteGroupedCode(rewriter.getText(), groupStrings);
+    }
+
+    private static class BlockStatementGrouperVisitor {
+
+        private final Map<String, Pair<TokenStreamRewriter, 
List<LocalGroupElement>>> groups =
+                new HashMap<>();
+
+        private final long maxMethodLength;
+
+        private final String parameters;
+
+        private int counter = 0;
+
+        private BlockStatementGrouperVisitor(long maxMethodLength, String 
parameters) {
+            this.maxMethodLength = maxMethodLength;
+            this.parameters = parameters;
+        }
+
+        public void visitStatement(
+                StatementContext ctx, String context, TokenStreamRewriter 
rewriter) {
+
+            if (ctx.getChildCount() == 0) {
+                return;
+            }
+
+            // For these statements here we want to process all "branches" 
separately, for example
+            // TRUE and FALSE branch of IF/ELSE block.
+            // each statement can be rewritten and extracted.
+            if (ctx.WHILE() != null || ctx.IF() != null || ctx.ELSE() != null) 
{
+                for (StatementContext statement : ctx.statement()) {
+                    if (shouldExtract(statement)) {
+                        String localContext = String.format("%s_%d", context, 
counter++);
+                        groupBlock(statement, localContext, rewriter);
+                    }
+                }
+            } else {
+                // The block did not start from IF/ELSE/WHILE statement
+                if (shouldExtract(ctx)) {
+                    groupBlock(ctx, context, rewriter);
+                }
+            }
+        }
+
+        // Group continuous block of statements together. If Statement is an 
IF/ELSE/WHILE,
+        // its body can be further grouped by recursive call to visitStatement 
method.
+        private void groupBlock(
+                StatementContext ctx, String context, TokenStreamRewriter 
rewriter) {
+            int localGroupCodeLength = 0;
+            List<LocalGroupElement> localGroup = new ArrayList<>();
+            for (BlockStatementContext bsc : ctx.block().blockStatement()) {
+
+                StatementContext statement = bsc.statement();
+                if (statement.IF() != null
+                        || statement.ELSE() != null
+                        || statement.WHILE() != null) {
+                    String localContext = context + "_rewriteGroup" + 
this.counter++;
+
+                    CommonTokenStream tokenStream =
+                            new CommonTokenStream(
+                                    new JavaLexer(
+                                            CharStreams.fromString(
+                                                    
CodeSplitUtil.getContextString(statement))));
+                    TokenStreamRewriter localRewriter = new 
TokenStreamRewriter(tokenStream);
+                    JavaParser javaParser = new JavaParser(tokenStream);
+                    
javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+                    visitStatement(javaParser.statement(), localContext, 
localRewriter);
+
+                    localGroup.add(new RewriteContextGroupElement(statement, 
localRewriter));
+
+                    // add the new method call length to the localGroupCodeLength. The 
"3" covers two
+                    // brackets for parameters and a semicolon at the end of the 
method call
+                    localGroupCodeLength += 3 + localContext.length() + 
parameters.length();
+                } else {
+
+                    if (localGroupCodeLength + 1 + bsc.getText().length() <= 
maxMethodLength) {
+                        localGroup.add(new ContextGroupElement(bsc));
+                        localGroupCodeLength += bsc.getText().length();
+                    } else {
+                        if (addLocalGroup(localGroup, context, rewriter)) {
+                            localGroup = new ArrayList<>();
+                            localGroupCodeLength = 0;
+                        }
+                        localGroupCodeLength += bsc.getText().length();
+                        localGroup.add(new ContextGroupElement(bsc));
+                    }
+                }
+            }
+
+            // Groups that have only one statement that is "single line 
statement" such as
+            // "a[2] += b[2];" will not be extracted.
+            addLocalGroup(localGroup, context, rewriter);
+        }
+
+        private boolean addLocalGroup(
+                List<LocalGroupElement> localGroup, String context, 
TokenStreamRewriter rewriter) {
+            if (localGroup.size() > 1
+                    || (localGroup.size() == 1
+                            && 
canGroupAsSingleStatement(localGroup.get(0).getContext()))) {
+                String localContext = context + "_rewriteGroup" + 
this.counter++;
+                groups.put(localContext, Pair.of(rewriter, localGroup));
+                return true;
+            }
+
+            return false;
+        }
+
+        private boolean canGroupAsSingleStatement(ParserRuleContext context) {
+
+            StatementContext statement;
+
+            if (context instanceof StatementContext) {
+                statement = (StatementContext) context;
+            } else if (context instanceof BlockStatementContext) {
+                statement = ((BlockStatementContext) context).statement();
+            } else {
+                return false;
+            }
+
+            return statement != null
+                    && (statement.IF() != null
+                            || statement.ELSE() != null
+                            || statement.WHILE() != null);
+        }
+
+        private boolean shouldExtract(StatementContext ctx) {
+            return ctx != null
+                    && ctx.block() != null
+                    && ctx.block().blockStatement() != null
+                    // if there is only one statement in the block it's 
useless to extract
+                    // it into a separate function
+                    && ctx.block().blockStatement().size() > 1
+                    // should not extract blocks with return statements
+                    && getNumOfReturnOrJumpStatements(ctx.block()) == 0;
+        }
+
+        private int getNumOfReturnOrJumpStatements(ParserRuleContext ctx) {
+            ReturnAndJumpCounter counter = new ReturnAndJumpCounter();
+            counter.visit(ctx);
+            return counter.getCounter();
+        }
+
+        private void rewrite() {
+            for (Entry<String, Pair<TokenStreamRewriter, 
List<LocalGroupElement>>> group :
+                    groups.entrySet()) {
+                Pair<TokenStreamRewriter, List<LocalGroupElement>> pair = 
group.getValue();
+                TokenStreamRewriter rewriter = pair.getKey();
+                List<LocalGroupElement> value = pair.getValue();
+                rewriter.replace(
+                        value.get(0).getStart(),
+                        value.get(value.size() - 1).getStop(),
+                        group.getKey() + "(" + this.parameters + ");");
+            }
+        }
+    }
+
+    /**
+     * Represents an extracted statement, its boundaries (start and stop token) 
and its String
+     * representation. For example single line statement like: int a = 3; or 
block statement like
+     * IF/ELSE/WHILE bodies.
+     */
+    private interface LocalGroupElement {
+
+        /** @return start {@link Token} for this group element. */
+        Token getStart();
+
+        /** @return stop {@link Token} for this group element. */
+        Token getStop();
+
+        /** @return String representation of this group element. */
+        String getBody();
+
+        ParserRuleContext getContext();
+    }
+
+    /**
+     * Extracted element that is represented solely by {@link 
ParserRuleContext}. It's used for
+     * extracted statements that represent single line statement such as 
variable operation that
+     * will not be further rewritten.
+     */
+    private static class ContextGroupElement implements LocalGroupElement {
+
+        private final ParserRuleContext parserRuleContext;
+
+        private ContextGroupElement(ParserRuleContext parserRuleContext) {
+            this.parserRuleContext = parserRuleContext;
+        }
+
+        @Override
+        public Token getStart() {
+            return this.parserRuleContext.start;
+        }
+
+        @Override
+        public Token getStop() {
+            return this.parserRuleContext.stop;
+        }
+
+        @Override
+        public String getBody() {
+            return CodeSplitUtil.getContextString(this.parserRuleContext);
+        }
+
+        @Override
+        public ParserRuleContext getContext() {
+            return this.parserRuleContext;
+        }
+    }
+
+    /**
+     * Extracted element that is represented by {@link ParserRuleContext} and 
{@link
+     * TokenStreamRewriter}. It's used for extracted block statements that 
represent TRUE/FALSE
+     * branches of IF/ELSE statements or WHILE's statement body. The string 
representation is
+     * extracted from {@link TokenStreamRewriter} which could be further 
rewritten.
+     *
+     * <p>This element can be used for IF/ELSE/WHILE statements that will be 
rewritten in further
+     * processing. Then this statement in the original code can be rewritten 
using Start and Stop
+     * tokens, whereas the getBody() method backed by TokenStreamRewriter will 
return the rewritten
+     * value of the original statement.
+     *
+     * <p>An example would be a parserRuleContext and rewriter representing the 
statement below:
+     *
+     * <pre><code>
+     *   while (counter > 0) {
+     *     myFun_whileBody0_0(a, b);
+     *     if (a[0] > 0) {
+     *       myFun_whileBody0_0_ifBody0(a, b);
+     *     } else {
+     *       myFun_whileBody0_0_ifBody1(a, b);
+     *     }
+     *
+     *     a[2] += b[2];
+     *     b[3] += a[3];
+     *     if (a[0] > 0) {
+     *       System.out.println("Hello");
+     *     } else {
+     *       System.out.println("World");
+     *     }
+     *
+     *         counter--;
+     *     }
+     * </code></pre>
+     *
+     * <p>This statement, being a part of a different statement will be 
further rewritten, new
+     * methods will be extracted from its body. At the end in order to rewrite 
the original
+     * statement we need to know what is the current form of this expression. 
For that we can call
+     * getBody() method, which in this case will return:
+     *
+     * <pre><code>
+     * while (counter > 0) {
+     *   myFun_rewriteGroup0_1_rewriteGroup3(a, b);
+     *
+     *   myFun_rewriteGroup0_1_rewriteGroup5(a, b);
+     *
+     *   counter--;
+     * }
+     * </code></pre>
+     */
+    private static class RewriteContextGroupElement implements 
LocalGroupElement {
+
+        private final ParserRuleContext parserRuleContext;
+
+        private final TokenStreamRewriter rewriter;
+
+        private RewriteContextGroupElement(
+                ParserRuleContext parserRuleContext, TokenStreamRewriter 
rewriter) {
+            this.parserRuleContext = parserRuleContext;
+            this.rewriter = rewriter;
+        }
+
+        @Override
+        public Token getStart() {
+            return this.parserRuleContext.start;
+        }
+
+        @Override
+        public Token getStop() {
+            return this.parserRuleContext.stop;
+        }
+
+        @Override
+        public String getBody() {
+            return this.rewriter.getText();
+        }
+
+        @Override
+        public ParserRuleContext getContext() {
+            return this.parserRuleContext;
+        }
+    }
+
+    /**
+     * This object represents a rewritten code block. It contains its new form 
along with all
+     * extracted groups and their names.
+     */
+    public static class RewriteGroupedCode {
+
+        /** Rewritten code block containing calls to extracted methods. */
+        private final String rewriteCode;
+
+        /** All extracted groups with their names. */
+        private final Map<String, List<String>> groups;
+
+        public RewriteGroupedCode(String rewriteCode, Map<String, 
List<String>> groups) {
+            this.rewriteCode = rewriteCode;
+            this.groups = groups;
+        }
+
+        public String getRewriteCode() {
+            return rewriteCode;
+        }
+
+        public Map<String, List<String>> getGroups() {
+            return this.groups;
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementRewriter.java
 
b/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementRewriter.java
new file mode 100644
index 00000000000..94f07de7048
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementRewriter.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.table.codesplit.BlockStatementGrouper.RewriteGroupedCode;
+import org.apache.flink.table.codesplit.JavaParser.StatementContext;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.TokenStreamRewriter;
+import org.antlr.v4.runtime.atn.PredictionMode;
+
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.StringJoiner;
+
+/**
+ * Extracts branches of IF and ELSE statements and WHILE code blocks into smaller 
methods. Single line
+ * statement blocks with more than one statement, like field or local 
variable operations, will
+ * also be extracted to separate methods.
+ *
+ * <p>This rewriter only deals with functions without return values. Functions 
with return values
+ * should have been converted by {@link ReturnValueRewriter}. Also, this 
rewriter will not extract
+ * blocks containing <code>return</code> statements for correctness.
+ *
+ * <p><i>Before</i>
+ *
+ * <pre><code>
+ * public class Example {
+ *
+ *     int b;
+ *     int c;
+ *
+ *     public void myFun(int a) {
+ *
+ *         int counter = 10;
+ *         while (counter > 0) {
+ *             int localA = a + 1000;
+ *             System.out.println(localA);
+ *             if (a > 0) {
+ *                 b = a * 2;
+ *                 c = b * 2;
+ *                 System.out.println(b);
+ *             } else {
+ *                 b = a * 3;
+ *                 System.out.println(b);
+ *             }
+ *             counter--;
+ *         }
+ *     }
+ * }
+ * </code></pre>
+ *
+ * <p><i>After</i>
+ *
+ * <pre><code>
+ * public class Example {
+ *
+ *     int b;
+ *     int c;
+ *
+ *     public void myFun(int a) {
+ *
+ *         int counter = 10;
+ *
+ *         while (counter > 0) {
+ *             myFun_rewriteGroup1(a);
+ *             counter--;
+ *         }
+ *     }
+ *
+ *     void myFun_rewriteGroup1(int a) {
+ *         myFun_whileBody0_0(a);
+ *         if (a > 0) {
+ *             myFun_whileBody0_0_ifBody0(a);
+ *         } else {
+ *             myFun_whileBody0_0_ifBody1(a);
+ *         }
+ *     }
+ *
+ *     void myFun_whileBody0_0(int a) {
+ *         int localA = a + 1000;
+ *         System.out.println(localA);
+ *     }
+ *
+ *     void myFun_whileBody0_0_ifBody1(int a) {
+ *         b = a * 3;
+ *         System.out.println(b);
+ *     }
+ *
+ *     void myFun_whileBody0_0_ifBody0(int a) {
+ *         b = a * 2;
+ *         c = b * 2;
+ *         System.out.println(b);
+ *     }
+ * }
+ * </code></pre>
+ */
+@Internal
+public class BlockStatementRewriter implements CodeRewriter {
+
+    private final long maxMethodLength;
+
+    private final BlockVisitor visitor;
+
+    public BlockStatementRewriter(String code, long maxMethodLength) {
+        this.maxMethodLength = maxMethodLength;
+        this.visitor = new BlockVisitor(code);
+    }
+
+    @Override
+    public String rewrite() {
+        return visitor.rewriteAndGetCode();
+    }
+
+    private class BlockVisitor extends JavaParserBaseVisitor<Void> {
+
+        private final CommonTokenStream tokenStream;
+
+        private final TokenStreamRewriter rewriter;
+
+        private BlockVisitor(String code) {
+            this.tokenStream = new CommonTokenStream(new 
JavaLexer(CharStreams.fromString(code)));
+            this.rewriter = new TokenStreamRewriter(tokenStream);
+        }
+
+        private String rewriteAndGetCode() {
+            JavaParser javaParser = new JavaParser(tokenStream);
+            javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+            visit(javaParser.compilationUnit());
+            return rewriter.getText();
+        }
+
+        @Override
+        public Void visitMethodDeclaration(JavaParser.MethodDeclarationContext 
ctx) {
+
+            if (!"void".equals(ctx.typeTypeOrVoid().getText())) {
+                return null;
+            }
+
+            // function real parameters
+            LinkedHashSet<String> declarationContext = new LinkedHashSet<>();
+            new JavaParserBaseVisitor<Void>() {
+                @Override
+                public Void 
visitFormalParameter(JavaParser.FormalParameterContext ctx) {
+                    
declarationContext.add(ctx.variableDeclaratorId().getText());
+                    return null;
+                }
+            }.visit(ctx);
+
+            String type = CodeSplitUtil.getContextString(ctx.typeTypeOrVoid());
+            String functionName = ctx.IDENTIFIER().getText();
+            String parameters = 
CodeSplitUtil.getContextString(ctx.formalParameters());
+
+            String methodQualifier = "";
+            if (ctx.THROWS() != null) {
+                methodQualifier =
+                        " throws " + 
CodeSplitUtil.getContextString(ctx.qualifiedNameList());
+            }
+
+            for (JavaParser.BlockStatementContext blockStatementContext :
+                    ctx.methodBody().block().blockStatement()) {
+
+                StatementContext statement = blockStatementContext.statement();
+                if (statement != null
+                        && statement.getText().length() > maxMethodLength
+                        && (statement.IF() != null
+                                || statement.ELSE() != null
+                                || statement.WHILE() != null)) {
+
+                    BlockStatementSplitter splitter =
+                            new BlockStatementSplitter(
+                                    CodeSplitUtil.getContextString(statement),
+                                    String.join(", ", declarationContext));
+
+                    // Rewrite function's body to include calls to extracted 
methods.
+                    String blockRewrittenBody = 
splitter.rewriteBlock(functionName);
+
+                    // Get extract methods from block's original body.
+                    Map<String, List<String>> newMethods = 
splitter.extractBlocks();
+
+                    // Try to group statements in function's new body
+                    StringBuilder functionGroupedBody = new StringBuilder();
+                    BlockStatementGrouper statementGrouper =
+                            new BlockStatementGrouper(
+                                    blockRewrittenBody,
+                                    maxMethodLength,
+                                    String.join(", ", declarationContext));
+
+                    RewriteGroupedCode groupedCode = 
statementGrouper.rewrite(functionName);
+                    // add new methods, representing extracted groups.
+                    newMethods.putAll(groupedCode.getGroups());
+
+                    
functionGroupedBody.append("\n").append(groupedCode.getRewriteCode());
+
+                    // replace original functions body with new, rewritten and 
grouped body.
+                    rewriter.replace(statement.start, statement.stop, 
functionGroupedBody);
+
+                    // build new method definitions.
+                    StringJoiner newMethodDefinitions =
+                            new StringJoiner(System.lineSeparator() + 
System.lineSeparator());
+                    for (Entry<String, List<String>> entry : 
newMethods.entrySet()) {
+                        StringBuilder sb =
+                                new StringBuilder(type)
+                                        .append(" ")
+                                        .append(entry.getKey())
+                                        .append(parameters)
+                                        .append(methodQualifier)
+                                        .append(" {")
+                                        .append(System.lineSeparator());
+
+                        StringJoiner bodyLines = new 
StringJoiner(System.lineSeparator());
+                        for (String bodyLine : entry.getValue()) {
+                            bodyLines.add(bodyLine);
+                        }
+
+                        
sb.append(bodyLines).append(System.lineSeparator()).append("}");
+                        newMethodDefinitions.add(sb.toString());
+                    }
+
+                    // add new method definitions to class
+                    rewriter.insertAfter(
+                            ctx.getParent().stop, "\n\n" + 
newMethodDefinitions + "\n");
+                }
+            }
+            return null;
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementSplitter.java
 
b/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementSplitter.java
new file mode 100644
index 00000000000..281c7141db9
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementSplitter.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.codesplit.JavaParser.BlockStatementContext;
+import org.apache.flink.table.codesplit.JavaParser.StatementContext;
+import org.apache.flink.util.Preconditions;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.TokenStreamRewriter;
+import org.antlr.v4.runtime.atn.PredictionMode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+/**
+ * Extract statements from IFs, ELSEs and WHILEs blocks making them smaller.
+ *
+ * <p>BlockStatementSplitter does not recognize whether a statement operates on a local or a
+ * class member variable. Because of that, code must be preprocessed by {@link
+ * DeclarationRewriter}, which converts all extracted local variables to member variables.
+ *
+ * <p><i>Before</i>
+ *
+ * <pre><code>
+ * while (counter > 0) {
+ *     int localA = a + 1000;
+ *     System.out.println(localA);
+ *     if (a > 0) {
+ *         b = a * 2;
+ *         c = b * 2;
+ *         System.out.println(b);
+ *     } else {
+ *         b = a * 3;
+ *         System.out.println(b);
+ *     }
+ *     counter--;
+ * }
+ *
+ * </code></pre>
+ *
+ * <p><i>After</i>
+ *
+ * <pre><code>
+ * while (counter > 0) {
+ *     myFun_0_1(a, b, c);
+ *     if (a > 0) {
+ *         myFun_0_1_2(a, b, c);
+ *     } else {
+ *         myFun_0_1_3(a, b, c);
+ *     }
+ *     counter--;
+ * }
+ * </code></pre>
+ *
+ * <p>Where bodies of extracted "methods" are:
+ *
+ * <pre><code>
+ * myFun_0_1(int a, int b, int c) ->
+ *     int localA = a + 1000;
+ *     System.out.println(localA);
+ * </code></pre>
+ *
+ * <pre><code>
+ * myFun_0_1_2(int a, int b, int c) ->
+ *     b = a * 2;
+ *     c = b * 2;
+ *     System.out.println(b);
+ * </code></pre>
+ *
+ * <pre><code>
+ * myFun_0_1_3(int a, int b, int c) ->
+ *     b = a * 3;
+ *     System.out.println(b);
+ * </code></pre>
+ */
+@Internal
+public class BlockStatementSplitter {
+
+    private final String code;
+
+    private final String parameters;
+
+    private BlockStatementVisitor visitor;
+
+    /**
+     * Initialize new BlockStatementSplitter.
+     *
+     * @param code a code block that should be rewritten.
+     * @param parameters parameters definition that should be used for 
extracted methods.
+     */
+    public BlockStatementSplitter(String code, String parameters) {
+        this.code = code;
+        this.parameters = parameters;
+    }
+
+    /**
+     * Rewrite code block that was used for this object initialization.
+     *
+     * @param context prefix for extracted blocks.
+     * @return the rewritten code block, in which extracted statements are replaced with calls to
+     *     the extracted methods
+     */
+    public String rewriteBlock(String context) {
+
+        this.visitor = new BlockStatementVisitor(code, parameters);
+        JavaParser javaParser = new JavaParser(visitor.tokenStream);
+        javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+        visitor.visitStatement(javaParser.statement(), context);
+        visitor.rewrite();
+        return visitor.rewriter.getText();
+    }
+
+    /**
+     * This method extracts statements from IFs, ELSE's and WHILE blocks from 
block code used during
+     * initialization of this object. Every entry of returned map can be seen 
as new method name
+     * (map key) and method's body (map value). The block names will be 
prefixed with provided
+     * context.
+     *
+     * @return a map of block names to block statements. The key can be interpreted as the name of
+     *     the extracted block/method and the corresponding List represents individual statements
+     *     (the block's lines) for this block.
+     */
+    public Map<String, List<String>> extractBlocks() {
+
+        Map<String, List<String>> allBlocks = new 
HashMap<>(visitor.blocks.size());
+
+        for (Entry<String, List<ParserRuleContext>> entry : 
visitor.blocks.entrySet()) {
+
+            List<String> blocks =
+                    entry.getValue().stream()
+                            .map(CodeSplitUtil::getContextString)
+                            .collect(Collectors.toList());
+            allBlocks.put(entry.getKey(), blocks);
+        }
+
+        return allBlocks;
+    }
+
+    private static class BlockStatementVisitor {
+
+        private final Map<String, List<ParserRuleContext>> blocks = new 
HashMap<>();
+
+        private final CommonTokenStream tokenStream;
+
+        private final TokenStreamRewriter rewriter;
+
+        private final String parameters;
+
+        private int counter = 0;
+
+        private BlockStatementVisitor(String code, String parameters) {
+            this.tokenStream = new CommonTokenStream(new 
JavaLexer(CharStreams.fromString(code)));
+            this.rewriter = new TokenStreamRewriter(tokenStream);
+            this.parameters = parameters;
+        }
+
+        public void visitStatement(StatementContext ctx, String context) {
+
+            if (ctx.getChildCount() == 0 || 
getNumOfReturnOrJumpStatements(ctx) != 0) {
+                return;
+            }
+
+            if (ctx.block() == null) {
+                for (StatementContext statementContext : ctx.statement()) {
+                    String localContext = String.format("%s_%d", context, 
counter++);
+                    visitStatement(statementContext, localContext);
+                }
+            } else {
+                List<ParserRuleContext> extractedSingleBlocks = new 
ArrayList<>();
+                for (BlockStatementContext bsc : ctx.block().blockStatement()) 
{
+                    if (bsc.statement() != null
+                            && (bsc.statement().IF() != null
+                                    || bsc.statement().ELSE() != null
+                                    || bsc.statement().WHILE() != null)) {
+
+                        String localContext = String.format("%s_%d", context, 
counter++);
+
+                        tryGroupAsSingleStatement(extractedSingleBlocks, 
localContext);
+
+                        extractedSingleBlocks = new ArrayList<>();
+                        visitStatement(bsc.statement(), localContext);
+                    } else {
+                        extractedSingleBlocks.add(bsc);
+                    }
+                }
+
+                tryGroupAsSingleStatement(extractedSingleBlocks, context);
+            }
+        }
+
+        private void tryGroupAsSingleStatement(
+                List<ParserRuleContext> extractedSingleBlocks, String context) 
{
+            // if there is only one statement in the block, and it is not 
IF/ELSE/WHILE
+            // statement it's pointless to extract it into a separate function.
+            if (canGroupAsSingleStatement(extractedSingleBlocks)) {
+                List<ParserRuleContext> previous = blocks.put(context, 
extractedSingleBlocks);
+                Preconditions.checkState(
+                        previous == null,
+                        String.format(
+                                "Overriding extracted block %s - this should 
not happen.",
+                                context));
+            }
+        }
+
+        private void rewrite() {
+
+            for (Entry<String, List<ParserRuleContext>> entry : 
blocks.entrySet()) {
+                List<ParserRuleContext> statements = entry.getValue();
+                String statementContext = entry.getKey();
+                // if there is only one statement in the block, and it is not 
IF/ELSE/WHILE
+                // statement
+                // it's pointless to extract it into a separate function.
+                if (statements.size() > 1
+                        || (statements.size() == 1
+                                && 
canGroupAsSingleStatement(statements.get(0)))) {
+                    rewriter.replace(
+                            statements.get(0).start,
+                            statements.get(statements.size() - 1).stop,
+                            statementContext + "(" + parameters + ");");
+                }
+            }
+        }
+
+        private boolean canGroupAsSingleStatement(List<ParserRuleContext> 
extractedSingleBlocks) {
+            return extractedSingleBlocks.size() > 1
+                    || (extractedSingleBlocks.size() == 1
+                            && 
canGroupAsSingleStatement(extractedSingleBlocks.get(0)));
+        }
+
+        private boolean canGroupAsSingleStatement(ParserRuleContext 
parserRuleContext) {
+
+            StatementContext statement;
+
+            if (parserRuleContext instanceof StatementContext) {
+                statement = (StatementContext) parserRuleContext;
+            } else if (parserRuleContext instanceof BlockStatementContext) {
+                statement = ((BlockStatementContext) 
parserRuleContext).statement();
+            } else {
+                return false;
+            }
+
+            return statement != null
+                    && (statement.IF() != null
+                            || statement.ELSE() != null
+                            || statement.WHILE() != null);
+        }
+
+        private int getNumOfReturnOrJumpStatements(ParserRuleContext ctx) {
+            ReturnAndJumpCounter counter = new ReturnAndJumpCounter();
+            counter.visit(ctx);
+            return counter.getCounter();
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/IfStatementRewriter.java
 
b/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/IfStatementRewriter.java
deleted file mode 100644
index 4867a648e08..00000000000
--- 
a/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/IfStatementRewriter.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.codesplit;
-
-import org.apache.flink.annotation.Internal;
-
-import org.antlr.v4.runtime.CharStreams;
-import org.antlr.v4.runtime.CommonTokenStream;
-import org.antlr.v4.runtime.ParserRuleContext;
-import org.antlr.v4.runtime.TokenStreamRewriter;
-import org.antlr.v4.runtime.atn.PredictionMode;
-
-import java.util.LinkedHashSet;
-
-/**
- * Extract true and false branch of IFs and ELSEs in long methods into two 
smaller methods.
- *
- * <p>This rewriter only deals with functions without return values. Functions 
with return values
- * should have been converted by {@link ReturnValueRewriter}. Also, this 
rewriter will not extract
- * blocks containing <code>return</code> statements for correctness.
- *
- * <p><i>Before</i>
- *
- * <pre><code>
- * public class Example {
- *     int b;
- *     public void myFun(int a) {
- *         if (a > 0) {
- *             b = a * 2;
- *             System.out.println(b);
- *         } else {
- *             b = a * 3;
- *             System.out.println(b);
- *         }
- *     }
- * }
- * </code></pre>
- *
- * <p><i>After</i>
- *
- * <pre><code>
- * public class Example {
- *     int b;
- *     public void myFun(int a) {
- *         if (a > 0) {
- *             myFun_trueFilter1(a);
- *         } else {
- *             myFun_falseFilter2(a);
- *         }
- *     }
- *     void myFun_trueFilter1(int a) {
- *         b = a * 2;
- *         System.out.println(b);
- *     }
- *     void myFun_falseFilter2(int a) {
- *         b = a * 3;
- *         System.out.println(b);
- *     }
- * }
- * </code></pre>
- */
-@Internal
-public class IfStatementRewriter implements CodeRewriter {
-
-    private final long maxMethodLength;
-    private IfStatementVisitor visitor;
-
-    public IfStatementRewriter(String code, long maxMethodLength) {
-        this.maxMethodLength = maxMethodLength;
-        this.visitor = new IfStatementVisitor(code);
-    }
-
-    public String rewrite() {
-        String rewriterCode = visitor.rewriteAndGetCode();
-        while (visitor.hasRewrite()) {
-            visitor = new IfStatementVisitor(rewriterCode);
-            rewriterCode = visitor.rewriteAndGetCode();
-        }
-        return rewriterCode;
-    }
-
-    private class IfStatementVisitor extends JavaParserBaseVisitor<Void> {
-
-        private final CommonTokenStream tokenStream;
-
-        private final TokenStreamRewriter rewriter;
-
-        private long rewriteCount;
-
-        private IfStatementVisitor(String code) {
-            this.tokenStream = new CommonTokenStream(new 
JavaLexer(CharStreams.fromString(code)));
-            this.rewriter = new TokenStreamRewriter(tokenStream);
-        }
-
-        @Override
-        public Void visitMethodDeclaration(JavaParser.MethodDeclarationContext 
ctx) {
-
-            if (!"void".equals(ctx.typeTypeOrVoid().getText())) {
-                return null;
-            }
-
-            // function real parameters
-            LinkedHashSet<String> declarationContext = new LinkedHashSet<>();
-            new JavaParserBaseVisitor<Void>() {
-                @Override
-                public Void 
visitFormalParameter(JavaParser.FormalParameterContext ctx) {
-                    
declarationContext.add(ctx.variableDeclaratorId().getText());
-                    return null;
-                }
-            }.visit(ctx);
-
-            String type = CodeSplitUtil.getContextString(ctx.typeTypeOrVoid());
-            String functionName = ctx.IDENTIFIER().getText();
-            String parameters = 
CodeSplitUtil.getContextString(ctx.formalParameters());
-
-            String methodQualifier = "";
-            if (ctx.THROWS() != null) {
-                methodQualifier =
-                        " throws " + 
CodeSplitUtil.getContextString(ctx.qualifiedNameList());
-            }
-
-            for (JavaParser.BlockStatementContext blockStatementContext :
-                    ctx.methodBody().block().blockStatement()) {
-
-                if (blockStatementContext.statement() != null
-                        && blockStatementContext.statement().IF() != null
-                        && 
blockStatementContext.statement().getText().length() > maxMethodLength) {
-                    if 
(shouldExtract(blockStatementContext.statement().statement(0))) {
-                        long counter = 
CodeSplitUtil.getCounter().incrementAndGet();
-
-                        String methodDef =
-                                type
-                                        + " "
-                                        + functionName
-                                        + "_trueFilter"
-                                        + counter
-                                        + parameters
-                                        + methodQualifier;
-
-                        String newMethod =
-                                methodDef
-                                        + CodeSplitUtil.getContextString(
-                                                blockStatementContext
-                                                        .statement()
-                                                        .statement(0)
-                                                        .block())
-                                        + "\n";
-
-                        String newMethodCall =
-                                functionName
-                                        + "_trueFilter"
-                                        + counter
-                                        + "("
-                                        + String.join(", ", declarationContext)
-                                        + ");\n";
-                        rewriter.replace(
-                                
blockStatementContext.statement().statement(0).block().start,
-                                
blockStatementContext.statement().statement(0).block().stop,
-                                "{\n" + newMethodCall + "\n}\n");
-                        rewriter.insertAfter(ctx.getParent().stop, "\n" + 
newMethod + "\n");
-                        rewriteCount++;
-                    }
-
-                    if 
(shouldExtract(blockStatementContext.statement().statement(1))) {
-                        long counter = 
CodeSplitUtil.getCounter().incrementAndGet();
-
-                        String methodDef =
-                                type
-                                        + " "
-                                        + functionName
-                                        + "_falseFilter"
-                                        + counter
-                                        + parameters
-                                        + methodQualifier;
-
-                        String newMethod =
-                                methodDef
-                                        + CodeSplitUtil.getContextString(
-                                                blockStatementContext
-                                                        .statement()
-                                                        .statement(1)
-                                                        .block())
-                                        + "\n";
-
-                        String newMethodCall =
-                                functionName
-                                        + "_falseFilter"
-                                        + counter
-                                        + "("
-                                        + String.join(", ", declarationContext)
-                                        + ");\n";
-                        rewriter.replace(
-                                
blockStatementContext.statement().statement(1).block().start,
-                                
blockStatementContext.statement().statement(1).block().stop,
-                                "{\n" + newMethodCall + "\n}\n");
-                        rewriter.insertAfter(ctx.getParent().stop, "\n" + 
newMethod + "\n");
-                        rewriteCount++;
-                    }
-                }
-            }
-            return null;
-        }
-
-        private boolean shouldExtract(JavaParser.StatementContext ctx) {
-            return ctx != null
-                    && ctx.block() != null
-                    && ctx.block().blockStatement() != null
-                    // if there is only one statement in the block it's 
useless to extract
-                    // it into a separate function
-                    && ctx.block().blockStatement().size() > 1
-                    // should not extract blocks with return statements
-                    && getNumReturnsInContext(ctx.block()) == 0;
-        }
-
-        private String rewriteAndGetCode() {
-            JavaParser javaParser = new JavaParser(tokenStream);
-            javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
-            visit(javaParser.compilationUnit());
-            return rewriter.getText();
-        }
-
-        private boolean hasRewrite() {
-            return rewriteCount > 0L;
-        }
-    }
-
-    private int getNumReturnsInContext(ParserRuleContext ctx) {
-        ReturnCounter counter = new ReturnCounter();
-        counter.visit(ctx);
-        return counter.returnCount;
-    }
-
-    private static class ReturnCounter extends JavaParserBaseVisitor<Void> {
-
-        private int returnCount = 0;
-
-        @Override
-        public Void visitStatement(JavaParser.StatementContext ctx) {
-            if (ctx.RETURN() != null) {
-                returnCount++;
-            }
-            return visitChildren(ctx);
-        }
-    }
-}
diff --git 
a/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/JavaCodeSplitter.java
 
b/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/JavaCodeSplitter.java
index 73f58a2954c..8347ef3049f 100644
--- 
a/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/JavaCodeSplitter.java
+++ 
b/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/JavaCodeSplitter.java
@@ -52,7 +52,7 @@ public class JavaCodeSplitter {
         return Optional.ofNullable(
                         new DeclarationRewriter(returnValueRewrittenCode, 
maxMethodLength)
                                 .rewrite())
-                .map(text -> new IfStatementRewriter(text, 
maxMethodLength).rewrite())
+                .map(text -> new BlockStatementRewriter(text, 
maxMethodLength).rewrite())
                 .map(text -> new FunctionSplitter(text, 
maxMethodLength).rewrite())
                 .map(text -> new MemberFieldRewriter(text, 
maxClassMemberCount).rewrite())
                 .orElse(code);
diff --git 
a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/IfStatementRewriterTest.java
 
b/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/ReturnAndJumpCounter.java
similarity index 58%
rename from 
flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/IfStatementRewriterTest.java
rename to 
flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/ReturnAndJumpCounter.java
index 7b784060f54..36473755f4a 100644
--- 
a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/IfStatementRewriterTest.java
+++ 
b/flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/ReturnAndJumpCounter.java
@@ -17,27 +17,22 @@
 
 package org.apache.flink.table.codesplit;
 
-import org.junit.jupiter.api.Test;
+import org.apache.flink.table.codesplit.JavaParser.StatementContext;
 
-/** Tests for {@link IfStatementRewriter}. */
-class IfStatementRewriterTest extends 
CodeRewriterTestBase<IfStatementRewriter> {
+/** Simple visitor that counts combined number of 'return', 'continue' and 'break' keywords. */
+public class ReturnAndJumpCounter extends JavaParserBaseVisitor<Void> {
 
-    public IfStatementRewriterTest() {
-        super("if", code -> new IfStatementRewriter(code, 20));
-    }
-
-    @Test
-    void testIfStatementRewrite() {
-        runTest("TestIfStatementRewrite");
-    }
+    private int counter = 0;
 
-    @Test
-    void testNotRewriteIfStatementInFunctionWithReturnValue() {
-        runTest("TestNotRewriteIfStatementInFunctionWithReturnValue");
+    @Override
+    public Void visitStatement(StatementContext ctx) {
+        if (ctx.RETURN() != null || ctx.BREAK() != null || ctx.CONTINUE() != 
null) {
+            counter++;
+        }
+        return visitChildren(ctx);
     }
 
-    @Test
-    void testRewriteInnerClass() {
-        runTest("TestRewriteInnerClass");
+    public int getCounter() {
+        return counter;
     }
 }
diff --git 
a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/BlockStatementGrouperTest.java
 
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/BlockStatementGrouperTest.java
new file mode 100644
index 00000000000..6c28a6f3cb7
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/BlockStatementGrouperTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import 
org.apache.flink.table.codesplit.BlockStatementGrouper.RewriteGroupedCode;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.codesplit.CodeSplitTestUtil.readResource;
+import static org.apache.flink.table.codesplit.CodeSplitTestUtil.trimLines;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BlockStatementGrouper}. */
+class BlockStatementGrouperTest {
+
+    @Test
+    public void testExtractIfInWhileGroups() {
+        String parameters = "a, b";
+        String givenBlock = readResource("groups/code/IfInWhile.txt");
+        String expectedBlock = readResource("groups/expected/IfInWhile.txt");
+
+        BlockStatementGrouper grouper = new BlockStatementGrouper(givenBlock, 
10, parameters);
+
+        RewriteGroupedCode rewriteGroupedCode = grouper.rewrite("myFun");
+
+        String rewriteCode = rewriteGroupedCode.getRewriteCode();
+        Map<String, List<String>> groups = rewriteGroupedCode.getGroups();
+
+        // Trying to mitigate any indentation issues between all sort of 
platforms by simply
+        // trim every line of the "class". Before this change, code-splitter 
test could fail on
+        // Windows machines while passing on Unix.
+        assertThat(trimLines(rewriteCode)).isEqualTo(trimLines(expectedBlock));
+
+        assertThat(groups).hasSize(4);
+        List<String> group1 = 
groups.get("myFun_rewriteGroup0_1_rewriteGroup3");
+        assertThat(group1).hasSize(2);
+        assertThat(group1.get(0)).isEqualTo("myFun_whileBody0_0(a, b);");
+        assertThat(trimLines(group1.get(1)))
+                .isEqualTo(
+                        trimLines(
+                                ""
+                                        + " if (a[0] > 0) {\n"
+                                        + "     myFun_whileBody0_0_ifBody0(a, 
b);\n"
+                                        + " } else {\n"
+                                        + "     myFun_whileBody0_0_ifBody1(a, 
b);\n"
+                                        + " }"));
+
+        List<String> group2 = 
groups.get("myFun_rewriteGroup0_1_rewriteGroup5");
+        assertThat(group2).hasSize(3);
+        assertThat(group2.get(0)).isEqualTo("a[2] += b[2];");
+        assertThat(group2.get(1)).isEqualTo("b[3] += a[3];");
+        assertThat(trimLines(group2.get(2)))
+                .isEqualTo(
+                        trimLines(
+                                "if (a[0] > 0) {\n"
+                                        + "            
System.out.println(\"Hello\");\n"
+                                        + "        } else {\n"
+                                        + "            
System.out.println(\"World\");\n"
+                                        + "        }"));
+
+        List<String> group3 = groups.get("myFun_rewriteGroup6");
+        assertThat(group3).hasSize(3);
+        assertThat(group3.get(0)).isEqualTo("a[0] += b[1];");
+        assertThat(group3.get(1)).isEqualTo("b[1] += a[1];");
+        assertThat(trimLines(group3.get(2)))
+                .isEqualTo(
+                        trimLines(
+                                " while (counter > 0) {\n"
+                                        + "    
myFun_rewriteGroup0_1_rewriteGroup3(a, b);\n"
+                                        + "    \n"
+                                        + "    
myFun_rewriteGroup0_1_rewriteGroup5(a, b);\n"
+                                        + "    \n"
+                                        + "    counter--;\n"
+                                        + "}"));
+
+        List<String> group4 = groups.get("myFun_rewriteGroup7");
+        assertThat(group4).containsExactly("a[4] += b[4];", "b[5] += a[5];");
+    }
+
+    @Test
+    public void testExtractWhileInIfGroups() {
+        String parameters = "a, b";
+        String givenBlock = readResource("groups/code/WhileInIf.txt");
+        String expectedBlock = readResource("groups/expected/WhileInIf.txt");
+
+        BlockStatementGrouper grouper = new BlockStatementGrouper(givenBlock, 
10, parameters);
+
+        RewriteGroupedCode rewriteGroupedCode = grouper.rewrite("myFun");
+
+        String rewriteCode = rewriteGroupedCode.getRewriteCode();
+        Map<String, List<String>> groups = rewriteGroupedCode.getGroups();
+
+        // Trying to mitigate any indentation issues between all sort of 
platforms by simply
+        // trim every line of the "class". Before this change, code-splitter 
test could fail on
+        // Windows machines while passing on Unix.
+        assertThat(trimLines(rewriteCode)).isEqualTo(trimLines(expectedBlock));
+
+        assertThat(groups).hasSize(5);
+        List<String> group1 = 
groups.get("myFun_rewriteGroup0_1_rewriteGroup2_3_rewriteGroup5");
+        assertThat(group1).hasSize(2);
+        assertThat(group1.get(0)).isEqualTo("myFun_whileBody0_0(a, b);");
+        assertThat(trimLines(group1.get(1)))
+                .isEqualTo(
+                        trimLines(
+                                ""
+                                        + " if (a[0] > 0) {\n"
+                                        + "     myFun_whileBody0_0_ifBody0(a, 
b);\n"
+                                        + " } else {\n"
+                                        + "     myFun_whileBody0_0_ifBody1(a, 
b);\n"
+                                        + " }"));
+
+        List<String> group2 = 
groups.get("myFun_rewriteGroup0_1_rewriteGroup6");
+        assertThat(group2).hasSize(1);
+        assertThat(trimLines(group2.get(0)))
+                .isEqualTo(
+                        trimLines(
+                                ""
+                                        + "while (counter > 0) {\n"
+                                        + "    
myFun_rewriteGroup0_1_rewriteGroup2_3_rewriteGroup5(a, b);\n"
+                                        + "    counter--;\n"
+                                        + "}"));
+
+        List<String> group3 = 
groups.get("myFun_rewriteGroup0_1_rewriteGroup7");
+        assertThat(group3).containsExactly("a[2] += b[2];", "b[3] += a[3];");
+
+        List<String> group4 = groups.get("myFun_rewriteGroup8");
+        assertThat(group4).hasSize(3);
+        assertThat(group4.get(0)).isEqualTo("a[0] += b[1];");
+        assertThat(group4.get(1)).isEqualTo("b[1] += a[1];");
+        assertThat(trimLines(group4.get(2)))
+                .isEqualTo(
+                        trimLines(
+                                "if (a.length < 100) {\n"
+                                        + "      
myFun_rewriteGroup0_1_rewriteGroup6(a, b);\n"
+                                        + "     \n"
+                                        + "      
myFun_rewriteGroup0_1_rewriteGroup7(a, b);\n"
+                                        + "    } else {\n"
+                                        + "      while (counter < 100) {\n"
+                                        + "          b[4] = b[4]++;\n"
+                                        + "          counter++;\n"
+                                        + "      }\n"
+                                        + "}"));
+
+        List<String> group5 = groups.get("myFun_rewriteGroup9");
+        assertThat(group5).containsExactly("a[5] += b[5];", "b[6] += a[6];");
+    }
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/BlockStatementRewriterTest.java
 
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/BlockStatementRewriterTest.java
new file mode 100644
index 00000000000..f561478bc8d
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/BlockStatementRewriterTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link BlockStatementRewriter}. */
+class BlockStatementRewriterTest extends 
CodeRewriterTestBase<BlockStatementRewriter> {
+
+    public BlockStatementRewriterTest() {
+        super("block", code -> new BlockStatementRewriter(code, 20));
+    }
+
+    @Test
+    void testIfInsideWhileLoopRewrite() {
+        runTest("TestIfInsideWhileLoopRewrite");
+    }
+
+    @Test
+    void testWhileLoopInsideIfRewrite() {
+        runTest("TestWhileLoopInsideIfRewrite");
+    }
+
+    @Test
+    void testWhileLoopRewrite() {
+        runTest("TestWhileLoopRewrite");
+    }
+
+    @Test
+    void testIfStatementRewrite() {
+        runTest("TestIfStatementRewrite");
+    }
+
+    @Test
+    void testIfStatementRewrite1() {
+        runTest("TestIfStatementRewrite1");
+    }
+
+    @Test
+    void testIfStatementRewrite2() {
+        runTest("TestIfStatementRewrite2");
+    }
+
+    @Test
+    void testIfStatementRewrite3() {
+        runTest("TestIfStatementRewrite3");
+    }
+
+    @Test
+    void testIfMultipleSingleLineStatementRewrite() {
+        runTest("TestIfMultipleSingleLineStatementRewrite");
+    }
+
+    @Test
+    void testNotRewriteIfStatementInFunctionWithReturnValue() {
+        runTest("TestNotRewriteIfStatementInFunctionWithReturnValue");
+    }
+
+    @Test
+    void testRewriteInnerClass() {
+        runTest("TestRewriteInnerClass");
+    }
+
+    /**
+     * Check whether the given and expected classes are actually valid Java code, i.e. that they
+     * compile. If this test fails on "expected" files, it probably means that the code split logic
+     * is invalid and an issue was missed when preparing test files.
+     */
+    @Test
+    void shouldCompileGivenAndExpectedCode() throws Exception {
+        CodeSplitTestUtil.tryCompile("block/code/");
+        CodeSplitTestUtil.tryCompile("block/expected/");
+    }
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/BlockStatementSplitterTest.java
 
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/BlockStatementSplitterTest.java
new file mode 100644
index 00000000000..be00b1ffddc
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/BlockStatementSplitterTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.table.codesplit.CodeSplitTestUtil.trimLines;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BlockStatementSplitter}. */
+class BlockStatementSplitterTest {
+
+    private static final String GIVEN_WHILE_IF_BLOCK =
+            ""
+                    + " while (counter > 0) {\n"
+                    + "     int localA = a + 1000;\n"
+                    + "     System.out.println(localA);\n"
+                    + "     if (a > 0) {\n"
+                    + "         b = a * 2;\n"
+                    + "         c = b * 2;\n"
+                    + "         System.out.println(b);\n"
+                    + "     } else {\n"
+                    + "         b = a * 3;\n"
+                    + "         System.out.println(b);\n"
+                    + "     }\n"
+                    + "     counter--;\n"
+                    + " }";
+
+    private static final String EXPECTED_WHILE_IF_BLOCK =
+            ""
+                    + "while (counter > 0) {\n"
+                    + "  myFun_0_1(a, b, c);\n"
+                    + "  if (a > 0) {\n"
+                    + "    myFun_0_1_2(a, b, c);\n"
+                    + "  } else {\n"
+                    + "    myFun_0_1_3(a, b, c);\n"
+                    + "  }\n"
+                    + "  counter--;\n"
+                    + " }";
+
+    @Test
+    public void testWhileIfBlockExtract() {
+        String parameters = "a, b, c";
+        BlockStatementSplitter splitter =
+                new BlockStatementSplitter(GIVEN_WHILE_IF_BLOCK, parameters);
+
+        String rewriteBlock = splitter.rewriteBlock("myFun");
+        Map<String, List<String>> extractedBlocks = splitter.extractBlocks();
+
+        
assertThat(trimLines(rewriteBlock)).isEqualTo(trimLines(EXPECTED_WHILE_IF_BLOCK));
+        assertThat(extractedBlocks.size()).isEqualTo(3);
+        assertThat(extractedBlocks.get("myFun_0_1"))
+                .containsExactly("int localA = a + 1000;", 
"System.out.println(localA);");
+        assertThat(extractedBlocks.get("myFun_0_1_3"))
+                .containsExactly("b = a * 3;", "System.out.println(b);");
+        assertThat(extractedBlocks.get("myFun_0_1_2"))
+                .containsExactly("b = a * 2;", "c = b * 2;", 
"System.out.println(b);");
+    }
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/CodeRewriterTestBase.java
 
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/CodeRewriterTestBase.java
index 65520e3fbd3..443f803d67d 100644
--- 
a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/CodeRewriterTestBase.java
+++ 
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/CodeRewriterTestBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.util.FileUtils;
 import java.io.File;
 import java.util.function.Function;
 
+import static org.apache.flink.table.codesplit.CodeSplitTestUtil.trimLines;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Base test class for {@link CodeRewriter}. */
@@ -53,9 +54,16 @@ abstract class CodeRewriterTestBase<R extends CodeRewriter> {
                                             .getResource(
                                                     resourceDir + "/expected/" 
+ filename + ".java")
                                             .toURI()));
+
             R rewriter = rewriterProvider.apply(code);
-            String actual = rewriter.rewrite();
-            assertThat(actual == null ? "" : actual).isEqualTo(expected);
+
+            // Mitigate indentation differences between platforms by trimming every line of
+            // the "class". Before this change, code-splitter tests could fail on Windows
+            // machines while passing on Unix.
+            expected = trimLines(expected);
+            String actual = trimLines(rewriter.rewrite());
+
+            assertThat(actual).isEqualTo(expected);
             return rewriter;
         } catch (Exception e) {
             throw new RuntimeException(e);
diff --git 
a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/CodeSplitTestUtil.java
 
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/CodeSplitTestUtil.java
new file mode 100644
index 00000000000..f5b83514466
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/CodeSplitTestUtil.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.codehaus.commons.compiler.CompileException;
+import org.codehaus.janino.SimpleCompiler;
+import org.junit.jupiter.api.Assertions;
+
+import java.io.File;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Util class for code split tests. */
+public final class CodeSplitTestUtil {
+
+    /**
+     * A linebreak matcher regexp that matches any Unicode linebreak sequence, making it
+     * effectively equivalent to the pattern below.
+     *
+     * <pre>{@code
+     * 
&#92;u000D&#92;u000A|[&#92;u000A&#92;u000B&#92;u000C&#92;u000D&#92;u0085&#92;u2028&#92;u2029]
+     * }</pre>
+     */
+    public static final String UNIVERSAL_NEW_LINE_REGEXP = "\\R";
+
+    private CodeSplitTestUtil() {}
+
+    /**
+     * Trims every line of the provided multiline String.
+     *
+     * <p>Lines are split on {@link #UNIVERSAL_NEW_LINE_REGEXP} and re-joined with {@link
+     * System#lineSeparator()}, so the result always uses the platform line separator.
+     *
+     * @param multilineString multiline string whose lines should be trimmed.
+     * @return multiline string with trimmed lines, or an empty string if the input is null or
+     *     whitespace-only.
+     */
+    public static String trimLines(String multilineString) {
+        if (StringUtils.isNullOrWhitespaceOnly(multilineString)) {
+            return "";
+        }
+
+        return Arrays.stream(multilineString.split(UNIVERSAL_NEW_LINE_REGEXP))
+                .map(String::trim)
+                .collect(Collectors.joining(System.lineSeparator()));
+    }
+
+    /**
+     * Reads a classpath resource as a UTF-8 string.
+     *
+     * @param resourcePath path of the resource, resolved through the test class loader.
+     * @return content of the resource file.
+     * @throws RuntimeException wrapping any failure to locate or read the resource; a missing
+     *     resource surfaces as a wrapped {@code NullPointerException}.
+     */
+    public static String readResource(String resourcePath) {
+        try {
+            return FileUtils.readFileUtf8(
+                    new File(
+                            BlockStatementGrouperTest.class
+                                    .getClassLoader()
+                                    .getResource(resourcePath)
+                                    .toURI()));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Compiles every {@code .java} file located directly under the given classpath resource
+     * directory and fails the calling test as soon as one of them does not compile.
+     *
+     * @param baseResource classpath-relative directory; it is concatenated directly with each file
+     *     name, so it must end with {@code /}.
+     * @throws Exception if the resource directory cannot be resolved or listed.
+     */
+    public static void tryCompile(String baseResource) throws Exception {
+        URI baseUri =
+                BlockStatementGrouperTest.class.getClassLoader().getResource(baseResource).toURI();
+
+        Map<String, String> expectedClasses;
+        // Files.list keeps the directory open until the stream is closed, so close it explicitly.
+        try (Stream<Path> files = Files.list(Paths.get(baseUri))) {
+            expectedClasses =
+                    files.filter(path -> path.toUri().toString().endsWith(".java"))
+                            .map(
+                                    path ->
+                                            Pair.of(
+                                                    path.toUri().toString(),
+                                                    CodeSplitTestUtil.readResource(
+                                                            baseResource + path.getFileName())))
+                            .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+        }
+
+        assertThat(expectedClasses).isNotEmpty();
+
+        for (Entry<String, String> entry : expectedClasses.entrySet()) {
+            String classFile = entry.getKey();
+            String code = entry.getValue();
+            try {
+                CodeSplitTestUtil.tryCompile(CodeSplitTestUtil.class.getClassLoader(), code);
+            } catch (CompileException e) {
+                Assertions.fail(
+                        String.format(
+                                "Compilation for file [%s] failed with message: %s",
+                                classFile, e.getMessage()));
+            }
+        }
+    }
+
+    /**
+     * Compiles the given source code with a Janino {@link SimpleCompiler}.
+     *
+     * @param cl class loader set as the compiler's parent class loader; must not be null.
+     * @param code Java source code to compile.
+     * @throws CompileException if the code does not compile.
+     */
+    public static void tryCompile(ClassLoader cl, String code) throws 
CompileException {
+        checkNotNull(cl, "Classloader must not be null.");
+        SimpleCompiler compiler = new SimpleCompiler();
+        compiler.setParentClassLoader(cl);
+        compiler.cook(code);
+    }
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java
 
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java
index 0072c51f0a7..22b7b6d5c64 100644
--- 
a/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java
+++ 
b/flink-table/flink-table-code-splitter/src/test/java/org/apache/flink/table/codesplit/JavaCodeSplitterTest.java
@@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
 
 import java.io.File;
 
+import static org.apache.flink.table.codesplit.CodeSplitTestUtil.trimLines;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.HamcrestCondition.matching;
@@ -88,6 +89,17 @@ class JavaCodeSplitterTest {
                 .hasMessage("maxClassMemberCount must be greater than 0");
     }
 
+    /**
+     * Check whether the given and expected classes are actually valid Java code, i.e. that they
+     * compile. If this test fails on "expected" files, it probably means that the code split logic
+     * is invalid and an issue was missed when preparing test files.
+     */
+    @Test
+    void shouldCompileGivenAndExpectedCode() throws Exception {
+        CodeSplitTestUtil.tryCompile("splitter/code/");
+        CodeSplitTestUtil.tryCompile("splitter/expected/");
+    }
+
     private void runTest(String filename, int maxLength, int maxMembers) {
         try {
             String code =
@@ -104,7 +116,14 @@ class JavaCodeSplitterTest {
                                             .getClassLoader()
                                             .getResource("splitter/expected/" 
+ filename + ".java")
                                             .toURI()));
-            assertThat(JavaCodeSplitter.split(code, maxLength, 
maxMembers)).isEqualTo(expected);
+
+            // Mitigate indentation differences between platforms by trimming every line of
+            // the "class". Before this change, code-splitter tests could fail on Windows
+            // machines while passing on Unix.
+            expected = trimLines(expected);
+            String actual = JavaCodeSplitter.split(code, maxLength, 
maxMembers);
+
+            assertThat(trimLines(actual)).isEqualTo(expected);
         } catch (Exception e) {
             throw new RuntimeException(e);
         } finally {
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfInsideWhileLoopRewrite.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfInsideWhileLoopRewrite.java
new file mode 100644
index 00000000000..a424d86add4
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfInsideWhileLoopRewrite.java
@@ -0,0 +1,100 @@
+public class TestIfInsideWhileLoopRewrite {
+
+    int counter = 0;
+
+    public void myFun(int[] a, int[] b, int[] c ) throws RuntimeException {
+
+        a[0] += b[1];
+        b[1] += a[1];
+        while (counter < 10) {
+            c[counter] = a[0] + 1000;
+            System.out.println(c);
+            if (a[counter] > 0) {
+                b[counter] = a[counter] * 2;
+                c[counter] = b[counter] * 2;
+                System.out.println(b[counter]);
+            } else {
+                b[counter] = a[counter] * 3;
+                System.out.println(b[counter]);
+            }
+
+            a[2] += b[2];
+            b[3] += a[3];
+            if (a[0] > 0) {
+                System.out.println("Hello");
+            } else {
+                System.out.println("World");
+            }
+
+            counter--;
+        }
+
+        a[4] += b[4];
+        b[5] += a[5];
+    }
+
+    public void myFun2(int[] a, int[] b, int[] c) throws RuntimeException {
+
+        a[0] += b[1];
+        b[1] += a[1];
+        while (counter < 10) {
+            c[counter] = a[0] + 1000;
+            System.out.println(c);
+            if (a[counter] > 0) {
+                b[counter] = a[counter] * 2;
+                c[counter] = b[counter] * 2;
+                System.out.println(b[counter]);
+            } else {
+                b[counter] = a[counter] * 3;
+                System.out.println(b[counter]);
+            }
+
+            a[2] += b[2];
+            b[3] += a[3];
+            if (a[0] > 0) {
+                System.out.println("Hello");
+            } else {
+                System.out.println("World");
+                break;
+            }
+
+            counter--;
+        }
+
+        a[4] += b[4];
+        b[5] += a[5];
+    }
+
+    public void myFun3(int[] a, int[] b, int[] c) throws RuntimeException {
+
+        a[0] += b[1];
+        b[1] += a[1];
+        while (counter < 10) {
+            c[counter] = a[0] + 1000;
+            System.out.println(c);
+            if (a[counter] > 0) {
+                b[counter] = a[counter] * 2;
+                c[counter] = b[counter] * 2;
+                System.out.println(b[counter]);
+            } else {
+                b[counter] = a[counter] * 3;
+                System.out.println(b[counter]);
+                continue;
+            }
+
+            a[2] += b[2];
+            b[3] += a[3];
+            if (a[0] > 0) {
+                System.out.println("Hello");
+            } else {
+                System.out.println("World");
+                break;
+            }
+
+            counter--;
+        }
+
+        a[4] += b[4];
+        b[5] += a[5];
+    }
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfMultipleSingleLineStatementRewrite.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfMultipleSingleLineStatementRewrite.java
new file mode 100644
index 00000000000..74462ff89b9
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfMultipleSingleLineStatementRewrite.java
@@ -0,0 +1,22 @@
+public class TestIfMultipleSingleLineStatementRewrite {
+    public void myFun1(int[] a, int[] b) throws RuntimeException {
+        if (a[0] == 0) {
+            a[11] = b[0];
+            a[12] = b[0];
+            if (a[2] == 0) {
+                a[21] = 1;
+                a[22] = 1;
+            } else {
+                a[23] = b[2];
+                a[24] = b[2];
+            }
+
+            a[13] = b[0];
+            a[14] = b[0];
+        } else {
+            a[0] = b[0];
+            a[1] = b[1];
+            a[2] = b[2];
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/if/code/TestIfStatementRewrite.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfStatementRewrite.java
similarity index 100%
rename from 
flink-table/flink-table-code-splitter/src/test/resources/if/code/TestIfStatementRewrite.java
rename to 
flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfStatementRewrite.java
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfStatementRewrite1.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfStatementRewrite1.java
new file mode 100644
index 00000000000..e8c7150795f
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfStatementRewrite1.java
@@ -0,0 +1,34 @@
+public class TestIfStatementRewrite1 {
+    public void myFun1(int[] a, int[] b) throws RuntimeException {
+        if (a[0] == 0) {
+            System.out.println("0");
+            System.out.println("0");
+            if (a[1] == 0) {
+                System.out.println("1");
+                System.out.println("2");
+                if (a[2] == 0) {
+                    a[2] = 1;
+                    a[22] = 1;
+                } else {
+                    a[2] = b[2];
+                    a[22] = b[2];
+                }
+            } else {
+                a[1] = b[1];
+                a[2] = b[2];
+            }
+        } else {
+            System.out.println("3");
+            System.out.println("3");
+            if (a[1] == 1) {
+                a[0] = b[0];
+                a[1] = b[1];
+                a[2] = b[2];
+            } else {
+                a[0] = 2 * b[0];
+                a[1] = 2 * b[1];
+                a[2] = 2 * b[2];
+            }
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfStatementRewrite2.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfStatementRewrite2.java
new file mode 100644
index 00000000000..6c35f77ca0e
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfStatementRewrite2.java
@@ -0,0 +1,32 @@
+public class TestIfStatementRewrite2 {
+    public void myFun1(int[] a, int[] b) throws RuntimeException {
+        if (a[0] == 0) {
+            a[0] = 1;
+            if (a[1] == 0) {
+                a[1] = 1;
+                if (a[2] == 0) {
+                    a[2] = 1;
+                } else if (a[3] == 0) {
+                    a[3] = b[3];
+                    a[33] = b[33];
+                } else if (a[4] == 0) {
+                    a[4] = b[4];
+                    a[44] = b[44];
+                } else {
+                    a[5] = 5;
+                    System.out.println("nothing");
+                }
+            } else {
+                a[1] = b[1];
+                a[2] = b[2];
+            }
+        } else if(a[1] == 22) {
+            a[1] = b[12];
+            a[2] = b[22];
+        } else {
+            a[0] = b[0];
+            a[1] = b[1];
+            a[2] = b[2];
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfStatementRewrite3.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfStatementRewrite3.java
new file mode 100644
index 00000000000..4d42f78eec6
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestIfStatementRewrite3.java
@@ -0,0 +1,21 @@
+public class TestIfStatementRewrite3 {
+    public void myFun1(int[] a, int[] b) throws RuntimeException {
+        if (a[0] == 0) {
+            a[0] = 1;
+            a[1] = 1;
+        } else if (a[1] == 22) {
+            a[1] = b[12];
+            a[2] = b[22];
+        } else if (a[3] == 0) {
+            a[3] = b[3];
+            a[33] = b[33];
+        } else if (a[4] == 0) {
+            a[4] = b[4];
+            a[44] = b[44];
+        } else {
+            a[0] = b[0];
+            a[1] = b[1];
+            a[2] = b[2];
+        }
+    }
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/if/code/TestNotRewriteIfStatementInFunctionWithReturnValue.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestNotRewriteIfStatementInFunctionWithReturnValue.java
similarity index 100%
rename from 
flink-table/flink-table-code-splitter/src/test/resources/if/code/TestNotRewriteIfStatementInFunctionWithReturnValue.java
rename to 
flink-table/flink-table-code-splitter/src/test/resources/block/code/TestNotRewriteIfStatementInFunctionWithReturnValue.java
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/if/code/TestRewriteInnerClass.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestRewriteInnerClass.java
similarity index 100%
copy from 
flink-table/flink-table-code-splitter/src/test/resources/if/code/TestRewriteInnerClass.java
copy to 
flink-table/flink-table-code-splitter/src/test/resources/block/code/TestRewriteInnerClass.java
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestWhileLoopInsideIfRewrite.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestWhileLoopInsideIfRewrite.java
new file mode 100644
index 00000000000..3f830b6bcca
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestWhileLoopInsideIfRewrite.java
@@ -0,0 +1,46 @@
+public class TestWhileLoopInsideIfRewrite {
+
+    int counter = 0;
+
+    public void myFun(int[] a, int[] b, int[] c) {
+
+        a[0] += b[1];
+        b[1] += a[1];
+        if (a.length < 100) {
+            while (counter < 10) {
+                c[counter] = a[0] + 1000;
+                System.out.println(c);
+                if (a[counter] > 0) {
+                    b[counter] = a[counter] * 2;
+                    c[counter] = b[counter] * 2;
+                    System.out.println(b[counter]);
+                } else {
+                    b[counter] = a[counter] * 3;
+                    System.out.println(b[counter]);
+                }
+
+                a[2] += b[2];
+                b[3] += a[3];
+                if (a[0] > 0) {
+                    System.out.println("Hello");
+                } else {
+                    System.out.println("World");
+                }
+
+                counter--;
+                System.out.println("World ffff");
+            }
+        } else {
+            while (counter < 10) {
+                b[counter] = b[counter]++;
+                counter++;
+            }
+
+            System.out.println("World Else");
+            System.out.println("World Else 2");
+        }
+
+        a[4] += b[4];
+        b[5] += a[5];
+    }
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestWhileLoopRewrite.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestWhileLoopRewrite.java
new file mode 100644
index 00000000000..b0c88654892
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/code/TestWhileLoopRewrite.java
@@ -0,0 +1,36 @@
+public class TestWhileLoopRewrite {
+
+    int counter = 0;
+
+    public void myFun(int[] a, int[] b, int[] c) {
+
+        a[0] += b[1];
+        b[1] += a[1];
+
+        while (counter < 10) {
+            c[counter] = a[0] + 1000;
+            System.out.println(c);
+            if (a[counter] > 0) {
+                b[counter] = a[counter] * 2;
+                c[counter] = b[counter] * 2;
+                System.out.println(b[counter]);
+            } else {
+                b[counter] = a[counter] * 3;
+                System.out.println(b[counter]);
+            }
+
+            a[2] += b[2];
+            b[3] += a[3];
+            if (a[0] > 0) {
+                System.out.println("Hello");
+            } else {
+                System.out.println("World");
+            }
+
+            counter--;
+        }
+
+        a[4] += b[4];
+        b[5] += a[5];
+    }
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfInsideWhileLoopRewrite.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfInsideWhileLoopRewrite.java
new file mode 100644
index 00000000000..9c19c3510c5
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfInsideWhileLoopRewrite.java
@@ -0,0 +1,134 @@
+public class TestIfInsideWhileLoopRewrite {
+
+    int counter = 0;
+
+    public void myFun(int[] a, int[] b, int[] c ) throws RuntimeException {
+
+        a[0] += b[1];
+        b[1] += a[1];
+
+        while (counter < 10) {
+            myFun_0_rewriteGroup2(a, b, c);
+
+            myFun_0_rewriteGroup4(a, b, c);
+
+            counter--;
+        }
+
+        a[4] += b[4];
+        b[5] += a[5];
+    }
+
+    void myFun_0_1(int[] a, int[] b, int[] c ) throws RuntimeException {
+        c[counter] = a[0] + 1000;
+        System.out.println(c);
+    }
+
+    void myFun_0_1_2(int[] a, int[] b, int[] c ) throws RuntimeException {
+        b[counter] = a[counter] * 2;
+        c[counter] = b[counter] * 2;
+        System.out.println(b[counter]);
+    }
+
+    void myFun_0_rewriteGroup4(int[] a, int[] b, int[] c ) throws 
RuntimeException {
+        myFun_0_4(a, b, c);
+        if (a[0] > 0) {
+            System.out.println("Hello");
+        } else {
+            System.out.println("World");
+        }
+    }
+
+    void myFun_0_1_3(int[] a, int[] b, int[] c ) throws RuntimeException {
+        b[counter] = a[counter] * 3;
+        System.out.println(b[counter]);
+    }
+
+    void myFun_0_rewriteGroup2(int[] a, int[] b, int[] c ) throws 
RuntimeException {
+        myFun_0_1(a, b, c);
+        if (a[counter] > 0) {
+            myFun_0_1_2(a, b, c);
+        } else {
+            myFun_0_1_3(a, b, c);
+        }
+    }
+
+    void myFun_0_4(int[] a, int[] b, int[] c ) throws RuntimeException {
+        a[2] += b[2];
+        b[3] += a[3];
+    }
+
+
+    public void myFun2(int[] a, int[] b, int[] c) throws RuntimeException {
+
+        a[0] += b[1];
+        b[1] += a[1];
+
+        while (counter < 10) {
+            c[counter] = a[0] + 1000;
+            System.out.println(c);
+            if (a[counter] > 0) {
+                b[counter] = a[counter] * 2;
+                c[counter] = b[counter] * 2;
+                System.out.println(b[counter]);
+            } else {
+                b[counter] = a[counter] * 3;
+                System.out.println(b[counter]);
+            }
+
+            a[2] += b[2];
+            b[3] += a[3];
+            if (a[0] > 0) {
+                System.out.println("Hello");
+            } else {
+                System.out.println("World");
+                break;
+            }
+
+            counter--;
+        }
+
+        a[4] += b[4];
+        b[5] += a[5];
+    }
+
+
+
+
+    public void myFun3(int[] a, int[] b, int[] c) throws RuntimeException {
+
+        a[0] += b[1];
+        b[1] += a[1];
+
+        while (counter < 10) {
+            c[counter] = a[0] + 1000;
+            System.out.println(c);
+            if (a[counter] > 0) {
+                b[counter] = a[counter] * 2;
+                c[counter] = b[counter] * 2;
+                System.out.println(b[counter]);
+            } else {
+                b[counter] = a[counter] * 3;
+                System.out.println(b[counter]);
+                continue;
+            }
+
+            a[2] += b[2];
+            b[3] += a[3];
+            if (a[0] > 0) {
+                System.out.println("Hello");
+            } else {
+                System.out.println("World");
+                break;
+            }
+
+            counter--;
+        }
+
+        a[4] += b[4];
+        b[5] += a[5];
+    }
+
+
+
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfMultipleSingleLineStatementRewrite.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfMultipleSingleLineStatementRewrite.java
new file mode 100644
index 00000000000..6fd81164480
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfMultipleSingleLineStatementRewrite.java
@@ -0,0 +1,48 @@
+public class TestIfMultipleSingleLineStatementRewrite {
+    public void myFun1(int[] a, int[] b) throws RuntimeException {
+
+        if (a[0] == 0) {
+            myFun1_0_rewriteGroup2(a, b);
+
+            myFun1_0(a, b);
+        } else {
+            myFun1_4(a, b);
+        }
+    }
+
+    void myFun1_0_rewriteGroup2(int[] a, int[] b) throws RuntimeException {
+        myFun1_0_1(a, b);
+        if (a[2] == 0) {
+            myFun1_0_1_2(a, b);
+        } else {
+            myFun1_0_1_3(a, b);
+        }
+    }
+
+    void myFun1_0_1_2(int[] a, int[] b) throws RuntimeException {
+        a[21] = 1;
+        a[22] = 1;
+    }
+
+    void myFun1_0_1_3(int[] a, int[] b) throws RuntimeException {
+        a[23] = b[2];
+        a[24] = b[2];
+    }
+
+    void myFun1_4(int[] a, int[] b) throws RuntimeException {
+        a[0] = b[0];
+        a[1] = b[1];
+        a[2] = b[2];
+    }
+
+    void myFun1_0_1(int[] a, int[] b) throws RuntimeException {
+        a[11] = b[0];
+        a[12] = b[0];
+    }
+
+    void myFun1_0(int[] a, int[] b) throws RuntimeException {
+        a[13] = b[0];
+        a[14] = b[0];
+    }
+
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfStatementRewrite.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfStatementRewrite.java
new file mode 100644
index 00000000000..47a21fd7826
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfStatementRewrite.java
@@ -0,0 +1,68 @@
+public class TestIfStatementRewrite {
+    public void myFun1(int[] a, int[] b) throws RuntimeException {
+
+        if (a[0] == 0) {
+            myFun1_0_rewriteGroup5(a, b);
+        } else {
+            myFun1_7(a, b);
+        }
+    }
+
+    void myFun1_0_rewriteGroup5(int[] a, int[] b) throws RuntimeException {
+        a[0] = 1;
+        if (a[1] == 0) {
+            myFun1_0_rewriteGroup1_2_rewriteGroup4(a, b);
+        } else {
+            myFun1_0_1_6(a, b);
+        }
+    }
+
+    void myFun1_0_1_6(int[] a, int[] b) throws RuntimeException {
+        a[1] = b[1];
+        a[2] = b[2];
+    }
+
+    void myFun1_7(int[] a, int[] b) throws RuntimeException {
+        a[0] = b[0];
+        a[1] = b[1];
+        a[2] = b[2];
+    }
+
+    void myFun1_0_rewriteGroup1_2_rewriteGroup4(int[] a, int[] b) throws 
RuntimeException {
+        a[1] = 1;
+        if (a[2] == 0) {
+            a[2] = 1;
+        } else {
+            a[2] = b[2];
+        }
+    }
+
+
+    public void myFun2(int[] a, int[] b) throws RuntimeException {
+
+        if (a[0] == 0) {
+            a[0] = 1;
+            if (a[1] == 0) {
+                a[1] = 1;
+                if (a[2] == 0) {
+                    a[2] = 1;
+                } else {
+                    a[2] = b[2];
+                }
+            } else {
+                a[1] = b[1];
+                a[2] = b[2];
+                return;
+            }
+        } else {
+            myFun2_0_rewriteGroup1(a, b);
+            a[2] = b[2];
+        }
+    }
+
+    void myFun2_0_rewriteGroup1(int[] a, int[] b) throws RuntimeException {
+        a[0] = b[0];
+        a[1] = b[1];
+    }
+
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfStatementRewrite1.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfStatementRewrite1.java
new file mode 100644
index 00000000000..293d5b58e47
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfStatementRewrite1.java
@@ -0,0 +1,80 @@
+public class TestIfStatementRewrite1 {
+    public void myFun1(int[] a, int[] b) throws RuntimeException {
+
+        if (a[0] == 0) {
+            myFun1_0_rewriteGroup5(a, b);
+        } else {
+            myFun1_6_rewriteGroup8(a, b);
+        }
+    }
+
+    void myFun1_7_8_9(int[] a, int[] b) throws RuntimeException {
+        a[0] = b[0];
+        a[1] = b[1];
+        a[2] = b[2];
+    }
+
+    void myFun1_6_rewriteGroup8(int[] a, int[] b) throws RuntimeException {
+        myFun1_7_8(a, b);
+        if (a[1] == 1) {
+            myFun1_7_8_9(a, b);
+        } else {
+            myFun1_7_8_10(a, b);
+        }
+    }
+
+    void myFun1_7_8(int[] a, int[] b) throws RuntimeException {
+        System.out.println("3");
+        System.out.println("3");
+    }
+
+    void myFun1_7_8_10(int[] a, int[] b) throws RuntimeException {
+        a[0] = 2 * b[0];
+        a[1] = 2 * b[1];
+        a[2] = 2 * b[2];
+    }
+
+    void myFun1_0_rewriteGroup5(int[] a, int[] b) throws RuntimeException {
+        myFun1_0_1(a, b);
+        if (a[1] == 0) {
+            myFun1_0_rewriteGroup1_2_rewriteGroup4(a, b);
+        } else {
+            myFun1_0_1_6(a, b);
+        }
+    }
+
+    void myFun1_0_rewriteGroup1_2_rewriteGroup4(int[] a, int[] b) throws 
RuntimeException {
+        myFun1_0_1_2_3(a, b);
+        if (a[2] == 0) {
+            myFun1_0_1_2_3_4(a, b);
+        } else {
+            myFun1_0_1_2_3_5(a, b);
+        }
+    }
+
+    void myFun1_0_1_2_3(int[] a, int[] b) throws RuntimeException {
+        System.out.println("1");
+        System.out.println("2");
+    }
+
+    void myFun1_0_1_2_3_5(int[] a, int[] b) throws RuntimeException {
+        a[2] = b[2];
+        a[22] = b[2];
+    }
+
+    void myFun1_0_1(int[] a, int[] b) throws RuntimeException {
+        System.out.println("0");
+        System.out.println("0");
+    }
+
+    void myFun1_0_1_6(int[] a, int[] b) throws RuntimeException {
+        a[1] = b[1];
+        a[2] = b[2];
+    }
+
+    void myFun1_0_1_2_3_4(int[] a, int[] b) throws RuntimeException {
+        a[2] = 1;
+        a[22] = 1;
+    }
+
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfStatementRewrite2.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfStatementRewrite2.java
new file mode 100644
index 00000000000..8e8baf37c00
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfStatementRewrite2.java
@@ -0,0 +1,66 @@
+public class TestIfStatementRewrite2 {
+    public void myFun1(int[] a, int[] b) throws RuntimeException {
+
+        if (a[0] == 0) {
+            myFun1_0_rewriteGroup5(a, b);
+        } else if(a[1] == 22) {
+            myFun1_11_12(a, b);
+        } else {
+            myFun1_11_13(a, b);
+        }
+    }
+
+    void myFun1_11_13(int[] a, int[] b) throws RuntimeException {
+        a[0] = b[0];
+        a[1] = b[1];
+        a[2] = b[2];
+    }
+
+    void myFun1_0_1_2_3_5_7_9(int[] a, int[] b) throws RuntimeException {
+        a[5] = 5;
+        System.out.println("nothing");
+    }
+
+    void myFun1_0_rewriteGroup5(int[] a, int[] b) throws RuntimeException {
+        a[0] = 1;
+        if (a[1] == 0) {
+            myFun1_0_rewriteGroup1_2_rewriteGroup4(a, b);
+        } else {
+            myFun1_0_1_10(a, b);
+        }
+    }
+
+    void myFun1_0_rewriteGroup1_2_rewriteGroup4(int[] a, int[] b) throws 
RuntimeException {
+        a[1] = 1;
+        if (a[2] == 0) {
+            a[2] = 1;
+        } else if (a[3] == 0) {
+            myFun1_0_1_2_3_5_6(a, b);
+        } else if (a[4] == 0) {
+            myFun1_0_1_2_3_5_7_8(a, b);
+        } else {
+            myFun1_0_1_2_3_5_7_9(a, b);
+        }
+    }
+
+    void myFun1_0_1_2_3_5_7_8(int[] a, int[] b) throws RuntimeException {
+        a[4] = b[4];
+        a[44] = b[44];
+    }
+
+    void myFun1_0_1_10(int[] a, int[] b) throws RuntimeException {
+        a[1] = b[1];
+        a[2] = b[2];
+    }
+
+    void myFun1_0_1_2_3_5_6(int[] a, int[] b) throws RuntimeException {
+        a[3] = b[3];
+        a[33] = b[33];
+    }
+
+    void myFun1_11_12(int[] a, int[] b) throws RuntimeException {
+        a[1] = b[12];
+        a[2] = b[22];
+    }
+
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfStatementRewrite3.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfStatementRewrite3.java
new file mode 100644
index 00000000000..160eb872f21
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestIfStatementRewrite3.java
@@ -0,0 +1,43 @@
+public class TestIfStatementRewrite3 {
+    public void myFun1(int[] a, int[] b) throws RuntimeException {
+
+        if (a[0] == 0) {
+            myFun1_0(a, b);
+        } else if (a[1] == 22) {
+            myFun1_1_2(a, b);
+        } else if (a[3] == 0) {
+            myFun1_1_3_4(a, b);
+        } else if (a[4] == 0) {
+            myFun1_1_3_5_6(a, b);
+        } else {
+            myFun1_1_3_5_7(a, b);
+        }
+    }
+
+    void myFun1_1_3_5_6(int[] a, int[] b) throws RuntimeException {
+        a[4] = b[4];
+        a[44] = b[44];
+    }
+
+    void myFun1_1_3_4(int[] a, int[] b) throws RuntimeException {
+        a[3] = b[3];
+        a[33] = b[33];
+    }
+
+    void myFun1_0(int[] a, int[] b) throws RuntimeException {
+        a[0] = 1;
+        a[1] = 1;
+    }
+
+    void myFun1_1_3_5_7(int[] a, int[] b) throws RuntimeException {
+        a[0] = b[0];
+        a[1] = b[1];
+        a[2] = b[2];
+    }
+
+    void myFun1_1_2(int[] a, int[] b) throws RuntimeException {
+        a[1] = b[12];
+        a[2] = b[22];
+    }
+
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/if/expected/TestNotRewriteIfStatementInFunctionWithReturnValue.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestNotRewriteIfStatementInFunctionWithReturnValue.java
similarity index 100%
rename from 
flink-table/flink-table-code-splitter/src/test/resources/if/expected/TestNotRewriteIfStatementInFunctionWithReturnValue.java
rename to 
flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestNotRewriteIfStatementInFunctionWithReturnValue.java
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/if/code/TestRewriteInnerClass.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestRewriteInnerClass.java
similarity index 50%
rename from 
flink-table/flink-table-code-splitter/src/test/resources/if/code/TestRewriteInnerClass.java
rename to 
flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestRewriteInnerClass.java
index 026e4511011..ce0953da651 100644
--- 
a/flink-table/flink-table-code-splitter/src/test/resources/if/code/TestRewriteInnerClass.java
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestRewriteInnerClass.java
@@ -1,23 +1,43 @@
 public class TestRewriteInnerClass {
     public void myFun(int[] a, int[] b) {
+
         if (a[0] == 0) {
-            a[0] += b[0];
-            a[1] += b[1];
+            myFun_0(a, b);
         } else {
-            a[0] += b[1];
-            a[1] += b[0];
+            myFun_1(a, b);
         }
     }
 
+    void myFun_0(int[] a, int[] b) {
+        a[0] += b[0];
+        a[1] += b[1];
+    }
+
+    void myFun_1(int[] a, int[] b) {
+        a[0] += b[1];
+        a[1] += b[0];
+    }
+
+
     public class InnerClass {
         public void myFun(int[] a, int[] b) {
+
             if (a[0] == 0) {
-                a[0] += b[0];
-                a[1] += b[1];
+                myFun_0(a, b);
             } else {
-                a[0] += b[1];
-                a[1] += b[0];
+                myFun_1(a, b);
             }
         }
+
+        void myFun_0(int[] a, int[] b) {
+            a[0] += b[0];
+            a[1] += b[1];
+        }
+
+        void myFun_1(int[] a, int[] b) {
+            a[0] += b[1];
+            a[1] += b[0];
+        }
+
     }
 }
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestWhileLoopInsideIfRewrite.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestWhileLoopInsideIfRewrite.java
new file mode 100644
index 00000000000..1b835d54206
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestWhileLoopInsideIfRewrite.java
@@ -0,0 +1,80 @@
+public class TestWhileLoopInsideIfRewrite {
+
+    int counter = 0;
+
+    public void myFun(int[] a, int[] b, int[] c) {
+
+        a[0] += b[1];
+        b[1] += a[1];
+
+        if (a.length < 100) {
+            while (counter < 10) {
+                myFun_0_1_2_3(a, b, c);
+                if (a[counter] > 0) {
+                    myFun_0_1_2_3_4(a, b, c);
+                } else {
+                    myFun_0_1_2_3_5(a, b, c);
+                }
+
+                myFun_0_1_2_6(a, b, c);
+                if (a[0] > 0) {
+                    System.out.println("Hello");
+                } else {
+                    System.out.println("World");
+                }
+
+                myFun_0_1_2(a, b, c);
+            }
+        } else {
+            myFun_0_rewriteGroup2(a, b, c);
+
+            myFun_9(a, b, c);
+        }
+
+        a[4] += b[4];
+        b[5] += a[5];
+    }
+
+    void myFun_0_rewriteGroup2(int[] a, int[] b, int[] c) {
+        while (counter < 10) {
+            myFun_9_10_11(a, b, c);
+        }
+    }
+
+    void myFun_9_10_11(int[] a, int[] b, int[] c) {
+        b[counter] = b[counter]++;
+        counter++;
+    }
+
+    void myFun_0_1_2_6(int[] a, int[] b, int[] c) {
+        a[2] += b[2];
+        b[3] += a[3];
+    }
+
+    void myFun_0_1_2(int[] a, int[] b, int[] c) {
+        counter--;
+        System.out.println("World ffff");
+    }
+
+    void myFun_0_1_2_3(int[] a, int[] b, int[] c) {
+        c[counter] = a[0] + 1000;
+        System.out.println(c);
+    }
+
+    void myFun_0_1_2_3_5(int[] a, int[] b, int[] c) {
+        b[counter] = a[counter] * 3;
+        System.out.println(b[counter]);
+    }
+
+    void myFun_0_1_2_3_4(int[] a, int[] b, int[] c) {
+        b[counter] = a[counter] * 2;
+        c[counter] = b[counter] * 2;
+        System.out.println(b[counter]);
+    }
+
+    void myFun_9(int[] a, int[] b, int[] c) {
+        System.out.println("World Else");
+        System.out.println("World Else 2");
+    }
+
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestWhileLoopRewrite.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestWhileLoopRewrite.java
new file mode 100644
index 00000000000..f3c33b4ce33
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/block/expected/TestWhileLoopRewrite.java
@@ -0,0 +1,62 @@
+public class TestWhileLoopRewrite {
+
+    int counter = 0;
+
+    public void myFun(int[] a, int[] b, int[] c) {
+
+        a[0] += b[1];
+        b[1] += a[1];
+
+
+        while (counter < 10) {
+            myFun_0_rewriteGroup2(a, b, c);
+
+            myFun_0_rewriteGroup4(a, b, c);
+
+            counter--;
+        }
+
+        a[4] += b[4];
+        b[5] += a[5];
+    }
+
+    void myFun_0_1(int[] a, int[] b, int[] c) {
+        c[counter] = a[0] + 1000;
+        System.out.println(c);
+    }
+
+    void myFun_0_1_2(int[] a, int[] b, int[] c) {
+        b[counter] = a[counter] * 2;
+        c[counter] = b[counter] * 2;
+        System.out.println(b[counter]);
+    }
+
+    void myFun_0_rewriteGroup4(int[] a, int[] b, int[] c) {
+        myFun_0_4(a, b, c);
+        if (a[0] > 0) {
+            System.out.println("Hello");
+        } else {
+            System.out.println("World");
+        }
+    }
+
+    void myFun_0_1_3(int[] a, int[] b, int[] c) {
+        b[counter] = a[counter] * 3;
+        System.out.println(b[counter]);
+    }
+
+    void myFun_0_rewriteGroup2(int[] a, int[] b, int[] c) {
+        myFun_0_1(a, b, c);
+        if (a[counter] > 0) {
+            myFun_0_1_2(a, b, c);
+        } else {
+            myFun_0_1_3(a, b, c);
+        }
+    }
+
+    void myFun_0_4(int[] a, int[] b, int[] c) {
+        a[2] += b[2];
+        b[3] += a[3];
+    }
+
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/groups/code/IfInWhile.txt
 
b/flink-table/flink-table-code-splitter/src/test/resources/groups/code/IfInWhile.txt
new file mode 100644
index 00000000000..b6a55a1a743
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/groups/code/IfInWhile.txt
@@ -0,0 +1,25 @@
+{
+    a[0] += b[1];
+    b[1] += a[1];
+    while (counter > 0) {
+        myFun_whileBody0_0(a, b);
+        if (a[0] > 0) {
+            myFun_whileBody0_0_ifBody0(a, b);
+        } else {
+            myFun_whileBody0_0_ifBody1(a, b);
+        }
+
+        a[2] += b[2];
+        b[3] += a[3];
+        if (a[0] > 0) {
+            System.out.println("Hello");
+        } else {
+            System.out.println("World");
+        }
+
+        counter--;
+    }
+
+    a[4] += b[4];
+    b[5] += a[5];
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/groups/code/WhileInIf.txt
 
b/flink-table/flink-table-code-splitter/src/test/resources/groups/code/WhileInIf.txt
new file mode 100644
index 00000000000..8d34ffd2a05
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/groups/code/WhileInIf.txt
@@ -0,0 +1,26 @@
+{
+    a[0] += b[1];
+    b[1] += a[1];
+    if (a.length < 100) {
+      while (counter > 0) {
+          myFun_whileBody0_0(a, b);
+          if (a[0] > 0) {
+              myFun_whileBody0_0_ifBody0(a, b);
+          } else {
+             myFun_whileBody0_0_ifBody1(a, b);
+          }
+          counter--;
+      }
+
+      a[2] += b[2];
+      b[3] += a[3];
+    } else {
+      while (counter < 100) {
+          b[4] = b[4]++;
+          counter++;
+      }
+    }
+
+    a[5] += b[5];
+    b[6] += a[6];
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/groups/expected/IfInWhile.txt
 
b/flink-table/flink-table-code-splitter/src/test/resources/groups/expected/IfInWhile.txt
new file mode 100644
index 00000000000..0e746173c0a
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/groups/expected/IfInWhile.txt
@@ -0,0 +1,5 @@
+{
+    myFun_rewriteGroup6(a, b);
+
+    myFun_rewriteGroup7(a, b);
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/groups/expected/WhileInIf.txt
 
b/flink-table/flink-table-code-splitter/src/test/resources/groups/expected/WhileInIf.txt
new file mode 100644
index 00000000000..eb33a32e31c
--- /dev/null
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/groups/expected/WhileInIf.txt
@@ -0,0 +1,5 @@
+{
+  myFun_rewriteGroup8(a, b);
+
+  myFun_rewriteGroup9(a, b);
+}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/if/expected/TestIfStatementRewrite.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/if/expected/TestIfStatementRewrite.java
deleted file mode 100644
index 04c3800f5e8..00000000000
--- 
a/flink-table/flink-table-code-splitter/src/test/resources/if/expected/TestIfStatementRewrite.java
+++ /dev/null
@@ -1,79 +0,0 @@
-public class TestIfStatementRewrite {
-    public void myFun1(int[] a, int[] b) throws RuntimeException {
-        if (a[0] == 0) {
-myFun1_trueFilter1(a, b);
-
-}
- else {
-myFun1_falseFilter2(a, b);
-
-}
-
-    }
-void myFun1_trueFilter1(int[] a, int[] b) throws RuntimeException{
-            a[0] = 1;
-            if (a[1] == 0) {
-myFun1_trueFilter1_trueFilter4(a, b);
-
-}
- else {
-myFun1_trueFilter1_falseFilter5(a, b);
-
-}
-
-        }
-void myFun1_trueFilter1_trueFilter4(int[] a, int[] b) throws RuntimeException{
-                a[1] = 1;
-                if (a[2] == 0) {
-                    a[2] = 1;
-                } else {
-                    a[2] = b[2];
-                }
-            }
-
-
-void myFun1_trueFilter1_falseFilter5(int[] a, int[] b) throws RuntimeException{
-                a[1] = b[1];
-                a[2] = b[2];
-            }
-
-
-
-
-void myFun1_falseFilter2(int[] a, int[] b) throws RuntimeException{
-            a[0] = b[0];
-            a[1] = b[1];
-            a[2] = b[2];
-        }
-
-
-
-    public void myFun2(int[] a, int[] b) throws RuntimeException {
-        if (a[0] == 0) {
-            a[0] = 1;
-            if (a[1] == 0) {
-                a[1] = 1;
-                if (a[2] == 0) {
-                    a[2] = 1;
-                } else {
-                    a[2] = b[2];
-                }
-            } else {
-                a[1] = b[1];
-                a[2] = b[2];
-                return;
-            }
-        } else {
-myFun2_falseFilter3(a, b);
-
-}
-
-    }
-void myFun2_falseFilter3(int[] a, int[] b) throws RuntimeException{
-            a[0] = b[0];
-            a[1] = b[1];
-            a[2] = b[2];
-        }
-
-
-}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/if/expected/TestRewriteInnerClass.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/if/expected/TestRewriteInnerClass.java
deleted file mode 100644
index b78b07e5627..00000000000
--- 
a/flink-table/flink-table-code-splitter/src/test/resources/if/expected/TestRewriteInnerClass.java
+++ /dev/null
@@ -1,51 +0,0 @@
-public class TestRewriteInnerClass {
-    public void myFun(int[] a, int[] b) {
-        if (a[0] == 0) {
-myFun_trueFilter1(a, b);
-
-}
- else {
-myFun_falseFilter2(a, b);
-
-}
-
-    }
-void myFun_trueFilter1(int[] a, int[] b){
-            a[0] += b[0];
-            a[1] += b[1];
-        }
-
-
-void myFun_falseFilter2(int[] a, int[] b){
-            a[0] += b[1];
-            a[1] += b[0];
-        }
-
-
-
-    public class InnerClass {
-        public void myFun(int[] a, int[] b) {
-            if (a[0] == 0) {
-myFun_trueFilter3(a, b);
-
-}
- else {
-myFun_falseFilter4(a, b);
-
-}
-
-        }
-void myFun_trueFilter3(int[] a, int[] b){
-                a[0] += b[0];
-                a[1] += b[1];
-            }
-
-
-void myFun_falseFilter4(int[] a, int[] b){
-                a[0] += b[1];
-                a[1] += b[0];
-            }
-
-
-    }
-}
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/splitter/code/TestNotSplitJavaCode.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/splitter/code/TestNotSplitJavaCode.java
index 0a7fe619821..2259b6851ba 100644
--- 
a/flink-table/flink-table-code-splitter/src/test/resources/splitter/code/TestNotSplitJavaCode.java
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/splitter/code/TestNotSplitJavaCode.java
@@ -2,7 +2,7 @@ public class TestNotSplitJavaCode {
     private int a = 1;
     public final int[] b;
 
-    public TestSplitJavaCode(int[] b) {
+    public TestNotSplitJavaCode(int[] b) {
         this.b = b;
     }
 
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/splitter/expected/TestNotSplitJavaCode.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/splitter/expected/TestNotSplitJavaCode.java
index 0a7fe619821..2259b6851ba 100644
--- 
a/flink-table/flink-table-code-splitter/src/test/resources/splitter/expected/TestNotSplitJavaCode.java
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/splitter/expected/TestNotSplitJavaCode.java
@@ -2,7 +2,7 @@ public class TestNotSplitJavaCode {
     private int a = 1;
     public final int[] b;
 
-    public TestSplitJavaCode(int[] b) {
+    public TestNotSplitJavaCode(int[] b) {
         this.b = b;
     }
 
diff --git 
a/flink-table/flink-table-code-splitter/src/test/resources/splitter/expected/TestSplitJavaCode.java
 
b/flink-table/flink-table-code-splitter/src/test/resources/splitter/expected/TestSplitJavaCode.java
index 4749c92c940..1345f139fb9 100644
--- 
a/flink-table/flink-table-code-splitter/src/test/resources/splitter/expected/TestSplitJavaCode.java
+++ 
b/flink-table/flink-table-code-splitter/src/test/resources/splitter/expected/TestSplitJavaCode.java
@@ -1,11 +1,13 @@
 public class TestSplitJavaCode {
-boolean[] rewrite$14 = new boolean[2];
-int[][] rewrite$15 = new int[1][];
-int[] rewrite$16 = new int[6];
+    boolean[] rewrite$16 = new boolean[2];
+    int[][] rewrite$17 = new int[1][];
+    int[] rewrite$18 = new int[6];
+
+    {
+        rewrite$18[5] = 1;
+    }
+
 
-{
-rewrite$16[5] = 1;
-}
 
 
 
@@ -16,15 +18,13 @@ rewrite$16[5] = 1;
 
 
 
-    
-    
 
     public TestSplitJavaCode(int[] b) {
-        this.rewrite$15[0] = b;
+        this.rewrite$17[0] = b;
     }
 
     public void myFun1(int a) {
-rewrite$14[0] = false;
+        rewrite$16[0] = false;
         myFun1_split6(a);
 
         myFun1_split7(a);
@@ -32,104 +32,118 @@ rewrite$14[0] = false;
         myFun1_split8(a);
 
         myFun1_split9(a);
-if (rewrite$14[0]) { return; }
+        if (rewrite$16[0]) { return; }
+
+
+
+
+
+
 
-         
-         
-         
-        
 
-        
     }
-void myFun1_split6(int a) {
+    void myFun1_split6(int a) {
+
+        this.rewrite$18[5] = a;
+        rewrite$17[0][0] += rewrite$17[0][1];
+        rewrite$17[0][1] += rewrite$17[0][2];
+        rewrite$17[0][2] += rewrite$17[0][3];
+        rewrite$18[0] = rewrite$17[0][0] + rewrite$17[0][1];
+        rewrite$18[1] = rewrite$17[0][1] + rewrite$17[0][2];
+    }
 
-this.rewrite$16[5] = a;
-rewrite$15[0][0] += rewrite$15[0][1];
-rewrite$15[0][1] += rewrite$15[0][2];
-rewrite$15[0][2] += rewrite$15[0][3];
-rewrite$16[0] = rewrite$15[0][0] + rewrite$15[0][1];
-rewrite$16[1] = rewrite$15[0][1] + rewrite$15[0][2];
-}
+    void myFun1_split7(int a) {
+        rewrite$18[2] = rewrite$17[0][2] + rewrite$17[0][0];
+    }
 
-void myFun1_split7(int a) {
-rewrite$16[2] = rewrite$15[0][2] + rewrite$15[0][0];
-}
+    void myFun1_split8(int a) {
+        for (int x : rewrite$17[0]) {rewrite$18[3] = x;
+            System.out.println(rewrite$18[3] + rewrite$18[0] + rewrite$18[1] + 
rewrite$18[2]);
+        }
+    }
 
-void myFun1_split8(int a) {
-for (int x : rewrite$15[0]) {rewrite$16[3] = x;
-            System.out.println(rewrite$16[3] + rewrite$16[0] + rewrite$16[1] + 
rewrite$16[2]);
+    void myFun1_split9(int a) {
+        if (rewrite$18[0] > 0) {
+            myFun1_0_rewriteGroup5(a);
+        } else {
+            rewrite$17[0][0] += 50;
+            rewrite$17[0][1] += 100;
+            rewrite$17[0][2] += 150;
+            rewrite$17[0][3] += 200;
+            rewrite$17[0][4] += 250;
+            rewrite$17[0][5] += 300;
+            rewrite$17[0][6] += 350;
+            { rewrite$16[0] = true; return; }
         }
-}
+    }
 
-void myFun1_split9(int a) {
-if (rewrite$16[0] > 0) {
-myFun1_trueFilter3(a);
 
-}
- else {
-            rewrite$15[0][0] += 50;
-            rewrite$15[0][1] += 100;
-            rewrite$15[0][2] += 150;
-            rewrite$15[0][3] += 200;
-            rewrite$15[0][4] += 250;
-            rewrite$15[0][5] += 300;
-            rewrite$15[0][6] += 350;
-            { rewrite$14[0] = true; return; }
+    void myFun1_0_rewriteGroup5(int a) {
+        myFun1_0_rewriteGroup5_split10(a);
+
+        myFun1_0_rewriteGroup5_split11(a);
+
+    }
+    void myFun1_0_rewriteGroup5_split10(int a) {
+
+        rewrite$17[0][0] += 100;
+    }
+
+    void myFun1_0_rewriteGroup5_split11(int a) {
+        if (rewrite$18[1] > 0) {
+            myFun1_0_rewriteGroup1_2_rewriteGroup4(a);
+        } else {
+            rewrite$17[0][1] += 50;
         }
-}
+    }
+
+
+    void myFun1_0_rewriteGroup1_2_rewriteGroup4(int a) {
+        myFun1_0_rewriteGroup1_2_rewriteGroup4_split12(a);
+
+        myFun1_0_rewriteGroup1_2_rewriteGroup4_split13(a);
 
-void myFun1_trueFilter3(int a){
-            myFun1_trueFilter3_split10(a);
+    }
+    void myFun1_0_rewriteGroup1_2_rewriteGroup4_split12(int a) {
 
-            myFun1_trueFilter3_split11(a);
+        rewrite$17[0][1] += 100;
+    }
 
+    void myFun1_0_rewriteGroup1_2_rewriteGroup4_split13(int a) {
+        if (rewrite$18[2] > 0) {
+            rewrite$17[0][2] += 100;
+        } else {
+            rewrite$17[0][2] += 50;
         }
-void myFun1_trueFilter3_split10(int a) {
+    }
 
-rewrite$15[0][0] += 100;
-}
 
-void myFun1_trueFilter3_split11(int a) {
-if (rewrite$16[1] > 0) {
-                rewrite$15[0][1] += 100;
-                if (rewrite$16[2] > 0) {
-                    rewrite$15[0][2] += 100;
-                } else {
-                    rewrite$15[0][2] += 50;
-                }
-            } else {
-                rewrite$15[0][1] += 50;
-            }
-}
 
+    public int myFun2(int[] a) { myFun2Impl(a); return rewrite$18[4]; }
 
+    void myFun2Impl(int[] a) {
+        rewrite$16[1] = false;
+        myFun2Impl_split14(a);
+
+        myFun2Impl_split15(a);
+        if (rewrite$16[1]) { return; }
 
 
-    public int myFun2(int[] a) { myFun2Impl(a); return rewrite$16[4]; }
 
-void myFun2Impl(int[] a) {
-rewrite$14[1] = false;
-        myFun2Impl_split12(a);
 
-        myFun2Impl_split13(a);
-if (rewrite$14[1]) { return; }
 
-        
-        
-        
-        
     }
-void myFun2Impl_split12(int[] a) {
+    void myFun2Impl_split14(int[] a) {
 
-a[0] += 1;
-a[1] += 2;
-a[2] += 3;
-a[3] += 4;
-a[4] += 5;
-}
+        a[0] += 1;
+        a[1] += 2;
+        a[2] += 3;
+        a[3] += 4;
+        a[4] += 5;
+    }
 
-void myFun2Impl_split13(int[] a) {
-{ rewrite$16[4] = a[0] + a[1] + a[2] + a[3] + a[4]; { rewrite$14[1] = true; 
return; } }
-}
+    void myFun2Impl_split15(int[] a) {
+        { rewrite$18[4] = a[0] + a[1] + a[2] + a[3] + a[4]; { rewrite$16[1] = 
true; return; } }
+    }
 
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
index 1b768a0bb9b..077978651f5 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/CodeSplitITCase.scala
@@ -120,6 +120,65 @@ class CodeSplitITCase extends BatchTestBase {
     Assert.assertEquals(expected, 
TestValuesTableFactory.getResults("test_many_values"))
   }
 
+  /**
+   * This test replicates a production query that was causing FLINK-27246. 
The generated code
+   * contains long WHILE statements followed by nested IF/ELSE statements 
which the original CodeSplit
+   * logic was unable to rewrite, causing a compilation error due to the 
processElement(..) and endInput()
+   * methods being too big.
+   */
+  @Test
+  def testManyAggregationsWithGroupBy(): Unit = {
+
+    tEnv.getConfig.set(TableConfigOptions.MAX_LENGTH_GENERATED_CODE, 
Int.box(4000))
+    tEnv.getConfig.set(TableConfigOptions.MAX_MEMBERS_GENERATED_CODE, 
Int.box(10000))
+
+    tEnv
+      .executeSql(
+        s"""
+           |CREATE TABLE test_many_values (
+           |`ID` INT,
+           |${Range.inclusive(1, 250).map(i => s"  a_$i INT").mkString(",\n")}
+           |) WITH (
+           |  'connector' = 'datagen',
+           |  'number-of-rows' = '10',
+           |  'fields.ID.kind' = 'sequence',
+           |  'fields.ID.start' = '1',
+           |  'fields.ID.end' = '10',
+           |${Range.inclusive(1, 250).map(i => s"  'fields.a_$i.kind' = 
'sequence'").mkString(",\n")},
+           |${Range.inclusive(1, 250).map(i => s"  'fields.a_$i.start' = 
'1'").mkString(",\n")},
+           |${Range.inclusive(1, 250).map(i => s"  'fields.a_$i.end' = 
'10'").mkString(",\n")}
+           |)
+           |""".stripMargin
+      )
+      .await()
+
+    val sqlQuery =
+      s"""
+         |SELECT
+         |${Range.inclusive(1, 250).map(i => s"  SUM(a_$i) as 
a_$i").mkString(",\n")}
+         |    FROM `test_many_values` T1
+         |    GROUP BY ID ORDER BY a_1;
+         |""".stripMargin
+
+    val table = parseQuery(sqlQuery)
+    val result = executeQuery(table)
+
+    // The result table should contain 250 columns from a_1 to a_250 and 10 
rows with values from 1 to 10.
+    Assert.assertEquals(10, result.size)
+    for (rowNumber <- result.indices) {
+      val row = result(rowNumber)
+      Assert.assertEquals(250, row.getArity)
+      for (j <- 1 to 250) {
+        // column value starts from 1 and ends at 10.
+        val expectedRowValue = rowNumber + 1
+        Assert.assertEquals(
+          "Invalid value for row %d and column a_%d.".format(rowNumber, j),
+          expectedRowValue,
+          row.getField("a_" + j))
+      }
+    }
+  }
+
   @Test
   def testManyIns(): Unit = {
     val sql = new StringBuilder("SELECT a FROM SmallTable3 WHERE a IN (")

Reply via email to