tsreaper commented on code in PR #21393:
URL: https://github.com/apache/flink/pull/21393#discussion_r1067994477


##########
flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementGrouper.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.codesplit.JavaParser.BlockStatementContext;
+import org.apache.flink.table.codesplit.JavaParser.StatementContext;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.Token;
+import org.antlr.v4.runtime.TokenStreamRewriter;
+import org.antlr.v4.runtime.atn.PredictionMode;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Groups and extracts single line statements, such as operations on fields/local variables, and
+ * extracts a new method for each group, making them smaller.
+ *
+ * <p><i>Before</i>
+ *
+ * <pre><code>
+ * {
+ *     a[0] += b[1];
+ *     b[1] += a[1];
+ *     while (counter > 0) {
+ *         myFun_whileBody0_0(a, b);
+ *         if (a[0] > 0) {
+ *             myFun_whileBody0_0_ifBody0(a, b);
+ *         } else {
+ *             myFun_whileBody0_0_ifBody1(a, b);
+ *         }
+ *         counter--;
+ *     }
+ *
+ *     a[2] += b[2];
+ *     b[3] += a[3];
+ * }
+ *
+ * </code></pre>
+ *
+ * <p><i>After</i>
+ *
+ * <pre><code>
+ * {
+ *      myFun_rewriteGroup1(a, b);
+ *      myFun_rewriteGroup2(a, b);
+ * }
+ * </code></pre>
+ *
+ * <p>Where bodies of extracted "methods" are:
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup1 ->
+ *         a[0] += b[1];
+ *         b[1] += a[1];
+ *         while (counter > 0) {
+ *             myFun_rewriteGroup0_rewriteGroup1(a, b);
+ *             counter--;
+ *         }
+ * </code></pre>
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup2 ->
+ *         a[2] += b[2];
+ *         b[3] += a[3];
+ * </code></pre>
+ *
+ * <pre><code>
+ *     myFun_rewriteGroup0_rewriteGroup1 ->
+ *         myFun_whileBody0_0(a, b);
+ *         if (a[0] > 0) {
+ *             myFun_whileBody0_0_ifBody0(a, b);
+ *         } else {
+ *             myFun_whileBody0_0_ifBody1(a, b);
+ *         }
+ * </code></pre>
+ */
+@Internal
+public class BlockStatementGrouper {
+
+    private final String code;
+
+    private final long maxMethodLength;
+
+    private final String parameters;
+
+    /**
+     * Initialize new BlockStatementGrouper.
+     *
+     * @param code code block that should be rewritten for statement grouping.
+     * @param maxMethodLength maximal length of the extracted code block.
+     * @param parameters parameters definition that should be used for 
extracted methods.
+     */
+    public BlockStatementGrouper(String code, long maxMethodLength, String 
parameters) {
+        this.code = code;
+        this.maxMethodLength = maxMethodLength;
+        this.parameters = parameters;
+    }
+
+    /**
+     * Rewrite code block used for initialization of this object. The code 
block is grouped into new
+     * methods.
+     *
+     * @param context prefix used for extracted group names.
+     * @return {@link RewriteGroupedCode} representing rewritten code block 
and containing extracted
+     *     groups with their names and content.
+     */
+    public RewriteGroupedCode rewrite(String context) {
+
+        BlockStatementGrouperVisitor visitor =
+                new BlockStatementGrouperVisitor(code, context, 
maxMethodLength, parameters);
+        JavaParser javaParser = new JavaParser(visitor.tokenStream);
+        javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+        visitor.visitStatement(javaParser.statement());
+
+        visitor.rewriteToGroups();
+        List<Pair<String, List<LocalGroupElement>>> groups = 
visitor.getGroups();
+
+        List<Pair<String, List<String>>> groupStrings = new 
ArrayList<>(groups.size());
+        for (Pair<String, List<LocalGroupElement>> group : groups) {
+            List<String> collectedStringGroups =
+                    group.getValue().stream()
+                            .map(LocalGroupElement::getBody)
+                            .collect(Collectors.toList());
+            groupStrings.add(Pair.of(group.getKey(), collectedStringGroups));
+        }
+
+        return new RewriteGroupedCode(visitor.rewriter.getText(), 
groupStrings);
+    }
+
+    private static class BlockStatementGrouperVisitor extends 
JavaParserBaseVisitor<Void> {
+
+        private final List<BlockStatementGrouperVisitor> children = new 
ArrayList<>();
+
+        private final String context;
+
+        private final CommonTokenStream tokenStream;
+
+        private final TokenStreamRewriter rewriter;
+
+        private final long maxMethodLength;
+
+        private final List<Pair<String, List<LocalGroupElement>>> groups = new 
ArrayList<>();
+
+        private final String parameters;
+
+        private int counter = 0;
+
+        private BlockStatementGrouperVisitor(
+                String code, String context, long maxMethodLength, String 
parameters) {
+            this.tokenStream = new CommonTokenStream(new 
JavaLexer(CharStreams.fromString(code)));
+            this.rewriter = new TokenStreamRewriter(tokenStream);
+            this.context = context;
+            this.maxMethodLength = maxMethodLength;
+            this.parameters = parameters;
+        }
+
+        @Override
+        public Void visitStatement(StatementContext ctx) {
+
+            if (ctx.getChildCount() == 0) {
+                return null;
+            }
+
+            if (ctx.WHILE() != null || ctx.IF() != null || ctx.ELSE() != null) 
{
+                for (StatementContext statement : ctx.statement()) {
+                    if (shouldExtract(statement)) {
+                        groupBlock(statement);
+                    }
+                }
+            } else {
+                if (shouldExtract(ctx)) {
+                    groupBlock(ctx);
+                }
+            }
+
+            return null;
+        }
+
+        public List<Pair<String, List<LocalGroupElement>>> getGroups() {
+            List<Pair<String, List<LocalGroupElement>>> groupsTmp = new 
ArrayList<>();
+            for (BlockStatementGrouperVisitor child : children) {
+                groupsTmp.addAll(child.getGroups());
+            }
+
+            groupsTmp.addAll(this.groups);
+
+            return groupsTmp;
+        }
+
+        private void addGroups(List<Pair<String, List<LocalGroupElement>>> 
groups) {
+            this.groups.addAll(groups);
+        }
+
+        private void groupBlock(StatementContext ctx) {
+            int localCounter = this.counter++;
+            int localGroupCodeLength = 0;
+            List<LocalGroupElement> localGroup = new ArrayList<>();
+            for (BlockStatementContext bsc : ctx.block().blockStatement()) {
+
+                StatementContext statement = bsc.statement();
+                if (statement.IF() != null
+                        || statement.ELSE() != null
+                        || statement.WHILE() != null) {
+                    String localContext = context + "_rewriteGroup" + 
localCounter;
+                    BlockStatementGrouperVisitor visitor =
+                            new BlockStatementGrouperVisitor(
+                                    CodeSplitUtil.getContextString(statement),
+                                    localContext,
+                                    maxMethodLength,
+                                    parameters);
+                    JavaParser javaParser = new 
JavaParser(visitor.tokenStream);
+                    
javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+                    visitor.visitStatement(javaParser.statement());
+
+                    localGroup.add(new RewriteContextGroupElement(statement, 
visitor.rewriter));
+                    children.add(visitor);
+                    localCounter = this.counter++;

Review Comment:
   This block will be replaced by a method call. Should the length of that method call be added to `localGroupCodeLength`?



##########
flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementSplitter.java:
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.codesplit.JavaParser.BlockStatementContext;
+import org.apache.flink.table.codesplit.JavaParser.StatementContext;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.TokenStreamRewriter;
+import org.antlr.v4.runtime.atn.PredictionMode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+/**
+ * Extracts statements from IF, ELSE and WHILE blocks, making them smaller.
+ *
+ * <p><i>Before</i>
+ *
+ * <pre><code>
+ * while (counter > 0) {
+ *     int localA = a + 1000;
+ *     System.out.println(localA);
+ *     if (a > 0) {
+ *         b = a * 2;
+ *         c = b * 2;
+ *         System.out.println(b);
+ *     } else {
+ *         b = a * 3;
+ *         System.out.println(b);
+ *     }
+ *     counter--;
+ * }
+ *
+ * </code></pre>
+ *
+ * <p><i>After</i>
+ *
+ * <pre><code>
+ * while (counter > 0) {
+ *     myFun_whileBody0_0(int a);
+ *     if (a > 0) {
+ *         myFun_whileBody0_0_ifBody0(int a);
+ *     } else {
+ *         myFun_whileBody0_0_ifBody1(int a);
+ *     }
+ *     counter--;
+ * }
+ * </code></pre>
+ *
+ * <p>Where bodies of extracted "methods" are:
+ *
+ * <pre><code>
+ * myFun_whileBody0_0(int a) ->
+ *     int localA = a + 1000;
+ *     System.out.println(localA);
+ * </code></pre>
+ *
+ * <pre><code>
+ * myFun_whileBody0_0_ifBody0(int a) ->
+ *     b = a * 2;
+ *     c = b * 2;
+ *     System.out.println(b);
+ * </code></pre>
+ *
+ * <pre><code>
+ * myFun_whileBody0_0_ifBody1(int a) ->
+ *     b = a * 3;
+ *     System.out.println(b);
+ * </code></pre>
+ */
+@Internal
+public class BlockStatementSplitter {
+
+    private final String code;
+
+    private final List<BlockStatementSplitter> children = new ArrayList<>();
+
+    private final String parameters;
+
+    private BlockStatementVisitor visitor;
+
+    /**
+     * Initialize new BlockStatementSplitter.
+     *
+     * @param code a code block that should be rewritten.
+     * @param parameters parameters definition that should be used for 
extracted methods.
+     */
+    public BlockStatementSplitter(String code, String parameters) {
+        this.code = code;
+        this.parameters = parameters;
+    }
+
+    /**
+     * Extracts statements from the IF, ELSE and WHILE blocks of the code block used during
+     * initialization of this object. Every entry of the returned map can be seen as a new method
+     * name (map key) and that method's body. The block names will be prefixed with the provided
+     * context.
+     *
+     * @param context prefix for extracted blocks.
+     * @return a map of block name to block statements mappings. The key can be interpreted as the
+     *     name of the extracted block/method and the corresponding List represents the individual
+     *     statements (the block's lines) for this block.
+     */
+    public Map<String, List<String>> extractBlocks(String context) {
+
+        this.visitor = new BlockStatementVisitor(code, context, parameters);
+        JavaParser javaParser = new JavaParser(visitor.tokenStream);
+        javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+        visitor.visitStatement(javaParser.statement());
+
+        Map<String, List<String>> extractedBlocks = new HashMap<>();
+        for (BlockStatementVisitor child : visitor.children) {
+            int counter = 0;
+            for (ContextTextPair extractedBlock : child.extractedSingleBlocks) 
{
+                ParserRuleContext parserRuleContext = 
extractedBlock.parserRuleContext;
+                if (parserRuleContext instanceof BlockStatementContext) {
+                    StatementContext statement =
+                            ((BlockStatementContext) 
parserRuleContext).statement();
+
+                    if (statement != null
+                            && (statement.IF() != null
+                                    || statement.ELSE() != null
+                                    || statement.WHILE() != null)) {
+
+                        BlockStatementSplitter splitter =
+                                new BlockStatementSplitter(
+                                        extractedBlock.ruleText, 
this.parameters);
+                        Map<String, List<String>> rewrite =
+                                splitter.extractBlocks(child.context + "_" + 
counter++);
+                        this.children.add(splitter);

Review Comment:
   You use recursion both in `BlockStatementSplitter#extractBlocks` and in `BlockStatementVisitor#visitStatement`. This doesn't look right to me. Could you explain your logic in detail?



##########
flink-table/flink-table-code-splitter/src/main/java/org/apache/flink/table/codesplit/BlockStatementSplitter.java:
##########
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codesplit;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.codesplit.JavaParser.BlockStatementContext;
+import org.apache.flink.table.codesplit.JavaParser.StatementContext;
+
+import org.antlr.v4.runtime.CharStreams;
+import org.antlr.v4.runtime.CommonTokenStream;
+import org.antlr.v4.runtime.ParserRuleContext;
+import org.antlr.v4.runtime.TokenStreamRewriter;
+import org.antlr.v4.runtime.atn.PredictionMode;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
+
+/**
+ * Extracts statements from IF, ELSE and WHILE blocks, making them smaller.
+ *
+ * <p><i>Before</i>
+ *
+ * <pre><code>
+ * while (counter > 0) {
+ *     int localA = a + 1000;
+ *     System.out.println(localA);
+ *     if (a > 0) {
+ *         b = a * 2;
+ *         c = b * 2;
+ *         System.out.println(b);
+ *     } else {
+ *         b = a * 3;
+ *         System.out.println(b);
+ *     }
+ *     counter--;
+ * }
+ *
+ * </code></pre>
+ *
+ * <p><i>After</i>
+ *
+ * <pre><code>
+ * while (counter > 0) {
+ *     myFun_whileBody0_0(int a);
+ *     if (a > 0) {
+ *         myFun_whileBody0_0_ifBody0(int a);
+ *     } else {
+ *         myFun_whileBody0_0_ifBody1(int a);
+ *     }
+ *     counter--;
+ * }
+ * </code></pre>
+ *
+ * <p>Where bodies of extracted "methods" are:
+ *
+ * <pre><code>
+ * myFun_whileBody0_0(int a) ->
+ *     int localA = a + 1000;
+ *     System.out.println(localA);
+ * </code></pre>
+ *
+ * <pre><code>
+ * myFun_whileBody0_0_ifBody0(int a) ->
+ *     b = a * 2;
+ *     c = b * 2;
+ *     System.out.println(b);
+ * </code></pre>
+ *
+ * <pre><code>
+ * myFun_whileBody0_0_ifBody1(int a) ->
+ *     b = a * 3;
+ *     System.out.println(b);
+ * </code></pre>
+ */
+@Internal
+public class BlockStatementSplitter {
+
+    private final String code;
+
+    private final List<BlockStatementSplitter> children = new ArrayList<>();
+
+    private final String parameters;
+
+    private BlockStatementVisitor visitor;
+
+    /**
+     * Initialize new BlockStatementSplitter.
+     *
+     * @param code a code block that should be rewritten.
+     * @param parameters parameters definition that should be used for 
extracted methods.
+     */
+    public BlockStatementSplitter(String code, String parameters) {
+        this.code = code;
+        this.parameters = parameters;
+    }
+
+    /**
+     * Extracts statements from the IF, ELSE and WHILE blocks of the code block used during
+     * initialization of this object. Every entry of the returned map can be seen as a new method
+     * name (map key) and that method's body. The block names will be prefixed with the provided
+     * context.
+     *
+     * @param context prefix for extracted blocks.
+     * @return a map of block name to block statements mappings. The key can be interpreted as the
+     *     name of the extracted block/method and the corresponding List represents the individual
+     *     statements (the block's lines) for this block.
+     */
+    public Map<String, List<String>> extractBlocks(String context) {
+
+        this.visitor = new BlockStatementVisitor(code, context, parameters);
+        JavaParser javaParser = new JavaParser(visitor.tokenStream);
+        javaParser.getInterpreter().setPredictionMode(PredictionMode.SLL);
+        visitor.visitStatement(javaParser.statement());
+
+        Map<String, List<String>> extractedBlocks = new HashMap<>();
+        for (BlockStatementVisitor child : visitor.children) {
+            int counter = 0;
+            for (ContextTextPair extractedBlock : child.extractedSingleBlocks) 
{
+                ParserRuleContext parserRuleContext = 
extractedBlock.parserRuleContext;
+                if (parserRuleContext instanceof BlockStatementContext) {
+                    StatementContext statement =
+                            ((BlockStatementContext) 
parserRuleContext).statement();
+
+                    if (statement != null
+                            && (statement.IF() != null
+                                    || statement.ELSE() != null
+                                    || statement.WHILE() != null)) {
+
+                        BlockStatementSplitter splitter =
+                                new BlockStatementSplitter(
+                                        extractedBlock.ruleText, 
this.parameters);
+                        Map<String, List<String>> rewrite =
+                                splitter.extractBlocks(child.context + "_" + 
counter++);
+                        this.children.add(splitter);
+
+                        mergeBlocks(rewrite, extractedBlocks);
+                    }
+                }
+            }
+        }
+
+        Map<String, List<String>> localBlocks = visitor.getAllBlocks();
+        mergeBlocks(localBlocks, extractedBlocks);
+
+        return extractedBlocks;
+    }
+
+    /**
+     * Rewrite code block that was used for this object initialization.
+     *
+     * @return a map whose keys are the rewritten block names and whose values are the rewritten
+     *     code blocks, including calls to extracted methods
+     */
+    public Map<String, String> rewriteBlock() {
+
+        for (BlockStatementSplitter child : children) {
+            child.visitor.rewrite();
+        }
+        visitor.rewrite();
+
+        Map<String, String> rewriteBlocks = new HashMap<>();
+        for (BlockStatementSplitter child : children) {
+            Map<String, String> childRewriteBlocks = child.rewriteBlock();
+            for (Entry<String, String> entry : childRewriteBlocks.entrySet()) {
+                rewriteBlocks.merge(
+                        entry.getKey(),
+                        entry.getValue(),
+                        (s, s2) -> {
+                            throw new RuntimeException(
+                                    String.format(
+                                            "Override rewritten block  for key 
%s. Blocks are %s -> %s",
+                                            entry.getKey(), s, s2));
+                        });
+            }
+        }
+
+        rewriteBlocks.put(visitor.context, visitor.rewriter.getText());
+        return rewriteBlocks;
+    }
+
+    private static void mergeBlocks(
+            Map<String, List<String>> mergeA, Map<String, List<String>> 
mergeB) {
+
+        for (Entry<String, List<String>> entry : mergeA.entrySet()) {
+            mergeB.merge(
+                    entry.getKey(),
+                    entry.getValue(),
+                    (aBlocks, bBlocks) -> {
+                        List<String> merge = new ArrayList<>(aBlocks.size() + 
bBlocks.size());
+                        merge.addAll(aBlocks);
+                        merge.addAll(bBlocks);
+                        return merge;
+                    });
+        }
+    }
+
+    private static class BlockStatementVisitor extends 
JavaParserBaseVisitor<Void> {
+
+        private final List<BlockStatementVisitor> children = new ArrayList<>();
+
+        private final List<ContextTextPair> extractedSingleBlocks = new 
ArrayList<>();
+
+        private final String context;
+
+        private final CommonTokenStream tokenStream;
+
+        private final TokenStreamRewriter rewriter;
+
+        private final String parameters;
+
+        private int counter = 0;
+
+        private int bscCounter = 0;
+
+        private BlockStatementVisitor(String code, String context, String 
parameters) {
+            this.tokenStream = new CommonTokenStream(new 
JavaLexer(CharStreams.fromString(code)));
+            this.rewriter = new TokenStreamRewriter(tokenStream);
+            this.context = context;
+            this.parameters = parameters;
+        }
+
+        private BlockStatementVisitor(
+                TokenStreamRewriter rewriter, String context, String 
parameters) {
+            this.tokenStream = null;
+            this.rewriter = rewriter;
+            this.context = context;
+            this.parameters = parameters;
+        }
+
+        @Override
+        public Void visitStatement(StatementContext ctx) {
+
+            if (ctx.getChildCount() == 0) {
+                return null;
+            }
+
+            if (ctx.WHILE() != null) {
+                for (StatementContext statement : ctx.statement()) {
+                    BlockStatementVisitor visitor =
+                            new BlockStatementVisitor(
+                                    rewriter, context + "_whileBody" + 
counter++, parameters);
+                    children.add(visitor);
+                    visitor.visitStatement(statement);
+                }
+            } else if (ctx.IF() != null) {
+                for (StatementContext sc : ctx.statement()) {
+                    BlockStatementVisitor visitor =
+                            new BlockStatementVisitor(
+                                    rewriter, context + "_ifBody" + counter++, 
parameters);
+                    children.add(visitor);
+                    visitor.visitStatement(sc);
+                }
+            } else {

Review Comment:
   What about `ctx.ELSE() != null`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to