[ 
https://issues.apache.org/jira/browse/FLINK-33981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rui Fan resolved FLINK-33981.
-----------------------------
    Resolution: Fixed

> File Descriptor References Not Released After Job Execution in MiniCluster 
> Mode
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-33981
>                 URL: https://issues.apache.org/jira/browse/FLINK-33981
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.18.0
>            Reporter: Feng Jiajie
>            Assignee: Feng Jiajie
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.19.0, 1.17.3, 1.18.2
>
>
> In MiniCluster mode, file descriptors for directories such as 
> *{{/tmp/minicluster_6e62ab17ab058e0d5df56b97e9ccef71/tm_0/localState}}* are 
> not released after a job completes. Running multiple jobs in the same JVM 
> can therefore accumulate leftover file descriptors, which may lead to problems.
> After running the reproduction code below and waiting until it reaches the 
> final sleep, *lsof -p 18162* (18162 being the PID of the JVM) shows:
> {code:java}
> ...
> java    18162 sa_cluster   30r   DIR              253,1         0    1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java    18162 sa_cluster   31r   DIR              253,1         0    1311962 /tmp/minicluster_79186ba3856bb18fd21c20d9adbaa3da/tm_0/localState (deleted)
> java    18162 sa_cluster   32r   DIR              253,1         0    1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java    18162 sa_cluster   33r   DIR              253,1         0    1310787 /tmp/minicluster_f3ac056ffa1c47010698fc02c5e6d4cf/tm_0/localState (deleted)
> java    18162 sa_cluster   34r   DIR              253,1         0    1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java    18162 sa_cluster   35r   DIR              253,1         0    1311960 /tmp/minicluster_ef6177c97a2c9096758d0a8f132f9f02/tm_0/localState (deleted)
> java    18162 sa_cluster   36r   DIR              253,1         0    1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java    18162 sa_cluster   37r   DIR              253,1         0    1311974 /tmp/minicluster_661a2967ea5c377265fa9f41c768db21/tm_0/localState (deleted)
> java    18162 sa_cluster   38r   DIR              253,1         0    1311979 /tmp/minicluster_8413f476b77245203ed1ef759eb0d2de/tm_0/localState (deleted)
> ...
> {code}
> The code used for reproduction is as follows:
> {code:java}
> import org.apache.flink.api.common.JobExecutionResult;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
> import org.apache.flink.streaming.api.graph.StreamGraph;
> import java.util.concurrent.CompletableFuture;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.TimeoutException;
> /**
>  * javac -cp 'lib/*' TestReleaseFd.java
>  * java -Xmx600m -cp '.:lib/*' TestReleaseFd
>  */
> public class TestReleaseFd {
>   public static void main(String[] args) throws Exception {
>     for (int i = 0; i < 10; ++i) {
>       int round = i;
>       Thread thread = new Thread(() -> {
>         try {
>           Configuration configuration = new Configuration();
>           final StreamExecutionEnvironment env =
>               StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>           env.setParallelism(1);
>           DataStreamSource<Long> longDataStreamSource = env.fromSequence(1, 100000);
>           longDataStreamSource.addSink(new DiscardingSink<>());
>           StreamGraph streamGraph = env.getStreamGraph();
>           streamGraph.setJobName("test-" + System.nanoTime());
>           JobClient jobClient = env.executeAsync(streamGraph);
>           CompletableFuture<JobExecutionResult> jobExecutionResultCompletableFuture =
>               jobClient.getJobExecutionResult();
>           JobExecutionResult jobExecutionResult = null;
>           while (jobExecutionResult == null) {
>             try {
>               jobExecutionResult =
>                   jobExecutionResultCompletableFuture.get(20, TimeUnit.SECONDS);
>             } catch (TimeoutException timeoutException) {
>               // Job has not finished within 20 seconds; keep polling.
>             }
>           }
>           System.out.println("finished round: " + round);
>           env.close();
>         } catch (Exception e) {
>           throw new RuntimeException(e);
>         }
>       });
>       thread.setDaemon(true);
>       thread.start();
>       thread.join();
>       System.out.println("done ... " + i);
>     }
>
>     // Keep the JVM alive so its open descriptors can be inspected,
>     // e.g. with: lsof -p 18162 (substitute the PID of this JVM)
>     Thread.sleep(500_000_000);
>   }
> }
> {code}
> With the code above, the leak reproduces consistently on Flink 1.18.0; it 
> does not occur on Flink 1.14.6.
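
To check for the leak without running lsof by hand, the descriptor list can also be read from inside the JVM. The following is only a minimal sketch under stated assumptions: it works on Linux only (it reads /proc/self/fd), and the class FdProbe and its methods are hypothetical names that are not part of Flink or of the reproduction code in the issue.

```java
import java.io.IOException;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Flink: lists this JVM's open descriptors on Linux. */
class FdProbe {

    /**
     * Returns the link target of every open descriptor of the current process.
     * Returns an empty list where /proc/self/fd does not exist (e.g. macOS, Windows).
     */
    public static List<String> openFdTargets() {
        List<String> targets = new ArrayList<>();
        Path fdDir = Paths.get("/proc/self/fd");
        if (!Files.isDirectory(fdDir)) {
            return targets;
        }
        // Note: the directory stream itself holds one descriptor while listing.
        try (DirectoryStream<Path> fds = Files.newDirectoryStream(fdDir)) {
            for (Path fd : fds) {
                try {
                    targets.add(Files.readSymbolicLink(fd).toString());
                } catch (IOException e) {
                    // The descriptor was closed between listing and reading; skip it.
                }
            }
        } catch (IOException | DirectoryIteratorException e) {
            // Listing failed part-way; return whatever was collected so far.
        }
        return targets;
    }

    /** Counts the targets that contain the given fragment, e.g. "localState". */
    public static long countMatching(List<String> targets, String fragment) {
        return targets.stream().filter(t -> t.contains(fragment)).count();
    }

    public static void main(String[] args) {
        List<String> targets = openFdTargets();
        System.out.println("open descriptors: " + targets.size());
        System.out.println("localState descriptors: " + countMatching(targets, "localState"));
    }
}
```

Calling `FdProbe.countMatching(FdProbe.openFdTargets(), "localState")` after each round of the reproduction loop would show whether the count keeps growing; on an affected version it would be expected to rise, matching the lsof output quoted above.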



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
