[jira] [Updated] (FLINK-35737) Prevent Memory Leak by Closing MemoryExecutionGraphInfoStore on MiniCluster Shutdown
[ https://issues.apache.org/jira/browse/FLINK-35737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Feng Jiajie updated FLINK-35737:
--------------------------------
Description: 

MemoryExecutionGraphInfoStore registers a JVM shutdown hook in its constructor and deregisters it in its close() method:

{code:java}
public MemoryExecutionGraphInfoStore(...) {
    ...
    this.shutdownHook =
            ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);
}

@Override
public void close() throws IOException {
    ...
    // Remove shutdown hook to prevent resource leaks
    ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
}
{code}

Currently, MiniCluster instantiates a MemoryExecutionGraphInfoStore but neither retains a reference to it nor calls close() during its own shutdown:

{code:java}
final DispatcherResourceManagerComponent dispatcherResourceManagerComponent =
        dispatcherResourceManagerComponentFactory.create(
                ...
                new MemoryExecutionGraphInfoStore(), // -> new
                ...);
{code}

As a result, shutdown hooks accumulate when multiple Flink jobs run in the same local JVM. These hooks, together with the objects they reference, constitute a memory leak.

This patch addresses the issue by ensuring that MemoryExecutionGraphInfoStore's close() method is invoked during MiniCluster shutdown: https://github.com/apache/flink/pull/25009

> Prevent Memory Leak by Closing MemoryExecutionGraphInfoStore on MiniCluster
> Shutdown
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-35737
>                 URL: https://issues.apache.org/jira/browse/FLINK-35737
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.18.1
>            Reporter: Feng Jiajie
>            Priority: Critical
>             Fix For: 1.18.2, 1.20.0, 1.19.2

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
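The mechanics of the leak can be sketched independently of Flink (a simplified model; the class names here are illustrative, not Flink's actual ShutdownHookUtil code): a hook Thread registered via Runtime.addShutdownHook strongly references its owner, and the JVM retains every registered hook until exit, so an owner that is never closed can never be garbage collected.

```java
// Simplified model of the leak: each store-like object registers a JVM
// shutdown hook in its constructor; the hook captures `this`, so the JVM's
// hook registry pins the whole object (and its state) until JVM exit
// unless close() deregisters the hook.
public class ShutdownHookLeakSketch {

    static class StoreLikeObject {
        final byte[] state = new byte[1 << 20]; // stand-in for retained state
        final Thread shutdownHook;

        StoreLikeObject() {
            // Analogous to ShutdownHookUtil.addShutdownHook in the constructor.
            shutdownHook = new Thread(this::cleanup);
            Runtime.getRuntime().addShutdownHook(shutdownHook);
        }

        void cleanup() {
            // Would release resources on JVM exit.
        }

        void close() {
            // Analogous to ShutdownHookUtil.removeShutdownHook: dropping the
            // hook removes the JVM's reference, making the object collectable.
            Runtime.getRuntime().removeShutdownHook(shutdownHook);
        }
    }

    public static void main(String[] args) {
        StoreLikeObject closedProperly = new StoreLikeObject();
        closedProperly.close(); // no leak: hook deregistered

        new StoreLikeObject(); // never closed: hook (and ~1 MiB) pinned until exit
    }
}
```

Running many such loops without close() reproduces the accumulation the report describes: one pinned hook (and object graph) per job.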
[jira] [Created] (FLINK-35737) Prevent Memory Leak by Closing MemoryExecutionGraphInfoStore on MiniCluster Shutdown
Feng Jiajie created FLINK-35737:
-----------------------------------

             Summary: Prevent Memory Leak by Closing MemoryExecutionGraphInfoStore on MiniCluster Shutdown
                 Key: FLINK-35737
                 URL: https://issues.apache.org/jira/browse/FLINK-35737
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Task
    Affects Versions: 1.18.1
            Reporter: Feng Jiajie
             Fix For: 1.18.2, 1.20.0, 1.19.2

MemoryExecutionGraphInfoStore registers a JVM shutdown hook in its constructor and deregisters it in its close() method:

{code:java}
public MemoryExecutionGraphInfoStore(...) {
    ...
    this.shutdownHook =
            ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG);
}

@Override
public void close() throws IOException {
    ...
    // Remove shutdown hook to prevent resource leaks
    ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG);
}
{code}

Currently, MiniCluster instantiates a MemoryExecutionGraphInfoStore but neither retains a reference to it nor calls close() during its own shutdown:

{code:java}
final DispatcherResourceManagerComponent dispatcherResourceManagerComponent =
        dispatcherResourceManagerComponentFactory.create(
                ...
                new MemoryExecutionGraphInfoStore(), // -> new
                ...);
{code}

As a result, shutdown hooks accumulate when multiple Flink jobs run in the same local JVM. These hooks, together with the objects they reference, constitute a memory leak.

This patch addresses the issue by ensuring that MemoryExecutionGraphInfoStore's close() method is invoked during MiniCluster shutdown.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
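The fix direction, making sure close() is called during MiniCluster shutdown, follows a generic ownership pattern that can be sketched as follows (an illustrative sketch with hypothetical names, not the actual MiniCluster code): the creating component retains each Closeable it constructs and closes them, in reverse creation order, during its own shutdown.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical ownership sketch: the component that constructs resources
// keeps references to them and closes them on its own shutdown, which in
// turn lets each resource deregister its shutdown hook.
public class OwnedResourcesSketch implements Closeable {

    private final Deque<Closeable> ownedResources = new ArrayDeque<>();

    // Remember every resource this component creates.
    <T extends Closeable> T retain(T resource) {
        ownedResources.push(resource);
        return resource;
    }

    @Override
    public void close() throws IOException {
        IOException first = null;
        // Close in reverse creation order; collect failures as suppressed.
        while (!ownedResources.isEmpty()) {
            try {
                ownedResources.pop().close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

Under this pattern, the store created for the dispatcher component would be passed through retain(...) instead of being constructed inline and forgotten.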
[jira] [Commented] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode
[ https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814180#comment-17814180 ]

Feng Jiajie commented on FLINK-33981:
-------------------------------------

Hi [~fanrui], thanks for the review. I have submitted the changes to the 1.17 and master branches, and the tests have passed.

> File Descriptor References Not Released After Job Execution in MiniCluster
> Mode
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-33981
>                 URL: https://issues.apache.org/jira/browse/FLINK-33981
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.18.0
>            Reporter: Feng Jiajie
>            Assignee: Feng Jiajie
>            Priority: Major
>              Labels: pull-request-available
[jira] [Updated] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode
[ https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Feng Jiajie updated FLINK-33981:
--------------------------------
Fix Version/s: 1.19.0
               1.17.3
               1.18.2

> File Descriptor References Not Released After Job Execution in MiniCluster
> Mode
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-33981
>                 URL: https://issues.apache.org/jira/browse/FLINK-33981
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.18.0
>            Reporter: Feng Jiajie
>            Assignee: Feng Jiajie
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.19.0, 1.17.3, 1.18.2
[jira] [Commented] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode
[ https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17811659#comment-17811659 ]

Feng Jiajie commented on FLINK-33981:
-------------------------------------

I found the issue and will submit a fix later. Please assign the Jira issue to me, thanks.

> File Descriptor References Not Released After Job Execution in MiniCluster
> Mode
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-33981
>                 URL: https://issues.apache.org/jira/browse/FLINK-33981
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.18.0
>            Reporter: Feng Jiajie
>            Priority: Major
[jira] [Updated] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode
[ https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Feng Jiajie updated FLINK-33981:
--------------------------------
Description: 

When using MiniCluster mode, file descriptors like *{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}}* are not released after a job completes. Executing multiple jobs in the same JVM can therefore accumulate stale file descriptors, potentially leading to problems.

After running the reproduction code below (once it has entered the sleep), *lsof -p 18162* reveals:

{code:java}
...
java    18162 sa_cluster   30r  DIR  253,1  01311962  /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   31r  DIR  253,1  01311962  /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   32r  DIR  253,1  01310787  /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   33r  DIR  253,1  01310787  /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   34r  DIR  253,1  01311960  /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   35r  DIR  253,1  01311960  /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   36r  DIR  253,1  01311974  /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   37r  DIR  253,1  01311974  /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   38r  DIR  253,1  01311979  /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
...
{code}

The code used for reproduction is as follows:

{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * javac -cp 'lib/*' TestReleaseFd.java
 * java -Xmx600m -cp '.:lib/*' TestReleaseFd
 */
public class TestReleaseFd {

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; ++i) {
            int round = i;
            Thread thread = new Thread(() -> {
                try {
                    Configuration configuration = new Configuration();
                    final StreamExecutionEnvironment env =
                            StreamExecutionEnvironment.getExecutionEnvironment(configuration);
                    env.setParallelism(1);
                    DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
                    longDataStreamSource.addSink(new DiscardingSink<>());
                    StreamGraph streamGraph = env.getStreamGraph();
                    streamGraph.setJobName("test-" + System.nanoTime());
                    JobClient jobClient = env.executeAsync(streamGraph);
                    CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture =
                            jobClient.getJobExecutionResult();
                    JobExecutionResult jobExecutionResult = null;
                    while (jobExecutionResult == null) {
                        try {
                            jobExecutionResult =
                                    jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
                        } catch (TimeoutException timeoutException) {
                            // ignore
                        }
                    }
                    System.out.println("finished round: " + round);
                    env.close();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            thread.setDaemon(true);
            thread.start();
            thread.join();
            System.out.println("done ... " + i);
        }

        // === lsof -p 18162
        Thread.sleep(500_000_000);
    }
}
{code}

The issue reproduces consistently with the above code on Flink 1.18.0, but does not occur on Flink 1.14.6.
[jira] [Updated] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode
[ https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Feng Jiajie updated FLINK-33981:
--------------------------------
Description: 

When using MiniCluster mode, file descriptors like {{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}} are not released after a job completes. Executing multiple jobs in the same JVM can therefore accumulate stale file descriptors, potentially leading to problems.

After running the reproduction code below (once it has entered the sleep), running lsof -p 18162 reveals:

{code:java}
...
java    18162 sa_cluster   30r  DIR  253,1  01311962  /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   31r  DIR  253,1  01311962  /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   32r  DIR  253,1  01310787  /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   33r  DIR  253,1  01310787  /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   34r  DIR  253,1  01311960  /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   35r  DIR  253,1  01311960  /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   36r  DIR  253,1  01311974  /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   37r  DIR  253,1  01311974  /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   38r  DIR  253,1  01311979  /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
...
{code}

The code used for reproduction is as follows:

{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * javac -cp 'lib/*' TestReleaseFd.java
 * java -Xmx600m -cp '.:lib/*' TestReleaseFd
 */
public class TestReleaseFd {

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; ++i) {
            int round = i;
            Thread thread = new Thread(() -> {
                try {
                    Configuration configuration = new Configuration();
                    final StreamExecutionEnvironment env =
                            StreamExecutionEnvironment.getExecutionEnvironment(configuration);
                    env.setParallelism(1);
                    DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
                    longDataStreamSource.addSink(new DiscardingSink<>());
                    StreamGraph streamGraph = env.getStreamGraph();
                    streamGraph.setJobName("test-" + System.nanoTime());
                    JobClient jobClient = env.executeAsync(streamGraph);
                    CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture =
                            jobClient.getJobExecutionResult();
                    JobExecutionResult jobExecutionResult = null;
                    while (jobExecutionResult == null) {
                        try {
                            jobExecutionResult =
                                    jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
                        } catch (TimeoutException timeoutException) {
                            // ignore
                        }
                    }
                    System.out.println("finished round: " + round);
                    env.close();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            thread.setDaemon(true);
            thread.start();
            thread.join();
            System.out.println("done ... " + i);
        }

        // === lsof -p 18162
        Thread.sleep(500_000_000);
    }
}
{code}

The issue reproduces consistently with the above code on Flink 1.18.0, but does not occur on Flink 1.14.6.
[jira] [Commented] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode
[ https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17802398#comment-17802398 ]

Feng Jiajie commented on FLINK-33981:
-------------------------------------

The issue can be consistently reproduced with the above code on Flink 1.18.0, but does not occur on Flink 1.14.6.

> File Descriptor References Not Released After Job Execution in MiniCluster
> Mode
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-33981
>                 URL: https://issues.apache.org/jira/browse/FLINK-33981
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.18.0
>            Reporter: Feng Jiajie
>            Priority: Major
[jira] [Created] (FLINK-33981) File Descriptor References Not Released After Job Execution in MiniCluster Mode
Feng Jiajie created FLINK-33981:
-----------------------------------

             Summary: File Descriptor References Not Released After Job Execution in MiniCluster Mode
                 Key: FLINK-33981
                 URL: https://issues.apache.org/jira/browse/FLINK-33981
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends
    Affects Versions: 1.18.0
            Reporter: Feng Jiajie


When using MiniCluster mode, file descriptors like {{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}} are not released after a job completes. Executing multiple jobs in the same JVM can therefore leave stale file descriptors behind, potentially leading to problems. After executing the reproducing code provided below (once it has entered the sleep), running lsof -p 18162 reveals:
{code:java}
...
java    18162 sa_cluster   30r   DIR  253,1     0  1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   31r   DIR  253,1     0  1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
java    18162 sa_cluster   32r   DIR  253,1     0  1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   33r   DIR  253,1     0  1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
java    18162 sa_cluster   34r   DIR  253,1     0  1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   35r   DIR  253,1     0  1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
java    18162 sa_cluster   36r   DIR  253,1     0  1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   37r   DIR  253,1     0  1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
java    18162 sa_cluster   38r   DIR  253,1     0  1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
...
{code}
The code used for reproduction is as follows:
{code:java}
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * javac -cp 'lib/*' TestReleaseFd.java
 * java -Xmx600m -cp '.:lib/*' TestReleaseFd
 */
public class TestReleaseFd {

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 10; ++i) {
            int round = i;
            Thread thread = new Thread(() -> {
                try {
                    Configuration configuration = new Configuration();
                    final StreamExecutionEnvironment env =
                            StreamExecutionEnvironment.getExecutionEnvironment(configuration);
                    env.setParallelism(1);

                    DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 10);
                    longDataStreamSource.addSink(new DiscardingSink<>());

                    StreamGraph streamGraph = env.getStreamGraph();
                    streamGraph.setJobName("test-" + System.nanoTime());
                    JobClient jobClient = env.executeAsync(streamGraph);
                    CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture =
                            jobClient.getJobExecutionResult();
                    JobExecutionResult jobExecutionResult = null;
                    while (jobExecutionResult == null) {
                        try {
                            jobExecutionResult =
                                    jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
                        } catch (TimeoutException timeoutException) {
                            // ignore
                        }
                    }
                    System.out.println("finished round: " + round);
                    env.close();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });
            thread.setDaemon(true);
            thread.start();
            thread.join();
            System.out.println("done ... " + i);
        }

        // === lsof -p 18162
        Thread.sleep(500_000_000);
    }
}
{code}


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
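For watching the descriptor count from inside the reproducing process rather than via an external lsof, a minimal Linux-only sketch could look like the following. The class name and helper are illustrative (not part of Flink or the report), and it assumes {{/proc/self/fd}} is available:

```java
import java.io.File;

/**
 * Linux-only sketch: count the file descriptors held by the current JVM by
 * listing /proc/self/fd, as an in-process alternative to `lsof -p <pid>`.
 * Illustrative code; names are not from Flink.
 */
public class FdCounter {

    /** Returns the number of open file descriptors, or -1 if /proc is unavailable. */
    public static int openFdCount() {
        File fdDir = new File("/proc/self/fd");
        String[] entries = fdDir.list();
        return entries == null ? -1 : entries.length;
    }

    public static void main(String[] args) {
        int before = openFdCount();
        // ... run one MiniCluster job round here, then compare:
        int after = openFdCount();
        // A count that grows steadily across rounds indicates a descriptor leak.
        System.out.println("fds before=" + before + " after=" + after);
    }
}
```

Sampling this counter once per loop iteration in TestReleaseFd would show the leak without needing to pause the JVM for an external inspection.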
[jira] [Updated] (FLINK-33360) HybridSource fails to clear the previous round's state when switching sources, leading to data loss
[ https://issues.apache.org/jira/browse/FLINK-33360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Feng Jiajie updated FLINK-33360: Fix Version/s: 1.18.1 > HybridSource fails to clear the previous round's state when switching > sources, leading to data loss > --- > > Key: FLINK-33360 > URL: https://issues.apache.org/jira/browse/FLINK-33360 > Project: Flink > Issue Type: Bug > Components: Connectors / HybridSource >Affects Versions: 1.16.2, 1.18.0, 1.17.1 >Reporter: Feng Jiajie >Priority: Major > Labels: pull-request-available > Fix For: 1.7.3, 1.18.1 > > > org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator: > {code:java} > // track readers that have finished processing for current > enumerator > finishedReaders.add(subtaskId); > if (finishedReaders.size() == context.currentParallelism()) { > LOG.debug("All readers finished, ready to switch > enumerator!"); > if (currentSourceIndex + 1 < sources.size()) { > switchEnumerator(); > // switch all readers prior to sending split assignments > for (int i = 0; i < context.currentParallelism(); i++) { > sendSwitchSourceEvent(i, currentSourceIndex); > } > } > } {code} > I think that *finishedReaders* is used to keep track of all the subTaskIds > that have finished reading the current round of the source. Therefore, in the > *switchEnumerator* function, *finishedReaders* should be cleared: > If it's not cleared, then in the next source reading, whenever any > SourceReader reports a *SourceReaderFinishedEvent* (while other SourceReaders > may not have finished processing in parallel), the condition > *finishedReaders.size() == context.currentParallelism()* will be satisfied > and it will trigger {*}sendSwitchSourceEvent{*}(i, currentSourceIndex), > sending a *SwitchSourceEvent* to all SourceReaders. 
> If a SourceReader receives a SwitchSourceEvent before it finishes reading the > previous source, it will execute {*}currentReader.close(){*}, and some data > may not be fully read, resulting in a partial data loss in the source. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33360) HybridSource fails to clear the previous round's state when switching sources, leading to data loss
[ https://issues.apache.org/jira/browse/FLINK-33360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Feng Jiajie updated FLINK-33360: Affects Version/s: 1.18.0 > HybridSource fails to clear the previous round's state when switching > sources, leading to data loss > --- > > Key: FLINK-33360 > URL: https://issues.apache.org/jira/browse/FLINK-33360 > Project: Flink > Issue Type: Bug > Components: Connectors / HybridSource >Affects Versions: 1.16.2, 1.18.0, 1.17.1 >Reporter: Feng Jiajie >Priority: Major > Labels: pull-request-available > Fix For: 1.7.3 > > > org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator: > {code:java} > // track readers that have finished processing for current > enumerator > finishedReaders.add(subtaskId); > if (finishedReaders.size() == context.currentParallelism()) { > LOG.debug("All readers finished, ready to switch > enumerator!"); > if (currentSourceIndex + 1 < sources.size()) { > switchEnumerator(); > // switch all readers prior to sending split assignments > for (int i = 0; i < context.currentParallelism(); i++) { > sendSwitchSourceEvent(i, currentSourceIndex); > } > } > } {code} > I think that *finishedReaders* is used to keep track of all the subTaskIds > that have finished reading the current round of the source. Therefore, in the > *switchEnumerator* function, *finishedReaders* should be cleared: > If it's not cleared, then in the next source reading, whenever any > SourceReader reports a *SourceReaderFinishedEvent* (while other SourceReaders > may not have finished processing in parallel), the condition > *finishedReaders.size() == context.currentParallelism()* will be satisfied > and it will trigger {*}sendSwitchSourceEvent{*}(i, currentSourceIndex), > sending a *SwitchSourceEvent* to all SourceReaders. 
> If a SourceReader receives a SwitchSourceEvent before it finishes reading the > previous source, it will execute {*}currentReader.close(){*}, and some data > may not be fully read, resulting in a partial data loss in the source. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33360) HybridSource fails to clear the previous round's state when switching sources, leading to data loss
[ https://issues.apache.org/jira/browse/FLINK-33360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779507#comment-17779507 ] Feng Jiajie commented on FLINK-33360: - pr: https://github.com/apache/flink/pull/23593 > HybridSource fails to clear the previous round's state when switching > sources, leading to data loss > --- > > Key: FLINK-33360 > URL: https://issues.apache.org/jira/browse/FLINK-33360 > Project: Flink > Issue Type: Bug > Components: Connectors / HybridSource >Affects Versions: 1.16.2, 1.17.1 >Reporter: Feng Jiajie >Priority: Major > Labels: pull-request-available > Fix For: 1.7.3 > > > org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator: > {code:java} > // track readers that have finished processing for current > enumerator > finishedReaders.add(subtaskId); > if (finishedReaders.size() == context.currentParallelism()) { > LOG.debug("All readers finished, ready to switch > enumerator!"); > if (currentSourceIndex + 1 < sources.size()) { > switchEnumerator(); > // switch all readers prior to sending split assignments > for (int i = 0; i < context.currentParallelism(); i++) { > sendSwitchSourceEvent(i, currentSourceIndex); > } > } > } {code} > I think that *finishedReaders* is used to keep track of all the subTaskIds > that have finished reading the current round of the source. Therefore, in the > *switchEnumerator* function, *finishedReaders* should be cleared: > If it's not cleared, then in the next source reading, whenever any > SourceReader reports a *SourceReaderFinishedEvent* (while other SourceReaders > may not have finished processing in parallel), the condition > *finishedReaders.size() == context.currentParallelism()* will be satisfied > and it will trigger {*}sendSwitchSourceEvent{*}(i, currentSourceIndex), > sending a *SwitchSourceEvent* to all SourceReaders. 
> If a SourceReader receives a SwitchSourceEvent before it finishes reading the > previous source, it will execute {*}currentReader.close(){*}, and some data > may not be fully read, resulting in a partial data loss in the source. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33360) HybridSource fails to clear the previous round's state when switching sources, leading to data loss
Feng Jiajie created FLINK-33360:
-----------------------------------

             Summary: HybridSource fails to clear the previous round's state when switching sources, leading to data loss
                 Key: FLINK-33360
                 URL: https://issues.apache.org/jira/browse/FLINK-33360
             Project: Flink
          Issue Type: Bug
          Components: Connectors / HybridSource
    Affects Versions: 1.17.1, 1.16.2
            Reporter: Feng Jiajie
             Fix For: 1.7.3


org.apache.flink.connector.base.source.hybrid.HybridSourceSplitEnumerator:
{code:java}
            // track readers that have finished processing for current enumerator
            finishedReaders.add(subtaskId);
            if (finishedReaders.size() == context.currentParallelism()) {
                LOG.debug("All readers finished, ready to switch enumerator!");
                if (currentSourceIndex + 1 < sources.size()) {
                    switchEnumerator();
                    // switch all readers prior to sending split assignments
                    for (int i = 0; i < context.currentParallelism(); i++) {
                        sendSwitchSourceEvent(i, currentSourceIndex);
                    }
                }
            }
{code}
I think that *finishedReaders* is used to keep track of all the subTaskIds that have finished reading the current round of the source. Therefore, in the *switchEnumerator* function, *finishedReaders* should be cleared.

If it's not cleared, then during the next source's read, as soon as any SourceReader reports a *SourceReaderFinishedEvent* (while other SourceReaders may still be processing in parallel), the condition *finishedReaders.size() == context.currentParallelism()* will already be satisfied, triggering {*}sendSwitchSourceEvent{*}(i, currentSourceIndex) and sending a *SwitchSourceEvent* to all SourceReaders.

If a SourceReader receives a SwitchSourceEvent before it finishes reading the previous source, it will execute {*}currentReader.close(){*}, and some data may not be fully read, resulting in partial data loss from the source.


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
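The premature-switch scenario can be demonstrated with a self-contained model. This is a sketch with made-up class and method names, not the actual HybridSourceSplitEnumerator; it only shows why clearing finishedReaders inside the switch prevents a single early SourceReaderFinishedEvent of the next round from triggering another switch:

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Simplified model (illustrative names, not Flink classes) of the
 * HybridSource enumerator's reader tracking, with the proposed fix applied:
 * the finished-reader set is cleared when the enumerator switches sources.
 */
public class SwitchModel {
    private final Set<Integer> finishedReaders = new HashSet<>();
    private final int parallelism;
    private int currentSourceIndex = 0;

    public SwitchModel(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Called when a subtask reports it finished; returns true if a source switch happens. */
    public boolean onReaderFinished(int subtaskId) {
        finishedReaders.add(subtaskId);
        if (finishedReaders.size() == parallelism) {
            currentSourceIndex++;
            // The fix: forget the previous round's readers so one early
            // finisher in the next round cannot satisfy the condition alone.
            finishedReaders.clear();
            return true;
        }
        return false;
    }

    public int currentSourceIndex() {
        return currentSourceIndex;
    }
}
```

Without the clear() call, the set would still contain every subtask from round one, so the very first SourceReaderFinishedEvent of round two would make its size equal the parallelism and close readers that are still consuming data.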
[jira] [Updated] (FLINK-33171) Consistent implicit type coercion support for equal and non-equal comparisons for codegen
[ https://issues.apache.org/jira/browse/FLINK-33171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Feng Jiajie updated FLINK-33171: Summary: Consistent implicit type coercion support for equal and non-equal comparisons for codegen (was: Consistent implicit type coercion support for equal and non-equi comparison for codegen) > Consistent implicit type coercion support for equal and non-equal comparisons > for codegen > - > > Key: FLINK-33171 > URL: https://issues.apache.org/jira/browse/FLINK-33171 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.0, 1.17.1 >Reporter: Feng Jiajie >Assignee: Feng Jiajie >Priority: Major > Labels: pull-request-available > Fix For: 1.17.2, 1.18.1 > > > When executing the following SQL: > {code:sql} > SELECT > time1, > time1 = '2023-09-30 18:22:42.123' AS eq1, > NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1 > FROM table1; > {code} > the result is as follows: > {code:java} > ++-+++ > | op | time1 |eq1 | notEq1 | > ++-+++ > | +I | 2023-09-30 18:22:42.123 | TRUE | TRUE | > | +I | 2023-09-30 18:22:42.124 | FALSE | TRUE | > ++-+++ > 2 rows in set > {code} > The "notEq1" in the first row should be FALSE. 
> Here is the reproducing code: > {code:java} > import org.apache.flink.api.common.functions.RichMapFunction; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.common.typeinfo.Types; > import org.apache.flink.api.java.typeutils.RowTypeInfo; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.streaming.api.datastream.DataStreamSource; > import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.DataTypes; > import org.apache.flink.table.api.Schema; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > public class TimePointNotEqualTest { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(new > Configuration()); > env.setParallelism(1); > DataStreamSource longDataStreamSource = env.fromSequence(0, 1); > RowTypeInfo rowTypeInfo = > new RowTypeInfo(new TypeInformation[] {Types.LONG}, new > String[] {"time1"}); > SingleOutputStreamOperator map = > longDataStreamSource.map(new RichMapFunction() { > @Override > public Row map(Long value) { > Row row = new Row(1); > row.setField(0, 1696069362123L + value); > return row; > } > }, rowTypeInfo); > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); > Schema schema = Schema.newBuilder() > .column("time1", > DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class)) > .build(); > tableEnv.createTemporaryView("table1", map, schema); > tableEnv.sqlQuery("SELECT " > + "time1," // 2023-09-30 18:22:42.123 > + "time1 = '2023-09-30 18:22:42.123' AS eq1," // expect TRUE > + "NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1 " // > expect FALSE but TRUE > + "FROM table1").execute().print(); > } > } > {code} > I would like to attempt to fix this issue. 
If possible, please assign the > issue to me. Thank you. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-33171) Consistent implicit type coercion support for equal and non-equi comparison for codegen
[ https://issues.apache.org/jira/browse/FLINK-33171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Feng Jiajie updated FLINK-33171: Summary: Consistent implicit type coercion support for equal and non-equi comparison for codegen (was: Table SQL support Not Equal for TimePoint type and TimeString) > Consistent implicit type coercion support for equal and non-equi comparison > for codegen > --- > > Key: FLINK-33171 > URL: https://issues.apache.org/jira/browse/FLINK-33171 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.0, 1.17.1 >Reporter: Feng Jiajie >Assignee: Feng Jiajie >Priority: Major > Labels: pull-request-available > Fix For: 1.17.2, 1.18.1 > > > When executing the following SQL: > {code:sql} > SELECT > time1, > time1 = '2023-09-30 18:22:42.123' AS eq1, > NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1 > FROM table1; > {code} > the result is as follows: > {code:java} > ++-+++ > | op | time1 |eq1 | notEq1 | > ++-+++ > | +I | 2023-09-30 18:22:42.123 | TRUE | TRUE | > | +I | 2023-09-30 18:22:42.124 | FALSE | TRUE | > ++-+++ > 2 rows in set > {code} > The "notEq1" in the first row should be FALSE. 
> Here is the reproducing code: > {code:java} > import org.apache.flink.api.common.functions.RichMapFunction; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.common.typeinfo.Types; > import org.apache.flink.api.java.typeutils.RowTypeInfo; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.streaming.api.datastream.DataStreamSource; > import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.DataTypes; > import org.apache.flink.table.api.Schema; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > public class TimePointNotEqualTest { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(new > Configuration()); > env.setParallelism(1); > DataStreamSource longDataStreamSource = env.fromSequence(0, 1); > RowTypeInfo rowTypeInfo = > new RowTypeInfo(new TypeInformation[] {Types.LONG}, new > String[] {"time1"}); > SingleOutputStreamOperator map = > longDataStreamSource.map(new RichMapFunction() { > @Override > public Row map(Long value) { > Row row = new Row(1); > row.setField(0, 1696069362123L + value); > return row; > } > }, rowTypeInfo); > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); > Schema schema = Schema.newBuilder() > .column("time1", > DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class)) > .build(); > tableEnv.createTemporaryView("table1", map, schema); > tableEnv.sqlQuery("SELECT " > + "time1," // 2023-09-30 18:22:42.123 > + "time1 = '2023-09-30 18:22:42.123' AS eq1," // expect TRUE > + "NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1 " // > expect FALSE but TRUE > + "FROM table1").execute().print(); > } > } > {code} > I would like to attempt to fix this issue. 
If possible, please assign the > issue to me. Thank you. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-33171) Table SQL support Not Equal for TimePoint type and TimeString
[ https://issues.apache.org/jira/browse/FLINK-33171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17770698#comment-17770698 ] Feng Jiajie commented on FLINK-33171: - I have attempted to submit a pull request: [https://github.com/apache/flink/pull/23478] > Table SQL support Not Equal for TimePoint type and TimeString > - > > Key: FLINK-33171 > URL: https://issues.apache.org/jira/browse/FLINK-33171 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner >Affects Versions: 1.18.0, 1.17.1 >Reporter: Feng Jiajie >Priority: Major > Labels: pull-request-available > Fix For: 1.17.2, 1.18.1 > > > When executing the following SQL: > {code:sql} > SELECT > time1, > time1 = '2023-09-30 18:22:42.123' AS eq1, > NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1 > FROM table1; > {code} > the result is as follows: > {code:java} > ++-+++ > | op | time1 |eq1 | notEq1 | > ++-+++ > | +I | 2023-09-30 18:22:42.123 | TRUE | TRUE | > | +I | 2023-09-30 18:22:42.124 | FALSE | TRUE | > ++-+++ > 2 rows in set > {code} > The "notEq1" in the first row should be FALSE. 
> Here is the reproducing code: > {code:java} > import org.apache.flink.api.common.functions.RichMapFunction; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.common.typeinfo.Types; > import org.apache.flink.api.java.typeutils.RowTypeInfo; > import org.apache.flink.configuration.Configuration; > import org.apache.flink.streaming.api.datastream.DataStreamSource; > import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.table.api.DataTypes; > import org.apache.flink.table.api.Schema; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.types.Row; > public class TimePointNotEqualTest { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(new > Configuration()); > env.setParallelism(1); > DataStreamSource longDataStreamSource = env.fromSequence(0, 1); > RowTypeInfo rowTypeInfo = > new RowTypeInfo(new TypeInformation[] {Types.LONG}, new > String[] {"time1"}); > SingleOutputStreamOperator map = > longDataStreamSource.map(new RichMapFunction() { > @Override > public Row map(Long value) { > Row row = new Row(1); > row.setField(0, 1696069362123L + value); > return row; > } > }, rowTypeInfo); > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); > Schema schema = Schema.newBuilder() > .column("time1", > DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class)) > .build(); > tableEnv.createTemporaryView("table1", map, schema); > tableEnv.sqlQuery("SELECT " > + "time1," // 2023-09-30 18:22:42.123 > + "time1 = '2023-09-30 18:22:42.123' AS eq1," // expect TRUE > + "NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1 " // > expect FALSE but TRUE > + "FROM table1").execute().print(); > } > } > {code} > I would like to attempt to fix this issue. 
If possible, please assign the > issue to me. Thank you. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-33171) Table SQL support Not Equal for TimePoint type and TimeString
Feng Jiajie created FLINK-33171:
-----------------------------------

             Summary: Table SQL support Not Equal for TimePoint type and TimeString
                 Key: FLINK-33171
                 URL: https://issues.apache.org/jira/browse/FLINK-33171
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.17.1, 1.18.0
            Reporter: Feng Jiajie
             Fix For: 1.17.2, 1.18.1


When executing the following SQL:
{code:sql}
SELECT
  time1,
  time1 = '2023-09-30 18:22:42.123' AS eq1,
  NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1
FROM table1;
{code}
the result is as follows:
{code:java}
+----+-------------------------+--------+--------+
| op |                   time1 |    eq1 | notEq1 |
+----+-------------------------+--------+--------+
| +I | 2023-09-30 18:22:42.123 |   TRUE |   TRUE |
| +I | 2023-09-30 18:22:42.124 |  FALSE |   TRUE |
+----+-------------------------+--------+--------+
2 rows in set
{code}
The "notEq1" in the first row should be FALSE.

Here is the reproducing code:
{code:java}
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TimePointNotEqualTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        env.setParallelism(1);
        DataStreamSource<Long> longDataStreamSource = env.fromSequence(0, 1);
        RowTypeInfo rowTypeInfo =
                new RowTypeInfo(new TypeInformation[] {Types.LONG}, new String[] {"time1"});
        SingleOutputStreamOperator<Row> map =
                longDataStreamSource.map(new RichMapFunction<Long, Row>() {
                    @Override
                    public Row map(Long value) {
                        Row row = new Row(1);
                        row.setField(0, 1696069362123L + value);
                        return row;
                    }
                }, rowTypeInfo);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Schema schema = Schema.newBuilder()
                .column("time1", DataTypes.TIMESTAMP_LTZ(3).bridgedTo(Long.class))
                .build();
        tableEnv.createTemporaryView("table1", map, schema);

        tableEnv.sqlQuery("SELECT "
                + "time1," // 2023-09-30 18:22:42.123
                + "time1 = '2023-09-30 18:22:42.123' AS eq1," // expect TRUE
                + "NOT (time1 = '2023-09-30 18:22:42.123') AS notEq1 " // expect FALSE but TRUE
                + "FROM table1").execute().print();
    }
}
{code}
I would like to attempt to fix this issue. If possible, please assign the issue to me. Thank you.


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
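The invariant behind the fix is that notEq1 must always be the negation of eq1: once the string literal is implicitly coerced to a timestamp for the equality, the NOT branch has to use the very same coerced comparison. A plain-Java sketch of that rule (illustrative code, not Flink's generated comparison; the Asia/Shanghai session zone is an assumption chosen to match the 1696069362123L literal in the repro):

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
 * Sketch of consistent implicit coercion: the timestamp literal is converted
 * to epoch millis exactly once, and both the equality and its negation are
 * derived from that single coerced comparison. Illustrative, not Flink code.
 */
public class CoercionConsistency {
    private static final DateTimeFormatter F =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Coerce the string literal to epoch millis in the given session zone. */
    static long coerce(String ts, ZoneId zone) {
        return LocalDateTime.parse(ts, F).atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        ZoneId sessionZone = ZoneId.of("Asia/Shanghai"); // assumption for the demo
        long literal = coerce("2023-09-30 18:22:42.123", sessionZone);
        for (long time1 : new long[] {literal, literal + 1}) {
            boolean eq = time1 == literal;       // coerced comparison
            boolean notEq = !(time1 == literal); // same comparison, negated
            // With a single coercion point, notEq == !eq holds for every row.
            System.out.println("time1=" + time1 + " eq=" + eq + " notEq=" + notEq);
        }
    }
}
```

The reported bug amounts to the two branches coercing differently, which is exactly what lets eq1 and notEq1 both come out TRUE for the same row.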
[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning
[ https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Feng Jiajie updated FLINK-15152: Priority: Critical (was: Major) > Job running without periodic checkpoint for stop failed at the beginning > > > Key: FLINK-15152 > URL: https://issues.apache.org/jira/browse/FLINK-15152 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.9.1 >Reporter: Feng Jiajie >Priority: Critical > Labels: checkpoint, scheduler > > I have a streaming job configured with periodically checkpoint, but after one > week running, I found there isn't any checkpoint file. > h2. Reproduce the problem: > 1. Job was submitted to YARN: > {code:java} > bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m > flink-example-1.0-SNAPSHOT.jar{code} > 2. Then immediately, before all the task switch to RUNNING (about seconds), > I(actually a job control script) send a "stop with savepoint" command by > flink cli: > {code:java} > bin/flink stop -yid application_1575872737452_0019 > f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir > {code} > log in jobmanager.log: > {code:java} > 2019-12-09 17:56:56,512 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint > triggering task Source: Socket Stream -> Map (1/1) of job > f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED > instead. Aborting checkpoint. > {code} > Then the job task(taskmanager) *continues to run normally without* checkpoint. > h2. The cause of the problem: > 1. "stop with savepoint" command call the code > stopCheckpointScheduler(org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) > and then triggerSynchronousSavepoint: > {code:java} > // we stop the checkpoint coordinator so that we are guaranteed > // to have only the data of the synchronous savepoint committed. 
> // in case of failure, and if the job restarts, the coordinator > // will be restarted by the CheckpointCoordinatorDeActivator. > checkpointCoordinator.stopCheckpointScheduler();{code} > 2. but "before all the task switch to RUNNING", triggerSynchronousSavepoint > failed at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509 > {code:java} > LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} > instead. Aborting checkpoint.", > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > job, > ExecutionState.RUNNING, > ee.getState()); > throw new > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code} > 3. finally, "stop with savepoint" failed, with > "checkpointCoordinator.stopCheckpointScheduler()" but without the termination > of the job. The job is still running without periodically checkpoint. > > sample code for reproduce: > {code:java} > public class StreamingJob { > private static StateBackend makeRocksdbBackend() throws IOException { > RocksDBStateBackend rocksdbBackend = new > RocksDBStateBackend("file:///tmp/aaa"); > rocksdbBackend.enableTtlCompactionFilter(); > > rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); > return rocksdbBackend; > } > public static void main(String[] args) throws Exception { > // set up the streaming execution environment > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // 10 sec > env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE); > env.setStateBackend(makeRocksdbBackend()); > env.setRestartStrategy(RestartStrategies.noRestart()); > CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > checkpointConfig.enableExternalizedCheckpoints( > > CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > checkpointConfig.setFailOnCheckpointingErrors(true); > DataStream text = env.socketTextStream("127.0.0.1", 8912, "\n"); > text.map(new MapFunction>() { > 
@Override > public Tuple2 map(String s) { > String[] s1 = s.split(" "); > return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1])); > } > }).keyBy(0).flatMap(new CountWindowAverage()).print(); > env.execute("Flink Streaming Java API Skeleton"); > } > public static class CountWindowAverage extends > RichFlatMapFunction, Tuple2> { > private transient ValueState> sum; > @Override > public void flatMap(Tuple2 input, Collector Long>> out) throws Exception { > Tuple2 currentSum = sum.value(); > currentSum.f0 += 1; > currentSum.f1 += input.f1; > sum.update(currentSum); > out.collect(new Tuple2<>(input.f0, currentSum.f1)); > } > @Override > public void open(Configuration config) {
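The FLINK-15152 failure mode can be modeled in a few lines. This is an illustrative sketch with invented names, not the actual LegacyScheduler/CheckpointCoordinator API; it shows the guard the report implies: if triggering the synchronous savepoint fails, the periodic checkpoint scheduler has to be restarted rather than left stopped:

```java
/**
 * Simplified model (illustrative, not Flink's scheduler classes) of
 * stop-with-savepoint: the checkpoint scheduler is stopped first, so a
 * failed savepoint trigger must restore it, or the still-running job keeps
 * going without any periodic checkpoints.
 */
public class StopWithSavepointModel {
    private boolean checkpointSchedulerRunning = true;

    /** Returns true if the job stopped with a savepoint; restores checkpointing on failure. */
    public boolean stopWithSavepoint(boolean allTasksRunning) {
        // Stop the scheduler so only the synchronous savepoint's data is committed.
        checkpointSchedulerRunning = false;
        try {
            if (!allTasksRunning) {
                // Mirrors "task ... is not in state RUNNING but SCHEDULED. Aborting checkpoint."
                throw new IllegalStateException("NOT_ALL_REQUIRED_TASKS_RUNNING");
            }
            return true; // savepoint succeeded, job terminates
        } catch (IllegalStateException e) {
            // The guard sketched here: re-enable periodic checkpoints on failure.
            checkpointSchedulerRunning = true;
            return false;
        }
    }

    public boolean isCheckpointSchedulerRunning() {
        return checkpointSchedulerRunning;
    }
}
```

Without the restore step in the catch block, the model reproduces the reported state: stopWithSavepoint(false) returns false, yet the scheduler stays stopped for the lifetime of the job.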
[jira] [Commented] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
[ https://issues.apache.org/jira/browse/FLINK-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16999892#comment-16999892 ] Feng Jiajie commented on FLINK-15308: - Really looking forward to it. > Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1 > -- > > Key: FLINK-15308 > URL: https://issues.apache.org/jira/browse/FLINK-15308 > Project: Flink > Issue Type: Bug > Components: Runtime / Network >Affects Versions: 1.10.0 > Environment: $ git log > commit 4b54da2c67692b1c9d43e1184c00899b0151b3ae > Author: bowen.li > Date: Tue Dec 17 17:37:03 2019 -0800 >Reporter: Feng Jiajie >Assignee: Yingjie Cao >Priority: Blocker > Fix For: 1.10.0 > > Attachments: image-2019-12-19-10-55-30-644.png > > > Job worked well with default flink-conf.yaml with > pipelined-shuffle.compression: > {code:java} > taskmanager.numberOfTaskSlots: 1 > taskmanager.network.pipelined-shuffle.compression.enabled: true > {code} > But when I set taskmanager.numberOfTaskSlots to 4 or 6: > {code:java} > taskmanager.numberOfTaskSlots: 6 > taskmanager.network.pipelined-shuffle.compression.enabled: true > {code} > job failed: > {code:java} > $ bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 12288m > ~/flink-example-1.0-SNAPSHOT.jar > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/data/sa_cluster/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. 
[jira] [Commented] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
[ https://issues.apache.org/jira/browse/FLINK-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16999706#comment-16999706 ] Feng Jiajie commented on FLINK-15308: - Hi [~kevin.cyj] , I can reproduce the problem every time. YARN cluster: 3 nodes (8 cores, 32 GB each) {code:java} $ cat flink-conf.yaml | grep -v '^#' | grep -v '^$' jobmanager.rpc.address: localhost jobmanager.rpc.port: 6123 jobmanager.heap.size: 1024m taskmanager.memory.total-process.size: 1024m taskmanager.numberOfTaskSlots: 6 parallelism.default: 1 taskmanager.network.pipelined-shuffle.compression.enabled: true jobmanager.execution.failover-strategy: region {code}
[jira] [Comment Edited] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
[ https://issues.apache.org/jira/browse/FLINK-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16998931#comment-16998931 ] Feng Jiajie edited comment on FLINK-15308 at 12/18/19 8:35 AM: --- Here is my test code: [https://github.com/fengjiajie/my-flink-test|https://github.com/fengjiajie/my-flink-test/tree/master/src/main] run cmd: {code:java} bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 8192m ~/laputa-flink-example-1.0-SNAPSHOT.jar {code} and {code:java} nc -l 31212 {code} on the host debugboxcreate431x1 `cn/kbyte/StreamingJob.java:88` {code:java} new SocketClientSink<>("debugboxcreate431x1", 31212, new SimpleStringSchema())) {code} [~kevin.cyj] was (Author: fengjiajie): [https://github.com/fengjiajie/my-flink-test|https://github.com/fengjiajie/my-flink-test/tree/master/src/main] run cmd: bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 8192m ~/laputa-flink-example-1.0-SNAPSHOT.jar [~kevin.cyj]
[jira] [Comment Edited] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
[ https://issues.apache.org/jira/browse/FLINK-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16998931#comment-16998931 ] Feng Jiajie edited comment on FLINK-15308 at 12/18/19 8:30 AM: --- [https://github.com/fengjiajie/my-flink-test|https://github.com/fengjiajie/my-flink-test/tree/master/src/main] run cmd: bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 8192m ~/laputa-flink-example-1.0-SNAPSHOT.jar [~kevin.cyj] was (Author: fengjiajie): [https://github.com/fengjiajie/my-flink-test/tree/master/src/main] run cmd: bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 8192m ~/laputa-flink-example-1.0-SNAPSHOT.jar
[jira] [Commented] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
[ https://issues.apache.org/jira/browse/FLINK-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16998931#comment-16998931 ] Feng Jiajie commented on FLINK-15308: - [https://github.com/fengjiajie/my-flink-test/tree/master/src/main] run cmd: bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 8192m ~/laputa-flink-example-1.0-SNAPSHOT.jar
[jira] [Updated] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
[ https://issues.apache.org/jira/browse/FLINK-15308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Feng Jiajie updated FLINK-15308: Priority: Blocker (was: Major)
[jira] [Created] (FLINK-15308) Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1
Feng Jiajie created FLINK-15308: --- Summary: Job failed when enable pipelined-shuffle.compression and numberOfTaskSlots > 1 Key: FLINK-15308 URL: https://issues.apache.org/jira/browse/FLINK-15308 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.10.0 Environment: $ git log commit 4b54da2c67692b1c9d43e1184c00899b0151b3ae Author: bowen.li Date: Tue Dec 17 17:37:03 2019 -0800 Reporter: Feng Jiajie Job worked well with default flink-conf.yaml with pipelined-shuffle.compression: {code:java} taskmanager.numberOfTaskSlots: 1 taskmanager.network.pipelined-shuffle.compression.enabled: true {code} But when I set taskmanager.numberOfTaskSlots to 4 or 6: {code:java} taskmanager.numberOfTaskSlots: 6 taskmanager.network.pipelined-shuffle.compression.enabled: true {code} job failed: {code:java} $ bin/flink run -m yarn-cluster -p 16 -yjm 1024m -ytm 12288m ~/flink-example-1.0-SNAPSHOT.jar SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/data/sa_cluster/cloudera/parcels/CDH-5.14.4-1.cdh5.14.4.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 2019-12-18 15:04:40,514 WARN org.apache.flink.yarn.cli.FlinkYarnSessionCli - The configuration directory ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file. 
2019-12-18 15:04:40,514 WARN org.apache.flink.yarn.cli.FlinkYarnSessionCli - The configuration directory ('/data/build/flink/flink-dist/target/flink-1.10-SNAPSHOT-bin/flink-1.10-SNAPSHOT/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file. 2019-12-18 15:04:40,907 INFO org.apache.flink.yarn.YarnClusterDescriptor - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar 2019-12-18 15:04:41,084 INFO org.apache.flink.yarn.YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=1024, taskManagerMemoryMB=12288, numberTaskManagers=1, slotsPerTaskManager=6} 2019-12-18 15:04:42,344 INFO org.apache.flink.yarn.YarnClusterDescriptor - Submitting application master application_1576573857638_0026 2019-12-18 15:04:42,370 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1576573857638_0026 2019-12-18 15:04:42,371 INFO org.apache.flink.yarn.YarnClusterDescriptor - Waiting for the cluster to be allocated 2019-12-18 15:04:42,372 INFO org.apache.flink.yarn.YarnClusterDescriptor - Deploying cluster, current state ACCEPTED 2019-12-18 15:04:45,388 INFO org.apache.flink.yarn.YarnClusterDescriptor - YARN application has been deployed successfully. 2019-12-18 15:04:45,390 INFO org.apache.flink.yarn.YarnClusterDescriptor - Found Web Interface debugboxcreate431x3.sa:36162 of application 'application_1576573857638_0026'. 
Job has been submitted with JobID 9140c70769f4271cc22ea8becaa26272 The program finished with the following exception: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 9140c70769f4271cc22ea8becaa26272) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213) at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1924) at
[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning
[ https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Feng Jiajie updated FLINK-15152: Labels: checkpoint scheduler (was: checkpoint) > Job running without periodic checkpoint for stop failed at the beginning > > > Key: FLINK-15152 > URL: https://issues.apache.org/jira/browse/FLINK-15152 > Project: Flink > Issue Type: Bug > Components: Runtime / Checkpointing >Affects Versions: 1.9.1 >Reporter: Feng Jiajie >Priority: Major > Labels: checkpoint, scheduler > > I have a streaming job configured with periodic checkpointing, but after one week of running I found there wasn't a single checkpoint file. > h2. Reproduce the problem: > 1. The job was submitted to YARN: > {code:java} > bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m > flink-example-1.0-SNAPSHOT.jar{code} > 2. Then immediately, before all the tasks switched to RUNNING (a matter of seconds), > I (actually a job control script) sent a "stop with savepoint" command via the flink > cli: > {code:java} > bin/flink stop -yid application_1575872737452_0019 > f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir > {code} > Log in jobmanager.log: > {code:java} > 2019-12-09 17:56:56,512 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint > triggering task Source: Socket Stream -> Map (1/1) of job > f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED > instead. Aborting checkpoint. > {code} > Then the job's tasks (taskmanager) *continue to run normally without* checkpoints. > h2. The cause of the problem: > 1. The "stop with savepoint" command calls > stopCheckpointScheduler (org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) > and then triggerSynchronousSavepoint: > {code:java} > // we stop the checkpoint coordinator so that we are guaranteed > // to have only the data of the synchronous savepoint committed. 
> // in case of failure, and if the job restarts, the coordinator > // will be restarted by the CheckpointCoordinatorDeActivator. > checkpointCoordinator.stopCheckpointScheduler();{code} > 2. But before all the tasks had switched to RUNNING, triggerSynchronousSavepoint > failed at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509: > {code:java} > LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} > instead. Aborting checkpoint.", > tasksToTrigger[i].getTaskNameWithSubtaskIndex(), > job, > ExecutionState.RUNNING, > ee.getState()); > throw new > CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code} > 3. Finally, "stop with savepoint" failed after > "checkpointCoordinator.stopCheckpointScheduler()" had already run, but without > terminating the job. The job is still running, now without periodic checkpointing. > > Sample code to reproduce: > {code:java} > public class StreamingJob { > private static StateBackend makeRocksdbBackend() throws IOException { > RocksDBStateBackend rocksdbBackend = new > RocksDBStateBackend("file:///tmp/aaa"); > rocksdbBackend.enableTtlCompactionFilter(); > > rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED); > return rocksdbBackend; > } > public static void main(String[] args) throws Exception { > // set up the streaming execution environment > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > // 10 sec > env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE); > env.setStateBackend(makeRocksdbBackend()); > env.setRestartStrategy(RestartStrategies.noRestart()); > CheckpointConfig checkpointConfig = env.getCheckpointConfig(); > checkpointConfig.enableExternalizedCheckpoints( > > CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); > checkpointConfig.setFailOnCheckpointingErrors(true); > DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n"); > text.map(new MapFunction<String, Tuple2<Long, Long>>() { > @Override > public Tuple2<Long, Long> map(String s) { > String[] s1 = s.split(" "); > return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1])); > } > }).keyBy(0).flatMap(new CountWindowAverage()).print(); > env.execute("Flink Streaming Java API Skeleton"); > } > public static class CountWindowAverage extends > RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { > private transient ValueState<Tuple2<Long, Long>> sum; > @Override > public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, > Long>> out) throws Exception { > Tuple2<Long, Long> currentSum = sum.value(); > currentSum.f0 += 1; > currentSum.f1 += input.f1; > sum.update(currentSum); > out.collect(new Tuple2<>(input.f0, currentSum.f1)); > } > @Override > public void
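The three-step failure path described in FLINK-15152 above can be sketched as a toy state machine. This is plain Java with illustrative stand-in classes (Coordinator, TaskState), not Flink's real CheckpointCoordinator; it only models the sequencing bug: the periodic scheduler is stopped first, the synchronous savepoint trigger then fails because a task is still SCHEDULED, and the failure path never restarts the scheduler.

```java
import java.util.List;

public class StopWithSavepointRace {
    public enum TaskState { SCHEDULED, RUNNING }

    // Stand-in for the checkpoint coordinator's scheduler state.
    public static class Coordinator {
        public boolean periodicSchedulerRunning = true;

        public void stopCheckpointScheduler() { periodicSchedulerRunning = false; }

        // Mirrors the precondition check quoted above: every task must be RUNNING,
        // otherwise the trigger aborts with an exception.
        public void triggerSynchronousSavepoint(List<TaskState> tasks) {
            for (TaskState s : tasks) {
                if (s != TaskState.RUNNING) {
                    throw new IllegalStateException(
                        "task is not in state RUNNING but " + s + " instead. Aborting checkpoint.");
                }
            }
        }
    }

    /** Runs the buggy sequence; returns whether the periodic scheduler is still running. */
    public static boolean stopWithSavepoint(Coordinator c, List<TaskState> tasks) {
        c.stopCheckpointScheduler();              // step 1: scheduler stopped up front
        try {
            c.triggerSynchronousSavepoint(tasks); // step 2: fails while a task is SCHEDULED
        } catch (IllegalStateException e) {
            // step 3 (the bug): the failure path does not restart the scheduler,
            // so the job keeps running with periodic checkpointing silently disabled.
        }
        return c.periodicSchedulerRunning;
    }

    public static void main(String[] args) {
        boolean schedulerAlive = stopWithSavepoint(
                new Coordinator(),
                List.of(TaskState.RUNNING, TaskState.SCHEDULED));
        System.out.println("scheduler running after failed stop: " + schedulerAlive); // false
    }
}
```

Under this model, the fix direction is to restart the scheduler in the catch block when the stop operation fails, so a failed stop-with-savepoint leaves checkpointing enabled.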
[jira] [Updated] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning
[ https://issues.apache.org/jira/browse/FLINK-15152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Feng Jiajie updated FLINK-15152:
--------------------------------
    Description:

I have a streaming job configured with periodic checkpointing, but after a week of running I found there wasn't a single checkpoint file.

h2. Reproduce the problem:

1. The job was submitted to YARN:
{code:java}
bin/flink run -m yarn-cluster -p 1 -yjm 1024m -ytm 4096m flink-example-1.0-SNAPSHOT.jar{code}
2. Then immediately, before all tasks had switched to RUNNING (a matter of seconds), I (actually a job control script) sent a "stop with savepoint" command via the Flink CLI:
{code:java}
bin/flink stop -yid application_1575872737452_0019 f75ca6f457828427ed3d413031b92722 -p file:///tmp/some_dir {code}
The jobmanager.log shows:
{code:java}
2019-12-09 17:56:56,512 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint triggering task Source: Socket Stream -> Map (1/1) of job f75ca6f457828427ed3d413031b92722 is not in state RUNNING but SCHEDULED instead. Aborting checkpoint. {code}
The job (taskmanager) then *continues to run normally without* checkpointing.

h2. The cause of the problem:

1. The "stop with savepoint" command calls stopCheckpointScheduler (org/apache/flink/runtime/scheduler/LegacyScheduler.java:612) and then triggerSynchronousSavepoint:
{code:java}
// we stop the checkpoint coordinator so that we are guaranteed
// to have only the data of the synchronous savepoint committed.
// in case of failure, and if the job restarts, the coordinator
// will be restarted by the CheckpointCoordinatorDeActivator.
checkpointCoordinator.stopCheckpointScheduler();{code}
2. But because not all tasks had switched to RUNNING, triggerSynchronousSavepoint failed at org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:509:
{code:java}
LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
        tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
        job,
        ExecutionState.RUNNING,
        ee.getState());
throw new CheckpointException(CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING);{code}
3. Finally, "stop with savepoint" failed after "checkpointCoordinator.stopCheckpointScheduler()" had already run, but without terminating the job. The job keeps running, now without periodic checkpoints.

Sample code to reproduce:
{code:java}
public class StreamingJob {

    private static StateBackend makeRocksdbBackend() throws IOException {
        RocksDBStateBackend rocksdbBackend = new RocksDBStateBackend("file:///tmp/aaa");
        rocksdbBackend.enableTtlCompactionFilter();
        rocksdbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
        return rocksdbBackend;
    }

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 10 sec
        env.enableCheckpointing(10_000L, CheckpointingMode.AT_LEAST_ONCE);
        env.setStateBackend(makeRocksdbBackend());
        env.setRestartStrategy(RestartStrategies.noRestart());

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        checkpointConfig.setFailOnCheckpointingErrors(true);

        DataStream<String> text = env.socketTextStream("127.0.0.1", 8912, "\n");
        text.map(new MapFunction<String, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> map(String s) {
                String[] s1 = s.split(" ");
                return Tuple2.of(Long.parseLong(s1[0]), Long.parseLong(s1[1]));
            }
        }).keyBy(0).flatMap(new CountWindowAverage()).print();

        env.execute("Flink Streaming Java API Skeleton");
    }

    public static class CountWindowAverage
            extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

        private transient ValueState<Tuple2<Long, Long>> sum;

        @Override
        public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out)
                throws Exception {
            Tuple2<Long, Long> currentSum = sum.value();
            currentSum.f0 += 1;
            currentSum.f1 += input.f1;
            sum.update(currentSum);
            out.collect(new Tuple2<>(input.f0, currentSum.f1));
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                    new ValueStateDescriptor<>(
                            "average", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                            Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
            sum = getRuntimeContext().getState(descriptor);
        }
    }
}
{code}
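The broken control flow described above can be sketched in a minimal, self-contained Java model. This is NOT Flink's real API: the class and method names below (CheckpointScheduler, stopWithSavepoint) are illustrative assumptions, and it only models the essential sequencing — the scheduler is stopped first, and if the synchronous savepoint then fails, the scheduler must be restarted, which is the step the bug report shows was missing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of stop-with-savepoint (hypothetical names, not Flink's classes).
public class StopWithSavepointSketch {

    static class CheckpointScheduler {
        private final AtomicBoolean running = new AtomicBoolean(true);
        void stop()  { running.set(false); }
        void start() { running.set(true); }
        boolean isRunning() { return running.get(); }
    }

    static CompletableFuture<String> stopWithSavepoint(
            CheckpointScheduler scheduler, boolean tasksAllRunning) {
        // Step 1: stop the periodic scheduler so only the savepoint's data commits.
        scheduler.stop();

        // Step 2: trigger the synchronous savepoint; it fails when tasks
        // are still SCHEDULED rather than RUNNING (the bug's trigger).
        CompletableFuture<String> savepoint = tasksAllRunning
                ? CompletableFuture.completedFuture("file:///tmp/some_dir/savepoint-1")
                : failedFuture(new IllegalStateException("NOT_ALL_REQUIRED_TASKS_RUNNING"));

        // Step 3 (the missing piece): on failure, restart the scheduler
        // instead of leaving the job running without periodic checkpoints.
        return savepoint.whenComplete((path, err) -> {
            if (err != null) {
                scheduler.start();
            }
        });
    }

    // CompletableFuture.failedFuture is Java 9+; this helper keeps the sketch on Java 8.
    static <T> CompletableFuture<T> failedFuture(Throwable t) {
        CompletableFuture<T> f = new CompletableFuture<>();
        f.completeExceptionally(t);
        return f;
    }

    public static void main(String[] args) {
        CheckpointScheduler scheduler = new CheckpointScheduler();
        // Savepoint fails because tasks are not all RUNNING yet...
        stopWithSavepoint(scheduler, false).exceptionally(e -> null).join();
        // ...but the scheduler was restarted, so periodic checkpoints resume.
        System.out.println(scheduler.isRunning()); // prints "true"
    }
}
```

With the `whenComplete` handler removed, the model reproduces the reported behavior: the scheduler stays stopped after a failed stop-with-savepoint, and the job keeps running with no checkpoints.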
[jira] [Created] (FLINK-15152) Job running without periodic checkpoint for stop failed at the beginning
Feng Jiajie created FLINK-15152:
-----------------------------------

             Summary: Job running without periodic checkpoint for stop failed at the beginning
                 Key: FLINK-15152
                 URL: https://issues.apache.org/jira/browse/FLINK-15152
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.9.1
            Reporter: Feng Jiajie

--
This message was sent by Atlassian Jira
(v8.3.4#803005)