dawidwys opened a new pull request, #24834:
URL: https://github.com/apache/flink/pull/24834

   
   ## What is the purpose of the change
   
   Make the `BlockStatementGrouper` work with less memory for complex `CASE 
WHEN` statements.
   
   
   ## Verifying this change
   
   1. All existing tests pass
   2. If you run the added test with complex `CASE WHEN` with `-Xmx128mb` the 
test quickly fails with:
   
   ```
   java.lang.RuntimeException: JavaCodeSplitter failed. This is a bug. Please 
file an issue.
   
        at 
org.apache.flink.table.codesplit.JavaCodeSplitter.split(JavaCodeSplitter.java:37)
        at 
org.apache.flink.table.runtime.generated.GeneratedClass.<init>(GeneratedClass.java:58)
        at 
org.apache.flink.table.runtime.generated.GeneratedOperator.<init>(GeneratedOperator.java:43)
        at 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator$.generateOneInputStreamOperator(OperatorCodeGenerator.scala:130)
        at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.generateCalcOperator(CalcCodeGenerator.scala:60)
        at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator.generateCalcOperator(CalcCodeGenerator.scala)
        at 
org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:100)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:167)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:258)
        at 
org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:178)
        at 
org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:167)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85)
        at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at 
org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:84)
        at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:188)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1277)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:1055)
        at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1120)
        at 
org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:477)
        at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$ResultTestItem.test(BuiltInFunctionTestBase.java:376)
        at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestSetSpec.lambda$getTestCase$4(BuiltInFunctionTestBase.java:336)
        at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase$TestCase.execute(BuiltInFunctionTestBase.java:115)
        at 
org.apache.flink.table.planner.functions.BuiltInFunctionTestBase.test(BuiltInFunctionTestBase.java:95)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
        at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
        at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
        at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
        at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
        at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestTemplateMethod(TimeoutExtension.java:94)
        at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
        at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
        at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
        at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
        at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
        at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
        at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
        at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
        at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
        at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
        at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
        at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
        at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:226)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask$DefaultDynamicTestExecutor.execute(NodeTestTask.java:204)
        at 
org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:142)
        at 
org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.lambda$execute$2(TestTemplateTestDescriptor.java:110)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at 
java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:177)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at 
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at 
java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
        at 
java.base/java.util.stream.IntPipeline$1$1.accept(IntPipeline.java:180)
        at 
java.base/java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:104)
        at 
java.base/java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:699)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at 
java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at 
java.base/java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at 
java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at 
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at 
java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:274)
        at 
java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
        at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150)
        at 
java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173)
        at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:497)
        at 
org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:110)
        at 
org.junit.jupiter.engine.descriptor.TestTemplateTestDescriptor.execute(TestTemplateTestDescriptor.java:44)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
        at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
        at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1541)
        at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
        at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
        at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
        at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
        at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
        at 
org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
        at 
org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
        at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
        at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
        at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
        at 
org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
        at 
org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
        at 
com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
        at 
com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
        at 
com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
        at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
        at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)
   Caused by: java.lang.OutOfMemoryError: Java heap space
   ```
   
   After the change it works fine:
   
   
![image](https://github.com/apache/flink/assets/6242259/ad7d6808-90b8-4cba-a838-a8b233ba0544)
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (yes / **no**)
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
     - The serializers: (yes / **no** / don't know)
     - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
     - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes / **no**)
     - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to