[ 
https://issues.apache.org/jira/browse/FLINK-10832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Arnaud Linz updated FLINK-10832:
--------------------------------
    Description: 
In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), 

This code never ends : 

 {{ 
    public void testFlink162() throws Exception {
        // get the execution environment
        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
        // get input data 
        final DataStreamSource<String> text = env.addSource(new 
SourceFunction<String>() {
            @Override
            public void run(final SourceContext<String> ctx) throws Exception {
                for (int count = 0; count < 5; count++) {
                    ctx.collect(String.valueOf(count));
                }
            }
            @Override
            public void cancel() {
            }
        });
        text.print().setParallelism(1);
        env.execute("Simple Test");
        // Never ends !
    }

}}
 

 

It's critical for us as we heavily rely on this "source exhaustion stop" 
mechanism to achieve proper stop of streaming applications from their own code, 
so it prevents us from using the last flink versions.

 

The log extract shows that the local cluster tried to shut down, but could not 
do it for no apparent reason:

 

{{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. 
Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to 
RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
 {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using 
default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
asynchronous: TRUE, maxStateSize: 5242880) 
(org.apache.flink.streaming.runtime.tasks.StreamTask:230)}}
 {{0}}
 {{1}}
 {{2}}
 {{3}}
 {{4}}
 {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to Std. 
Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. 
(org.apache.flink.runtime.taskmanager.Task:915)}}
 {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom 
Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). 
(org.apache.flink.runtime.taskmanager.Task:818)}}
 {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed 
for task Source: Custom Source -> Sink: Print to Std. Out (1/1) 
(07ae66bef91de06205cf22a337ea1fe2) [FINISHED] 
(org.apache.flink.runtime.taskmanager.Task:845)}}
 {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final 
execution state FINISHED to JobManager for task Source: Custom Source -> Sink: 
Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}}
 {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to Std. 
Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
 {{[2018-11-07 11:11:13,907] INFO Job Simple Test 
(0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}}
 {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 
0ef8697ca98f6d2b565ed928d17c8a49. 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}}
 {{[2018-11-07 11:11:13,908] INFO Shutting down 
(org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}}
 {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster 
(org.apache.flink.runtime.minicluster.MiniCluster:427)}}
 {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. 
(org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}}
 {{[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor 
akka://flink/user/taskmanager_0. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:291)}}
 {{[2018-11-07 11:11:23,583] INFO Shutting down 
TaskExecutorLocalStateStoresManager. 
(org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)}}
 {{[2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory 
C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 
(org.apache.flink.runtime.io.disk.iomanager.IOManager:110)}}
 {{[2018-11-07 11:11:23,591] INFO Shutting down the network environment and its 
components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)}}
 {{[2018-11-07 11:11:23,591] INFO Removing cache directory 
C:\Users\alinz\AppData\Local\Temp\flink-web-ui 
(org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)}}
 {{[2018-11-07 11:11:23,593] INFO Closing the SlotManager. 
(org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)}}
 {{[2018-11-07 11:11:23,593] INFO Suspending the SlotManager. 
(org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)}}
 {{[2018-11-07 11:11:23,596] INFO Close ResourceManager connection 
cd021102669258aad77c20645ed08aae: ResourceManager leader changed to new address 
null. (org.apache.flink.runtime.jobmaster.JobMaster:1355)}}
 {{[2018-11-07 11:11:23,607] INFO Stop job leader service. 
(org.apache.flink.runtime.taskexecutor.JobLeaderService:135)}}
 {{[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor 
akka://flink/user/taskmanager_0. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:330)}}

 

  was:
In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), 

This code never ends : 

 

{{ public void testFlink162() throws Exception {}}
{{ // get the execution environment}}
{{ final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();}}
{{ // get input data }}
{{ final DataStreamSource<String> text = env.addSource(new 
SourceFunction<String>() {}}
{{ @Override}}
{{ public void run(final SourceContext<String> ctx) throws Exception {}}
{{ for (int count = 0; count < 5; count++) {}}
{{ ctx.collect(String.valueOf(count));}}
{{ }}}
{{ }}}
{{ @Override}}
{{ public void cancel() {}}
{{ }}}
{{ });}}
{{ text.print().setParallelism(1);}}
{{ env.execute("Simple Test");}}
{{ // Never ends !}}
{{ }}}

 

 

It's critical for us as we heavily rely on this "source exhaustion stop" 
mechanism to achieve proper stop of streaming applications from their own code, 
so it prevents us from using the last flink versions.

 

The log extract shows that the local cluster tried to shut down, but could not 
do it for no apparent reason:

 

{{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. 
Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to 
RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
 {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using 
default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
asynchronous: TRUE, maxStateSize: 5242880) 
(org.apache.flink.streaming.runtime.tasks.StreamTask:230)}}
 {{0}}
 {{1}}
 {{2}}
 {{3}}
 {{4}}
 {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to Std. 
Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. 
(org.apache.flink.runtime.taskmanager.Task:915)}}
 {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom 
Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). 
(org.apache.flink.runtime.taskmanager.Task:818)}}
 {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed 
for task Source: Custom Source -> Sink: Print to Std. Out (1/1) 
(07ae66bef91de06205cf22a337ea1fe2) [FINISHED] 
(org.apache.flink.runtime.taskmanager.Task:845)}}
 {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final 
execution state FINISHED to JobManager for task Source: Custom Source -> Sink: 
Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}}
 {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to Std. 
Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to FINISHED. 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
 {{[2018-11-07 11:11:13,907] INFO Job Simple Test 
(0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. 
(org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}}
 {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 
0ef8697ca98f6d2b565ed928d17c8a49. 
(org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}}
 {{[2018-11-07 11:11:13,908] INFO Shutting down 
(org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}}
 {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster 
(org.apache.flink.runtime.minicluster.MiniCluster:427)}}
 {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. 
(org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}}
 {{[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor 
akka://flink/user/taskmanager_0. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:291)}}
 {{[2018-11-07 11:11:23,583] INFO Shutting down 
TaskExecutorLocalStateStoresManager. 
(org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)}}
 {{[2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory 
C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814 
(org.apache.flink.runtime.io.disk.iomanager.IOManager:110)}}
 {{[2018-11-07 11:11:23,591] INFO Shutting down the network environment and its 
components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)}}
 {{[2018-11-07 11:11:23,591] INFO Removing cache directory 
C:\Users\alinz\AppData\Local\Temp\flink-web-ui 
(org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)}}
 {{[2018-11-07 11:11:23,593] INFO Closing the SlotManager. 
(org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)}}
 {{[2018-11-07 11:11:23,593] INFO Suspending the SlotManager. 
(org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)}}
 {{[2018-11-07 11:11:23,596] INFO Close ResourceManager connection 
cd021102669258aad77c20645ed08aae: ResourceManager leader changed to new address 
null. (org.apache.flink.runtime.jobmaster.JobMaster:1355)}}
 {{[2018-11-07 11:11:23,607] INFO Stop job leader service. 
(org.apache.flink.runtime.taskexecutor.JobLeaderService:135)}}
 {{[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor 
akka://flink/user/taskmanager_0. 
(org.apache.flink.runtime.taskexecutor.TaskExecutor:330)}}

 


> StreamExecutionEnvironment.execute() does not return when all sources end
> -------------------------------------------------------------------------
>
>                 Key: FLINK-10832
>                 URL: https://issues.apache.org/jira/browse/FLINK-10832
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.5.5, 1.6.2
>            Reporter: Arnaud Linz
>            Priority: Critical
>
> In 1.5.5, 1.6.1 and 1.6.2 (it works in 1.6.0 and 1.3.2), 
> This code never ends : 
>  {{ 
>     public void testFlink162() throws Exception {
>         // get the execution environment
>         final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         // get input data 
>         final DataStreamSource<String> text = env.addSource(new 
> SourceFunction<String>() {
>             @Override
>             public void run(final SourceContext<String> ctx) throws Exception 
> {
>                 for (int count = 0; count < 5; count++) {
>                     ctx.collect(String.valueOf(count));
>                 }
>             }
>             @Override
>             public void cancel() {
>             }
>         });
>         text.print().setParallelism(1);
>         env.execute("Simple Test");
>         // Never ends !
>     }
> }}
>  
>  
> It's critical for us as we heavily rely on this "source exhaustion stop" 
> mechanism to achieve proper stop of streaming applications from their own 
> code, so it prevents us from using the last flink versions.
>  
> The log extract shows that the local cluster tried to shut down, but could 
> not do it for no apparent reason:
>  
> {{[2018-11-07 11:11:13,837] INFO Source: Custom Source -> Sink: Print to Std. 
> Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from DEPLOYING to 
> RUNNING. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,840] INFO No state backend has been configured, using 
> default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
> checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
> asynchronous: TRUE, maxStateSize: 5242880) 
> (org.apache.flink.streaming.runtime.tasks.StreamTask:230)}}
>  {{0}}
>  {{1}}
>  {{2}}
>  {{3}}
>  {{4}}
>  {{[2018-11-07 11:11:13,897] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.taskmanager.Task:915)}}
>  {{[2018-11-07 11:11:13,897] INFO Freeing task resources for Source: Custom 
> Source -> Sink: Print to Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2). 
> (org.apache.flink.runtime.taskmanager.Task:818)}}
>  {{[2018-11-07 11:11:13,898] INFO Ensuring all FileSystem streams are closed 
> for task Source: Custom Source -> Sink: Print to Std. Out (1/1) 
> (07ae66bef91de06205cf22a337ea1fe2) [FINISHED] 
> (org.apache.flink.runtime.taskmanager.Task:845)}}
>  {{[2018-11-07 11:11:13,899] INFO Un-registering task and sending final 
> execution state FINISHED to JobManager for task Source: Custom Source -> 
> Sink: Print to Std. Out 07ae66bef91de06205cf22a337ea1fe2. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:1337)}}
>  {{[2018-11-07 11:11:13,904] INFO Source: Custom Source -> Sink: Print to 
> Std. Out (1/1) (07ae66bef91de06205cf22a337ea1fe2) switched from RUNNING to 
> FINISHED. (org.apache.flink.runtime.executiongraph.ExecutionGraph:1316)}}
>  {{[2018-11-07 11:11:13,907] INFO Job Simple Test 
> (0ef8697ca98f6d2b565ed928d17c8a49) switched from state RUNNING to FINISHED. 
> (org.apache.flink.runtime.executiongraph.ExecutionGraph:1356)}}
>  {{[2018-11-07 11:11:13,908] INFO Stopping checkpoint coordinator for job 
> 0ef8697ca98f6d2b565ed928d17c8a49. 
> (org.apache.flink.runtime.checkpoint.CheckpointCoordinator:320)}}
>  {{[2018-11-07 11:11:13,908] INFO Shutting down 
> (org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore:102)}}
>  {{[2018-11-07 11:11:23,579] INFO Shutting down Flink Mini Cluster 
> (org.apache.flink.runtime.minicluster.MiniCluster:427)}}
>  {{[2018-11-07 11:11:23,580] INFO Shutting down rest endpoint. 
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:265)}}
>  {{[2018-11-07 11:11:23,582] INFO Stopping TaskExecutor 
> akka://flink/user/taskmanager_0. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:291)}}
>  {{[2018-11-07 11:11:23,583] INFO Shutting down 
> TaskExecutorLocalStateStoresManager. 
> (org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager:213)}}
>  {{[2018-11-07 11:11:23,591] INFO I/O manager removed spill file directory 
> C:\Users\alinz\AppData\Local\Temp\flink-io-6a19871b-3b86-4a47-9b82-28eef7e55814
>  (org.apache.flink.runtime.io.disk.iomanager.IOManager:110)}}
>  {{[2018-11-07 11:11:23,591] INFO Shutting down the network environment and 
> its components. (org.apache.flink.runtime.io.network.NetworkEnvironment:344)}}
>  {{[2018-11-07 11:11:23,591] INFO Removing cache directory 
> C:\Users\alinz\AppData\Local\Temp\flink-web-ui 
> (org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint:733)}}
>  {{[2018-11-07 11:11:23,593] INFO Closing the SlotManager. 
> (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:249)}}
>  {{[2018-11-07 11:11:23,593] INFO Suspending the SlotManager. 
> (org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager:212)}}
>  {{[2018-11-07 11:11:23,596] INFO Close ResourceManager connection 
> cd021102669258aad77c20645ed08aae: ResourceManager leader changed to new 
> address null. (org.apache.flink.runtime.jobmaster.JobMaster:1355)}}
>  {{[2018-11-07 11:11:23,607] INFO Stop job leader service. 
> (org.apache.flink.runtime.taskexecutor.JobLeaderService:135)}}
>  {{[2018-11-07 11:11:23,608] INFO Stopped TaskExecutor 
> akka://flink/user/taskmanager_0. 
> (org.apache.flink.runtime.taskexecutor.TaskExecutor:330)}}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to