[ 
https://issues.apache.org/jira/browse/FLINK-30131?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17637165#comment-17637165
 ] 

Weijie Guo edited comment on FLINK-30131 at 11/22/22 12:52 PM:
---------------------------------------------------------------

[~landlord] Thanks for reporting this.

At present, I have not investigated this issue in depth, but only made further 
understanding:
1. It can be seen that the thread is stuck in the 
`requestMemorySegmentBlocking` when the data shuffle is written. Theoretically, 
this will not be stuck all the time, and will be reused with downstream 
consumption. So, Is your job hanging forever?
2. What environment did you run this job, local or YARN/K8S, which helps to 
reproduce the problem. And can you also provide your flink-conf.yaml?
3. If you need to increase the number of network buffers, you only need to 
increase the total TM memory and network memory, adjusting 
`taskmanager.network.memory.buffers-per-channel` and 
`taskmanager.network.memory.floating-buffers-per-gate` will not help solve this 
problem, but will also make the buffer request more competitive.

TBH, I am not familiar with the implementation of iteration, cc [~gaoyunhaii]  
for more professional advice.

 


was (Author: weijie guo):
[~landlord] Thanks for reporting this.

At present, I have not investigated this issue in depth, but only made further 
understanding:
1. It can be seen that the thread is stuck in the 
`requestMemorySegmentBlocking` when the data shuffle is written. Theoretically, 
this will not be stuck all the time, and will be reused with downstream 
consumption. So, Is your job hanging forever?
2. What environment did you run this job, local or YARN/K8S, which helps to 
reproduce the problem. And can you also provide your flink-conf.yaml?
3. If you need to increase the number of network buffers, you only need to 
increase the total TM memory and network memory, adjusting 
`taskmanager.network.memory.buffers-per-channel` and 
`taskmanager.network.memory.floating-buffers-per-gate` will not help solve this 
problem, but will also make the buffer request more competitive.

> flink iterate will suspend when record is a bit large
> -----------------------------------------------------
>
>                 Key: FLINK-30131
>                 URL: https://issues.apache.org/jira/browse/FLINK-30131
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.15.2
>            Reporter: Lu
>            Priority: Major
>         Attachments: image-2022-11-22-14-59-08-272.png
>
>
>  
> {code:java}
> //代码占位符
> Configuration configuration = new Configuration();
> configuration.setInteger(RestOptions.PORT, 8082);
> configuration.setInteger(NETWORK_MAX_BUFFERS_PER_CHANNEL, 10000000);
> configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, 
> MemorySize.parse("4g"));
> configuration.setInteger("taskmanager.network.memory.buffers-per-channel", 
> 10000000);
> configuration.setInteger("taskmanager.network.memory.floating-buffers-per-gate",
>  10000000);
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> env.setParallelism(1);
> List<Integer> list = new ArrayList<>(10);
> for (int i = 1; i < 10000; i++) {
>     list.add(i);
> }
> DataStreamSource<Integer> integerDataStreamSource = env.fromCollection(list);
> DataStream<byte[]> map = integerDataStreamSource.map(i -> new 
> byte[10000000]).setParallelism(1).name("map to byte[]").shuffle();
> IterativeStream<byte[]> iterate = map.iterate();
> DataStream<byte[]> map1 = iterate.process(new ProcessFunction<byte[], 
> byte[]>() {
>     @Override
>     public void processElement(byte[] value, ProcessFunction<byte[], 
> byte[]>.Context ctx, Collector<byte[]> out) throws Exception {
>         out.collect(value);
>     }
> }).name("multi collect");
> DataStream<byte[]> filter = map1.filter(i -> true 
> ).setParallelism(1).name("feedback");
> iterate.closeWith(filter);
> map1.map(bytes -> bytes.length).name("map to length").print();
> env.execute(); {code}
> my code is above.
>  
> when i use iterate with big record ,  the iterate will suspend at a random 
> place. when i saw the stack, it has a suspicious thread
> !image-2022-11-22-14-59-08-272.png|width=751,height=328!
> it seems like a network related problem. so i increse the network buffer 
> memory and num. but it only delay the suspend point,  it will still suspend 
> after iterate a little more times than before.
> i want to know if this is a bug or i have some error in my code or 
> configuration.
> looking forward to your reply. thanks in advance.
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to