[ https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rui Fan resolved FLINK-33981.
-----------------------------
    Resolution: Fixed

> File Descriptor References Not Released After Job Execution in MiniCluster Mode
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-33981
>                 URL: https://issues.apache.org/jira/browse/FLINK-33981
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.18.0
>            Reporter: Feng Jiajie
>            Assignee: Feng Jiajie
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.19.0, 1.17.3, 1.18.2
>
>
> When using MiniCluster mode, file descriptors like
> *{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}}* are
> not released after a Job completes. Executing multiple Jobs in the same JVM
> might result in leftover file descriptors, potentially leading to problems.
> After executing the reproducing code provided below (after entering the
> sleep), running *lsof -p 18162* reveals:
> {code:java}
> ...
> java 18162 sa_cluster 30r DIR 253,1 0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java 18162 sa_cluster 31r DIR 253,1 0 1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java 18162 sa_cluster 32r DIR 253,1 0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java 18162 sa_cluster 33r DIR 253,1 0 1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java 18162 sa_cluster 34r DIR 253,1 0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java 18162 sa_cluster 35r DIR 253,1 0 1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java 18162 sa_cluster 36r DIR 253,1 0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java 18162 sa_cluster 37r DIR 253,1 0 1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java 18162 sa_cluster 38r DIR 253,1 0 1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
> ...
> {code}
> The code used for reproduction is as follows:
> {code:java}
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
> import org.apache.flink.streaming.api.graph.StreamGraph;
>
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
>
> /**
>  * javac -cp 'lib/*' TestReleaseFd.java
>  * java -Xmx600m -cp '.:lib/*' TestReleaseFd
>  */
> public class TestReleaseFd {
>
>   public static void main(String[] args) throws Exception {
>     for (int i = 0; i < 10; ++i) {
>       int round = i;
>       Thread thread = new Thread(() -> {
>         try {
>           Configuration configuration = new Configuration();
>           final StreamExecutionEnvironment env =
>               StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>           env.setParallelism(1);
>
>           DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 100000);
>           longDataStreamSource.addSink(new DiscardingSink<>());
>
>           StreamGraph streamGraph = env.getStreamGraph();
>           streamGraph.setJobName("test-" + System.nanoTime());
>           JobClient jobClient = env.executeAsync(streamGraph);
>
>           CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture =
>               jobClient.getJobExecutionResult();
>           JobExecutionResult jobExecutionResult = null;
>           while (jobExecutionResult == null) {
>             try {
>               jobExecutionResult =
>                   jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
>             } catch (TimeoutException timeoutException) {
>               // ignore
>             }
>           }
>           System.out.println("finished round: " + round);
>           env.close();
>         } catch (Exception e) {
>           throw new RuntimeException(e);
>         }
>       });
>
>       thread.setDaemon(true);
>       thread.start();
>       thread.join();
>
>       System.out.println("done ... " + i);
>     }
>
>     // ======================= lsof -p 18162
>     Thread.sleep(500_000_000);
>   }
> }
> {code}
> The issue reproduces consistently with the above code on Flink 1.18.0, but
> does not occur on Flink 1.14.6.
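
The lsof check described in the report can also be approximated from inside the JVM, which may be convenient when turning the reproduction into an automated test. The following is a minimal sketch, assuming a Linux host where /proc/self/fd exposes the process's open descriptors as symbolic links. The class name FdLeakCheck and the method countMatching are hypothetical helpers for illustration and are not part of Flink.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper (not part of Flink): counts descriptors whose
// link target mentions a given path fragment, e.g. "localState".
final class FdLeakCheck {

    /**
     * Counts the entries of fdDir that are symbolic links whose target
     * contains the given fragment. Entries that are not links, or that
     * disappear while the directory is being scanned, are skipped.
     */
    static int countMatching(Path fdDir, String fragment) throws IOException {
        int count = 0;
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(fdDir)) {
            for (Path entry : entries) {
                try {
                    String target = Files.readSymbolicLink(entry).toString();
                    if (target.contains(fragment)) {
                        count++;
                    }
                } catch (IOException | UnsupportedOperationException e) {
                    // Not a link, already closed, or links unsupported: ignore.
                }
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: Linux, where /proc/self/fd lists this JVM's descriptors.
        int open = countMatching(Paths.get("/proc/self/fd"), "localState");
        System.out.println("descriptors referencing localState: " + open);
    }
}
```

Calling countMatching(Paths.get("/proc/self/fd"), "localState") at the point where the reproduction enters its sleep should give a count comparable to the number of localState lines that lsof prints, under the same Linux assumption.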