[jira] [Created] (FLINK-31386) Fix the potential deadlock issue of blocking shuffle
Yingjie Cao created FLINK-31386: --- Summary: Fix the potential deadlock issue of blocking shuffle Key: FLINK-31386 URL: https://issues.apache.org/jira/browse/FLINK-31386 Project: Flink Issue Type: Bug Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.17.0 Currently, the SortMergeResultPartition may allocate more network buffers than the guaranteed size of the LocalBufferPool. As a result, some result partitions may need to wait for other result partitions to release the over-allocated network buffers before they can continue. However, the result partitions which have allocated more than the guaranteed number of buffers rely on the processing of input data to trigger data spilling and buffer recycling, and the input data in turn relies on the batch reading buffers used by the SortMergeResultPartitionReadScheduler, which may already be taken by the blocked result partitions that are waiting for buffers. Deadlock then occurs. We can easily fix this deadlock by reserving the guaranteed buffers during initialization. -- This message was sent by Atlassian Jira (v8.20.10#820010)
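A minimal sketch of the reservation idea, assuming a hypothetical BufferPool interface (the real LocalBufferPool API differs): the guaranteed buffers are requested eagerly during setup, before any input processing starts, so the spill-triggered recycling cycle described above can no longer block initialization.
{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: reserve the guaranteed buffers up front so that a
// result partition never has to wait for buffers over-allocated by others.
final class GuaranteedBufferReserver {

    interface BufferPool {
        /** Blocks until one buffer is available. */
        byte[] requestBufferBlocking() throws InterruptedException;
    }

    private final Queue<byte[]> reservedBuffers = new ArrayDeque<>();

    /** Called once during partition setup, before any data is processed. */
    void reserveGuaranteedBuffers(BufferPool pool, int numGuaranteed)
            throws InterruptedException {
        for (int i = 0; i < numGuaranteed; i++) {
            // Eager reservation: if this blocks, no data processing has
            // started yet, so the deadlock cycle cannot form.
            reservedBuffers.add(pool.requestBufferBlocking());
        }
    }

    byte[] takeReservedBuffer() {
        return reservedBuffers.poll();
    }
}
{code}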
[jira] [Created] (FLINK-29351) Enable input buffer floating for blocking shuffle
Yingjie Cao created FLINK-29351: --- Summary: Enable input buffer floating for blocking shuffle Key: FLINK-29351 URL: https://issues.apache.org/jira/browse/FLINK-29351 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.17.0 At the input gate, Flink needs exclusive buffers for each input channel. For jobs with large parallelism, this easily causes the "Insufficient number of network buffers" error. This ticket aims to make all input network buffers floating for blocking shuffle to reduce the possibility of that error. The change can also improve the default blocking shuffle performance, because buffer floating increases buffer utilization. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-29299) Fix the network memory size calculation issue in fine-grained resource mode
Yingjie Cao created FLINK-29299: --- Summary: Fix the network memory size calculation issue in fine-grained resource mode Key: FLINK-29299 URL: https://issues.apache.org/jira/browse/FLINK-29299 Project: Flink Issue Type: Improvement Components: Runtime / Network Affects Versions: 1.16.0 Reporter: Yingjie Cao Fix For: 1.16.0 After FLINK-28663, one intermediate dataset can be consumed by multiple consumers, so there is a case where one vertex consumes the same intermediate dataset multiple times. However, in fine-grained resource mode, when computing the required network memory, the intermediate dataset is currently used as the key for recording the network buffer size per input gate, which means fewer network buffers than needed may be allocated if two input gates of the same vertex consume the same intermediate dataset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
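A hypothetical illustration of the accounting bug with simplified types: keying the per-gate buffer requirement by dataset ID collapses two input gates of one vertex that consume the same dataset, while keying by the gate itself counts both.
{code:java}
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the network memory calculation (names are
// hypothetical). Vertex V has two input gates, both consuming dataset
// "ds-1" and each needing 100 buffers.
public class NetworkBufferAccounting {
    public static void main(String[] args) {
        // Buggy: keyed by dataset id -- the second gate overwrites the
        // first entry, so only 100 buffers are counted instead of 200.
        Map<String, Integer> byDataset = new HashMap<>();
        byDataset.put("ds-1", 100); // gate 1
        byDataset.put("ds-1", 100); // gate 2 silently collapses into gate 1
        int buggyTotal = byDataset.values().stream().mapToInt(i -> i).sum();

        // Fixed idea: key by the input gate (or a (vertex, dataset) pair).
        Map<String, Integer> byGate = new HashMap<>();
        byGate.put("vertexV-gate-0", 100);
        byGate.put("vertexV-gate-1", 100);
        int fixedTotal = byGate.values().stream().mapToInt(i -> i).sum();

        System.out.println("buggy=" + buggyTotal + ", fixed=" + fixedTotal);
        // prints: buggy=100, fixed=200
    }
}
{code}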
[jira] [Created] (FLINK-28663) Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side
Yingjie Cao created FLINK-28663: --- Summary: Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side Key: FLINK-28663 URL: https://issues.apache.org/jira/browse/FLINK-28663 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Yingjie Cao Currently, one intermediate dataset can only be consumed by one downstream consumer vertex. If multiple consumer vertices consume the same output of the same upstream vertex, multiple intermediate datasets will be produced. We can optimize this behavior to produce only one intermediate dataset which can be shared by multiple consumer vertices. As a first step, we should allow multiple downstream consumer job vertices to share the same intermediate dataset on the scheduler side. (Note that this optimization only works for blocking shuffle, because a pipelined shuffle result partition cannot be consumed multiple times.) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28561) Merge subpartition shuffle data read request for better sequential IO
Yingjie Cao created FLINK-28561: --- Summary: Merge subpartition shuffle data read request for better sequential IO Key: FLINK-28561 URL: https://issues.apache.org/jira/browse/FLINK-28561 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.17.0 Currently, the shuffle data of each subpartition of blocking shuffle is read separately. To achieve better performance and reduce IOPS, we can merge consecutive data requests on the same file together and serve them in one IO request. More specifically: 1) if multiple data requests are reading the same data, for example when reading broadcast data, the reader will read the data only once and send the same piece of data to multiple downstream consumers; 2) if multiple data requests are reading consecutive data in one file, we will merge those data requests together into one large request and read a larger amount of data sequentially, which is good for file IO performance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
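A minimal sketch of the request-merging idea under simplified assumptions (requests are plain (offset, length) ranges within one file; the real scheduler carries more state): sort the pending requests by file offset, then coalesce ranges that touch or overlap.
{code:java}
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified request coalescing: adjacent ranges in the same file are
// merged into one larger sequential read (types are hypothetical).
final class ReadRequestMerger {

    record ReadRange(long offset, long length) {}

    static List<ReadRange> merge(List<ReadRange> requests) {
        List<ReadRange> sorted = new ArrayList<>(requests);
        sorted.sort(Comparator.comparingLong(ReadRange::offset));

        List<ReadRange> merged = new ArrayList<>();
        for (ReadRange r : sorted) {
            if (!merged.isEmpty()) {
                ReadRange last = merged.get(merged.size() - 1);
                if (last.offset() + last.length() >= r.offset()) {
                    // Consecutive or overlapping (e.g. broadcast reads of
                    // the same data): extend the previous request instead
                    // of issuing a second IO operation.
                    long end = Math.max(last.offset() + last.length(),
                                        r.offset() + r.length());
                    merged.set(merged.size() - 1,
                               new ReadRange(last.offset(), end - last.offset()));
                    continue;
                }
            }
            merged.add(r);
        }
        return merged;
    }
}
{code}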
[jira] [Created] (FLINK-28556) Extract header fields of Buffer into a BufferHeader class for blocking shuffle file IO
Yingjie Cao created FLINK-28556: --- Summary: Extract header fields of Buffer into a BufferHeader class for blocking shuffle file IO Key: FLINK-28556 URL: https://issues.apache.org/jira/browse/FLINK-28556 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.16.0 This is a small code refactoring which can also be reused by follow-up PRs. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28551) Store the number of bytes instead of the number of buffers in index entry for sort-shuffle
Yingjie Cao created FLINK-28551: --- Summary: Store the number of bytes instead of the number of buffers in index entry for sort-shuffle Key: FLINK-28551 URL: https://issues.apache.org/jira/browse/FLINK-28551 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.16.0 Currently, in each index entry of the sort-shuffle index file, one field is the number of buffers in the current data region. The problem is that it is then hard to know the data boundary before reading the file. To solve this, we can store the number of bytes instead of the number of buffers in the index entry. Based on this change, we can apply some optimizations, for example reading more data than one buffer at a time for better sequential IO, as mentioned in FLINK-28373. -- This message was sent by Atlassian Jira (v8.20.10#820010)
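A sketch of the index-entry change with a hypothetical 16-byte entry layout: storing the byte count makes the region boundary known before the data file is read, so one large sequential read can cover a whole region.
{code:java}
import java.nio.ByteBuffer;

// Hypothetical 16-byte index entry: file offset plus region size in bytes.
// With a buffer count instead of a byte count, the region's end offset
// could not be known without reading the data file itself.
final class IndexEntry {
    static final int ENTRY_SIZE = 16;

    static void write(ByteBuffer indexBuffer, long fileOffset, long numBytes) {
        indexBuffer.putLong(fileOffset);
        indexBuffer.putLong(numBytes);
    }

    /** Returns {offset, numBytes} so the caller can read the region in one IO request. */
    static long[] read(ByteBuffer indexBuffer, int entryIndex) {
        int pos = entryIndex * ENTRY_SIZE;
        return new long[] {indexBuffer.getLong(pos), indexBuffer.getLong(pos + 8)};
    }
}
{code}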
[jira] [Created] (FLINK-28550) Remove the unused field in SortMergeSubpartitionReader
Yingjie Cao created FLINK-28550: --- Summary: Remove the unused field in SortMergeSubpartitionReader Key: FLINK-28550 URL: https://issues.apache.org/jira/browse/FLINK-28550 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.16.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28519) Fix the bug that SortMergeResultPartitionReadScheduler may not read data sequentially
Yingjie Cao created FLINK-28519: --- Summary: Fix the bug that SortMergeResultPartitionReadScheduler may not read data sequentially Key: FLINK-28519 URL: https://issues.apache.org/jira/browse/FLINK-28519 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.16.0 Currently, the SortMergeResultPartitionReadScheduler always gets all active subpartition readers and reads at most one data region for each of them. It is common that some subpartitions are requested before others, so their region indexes are ahead of the others. If all region data of a subpartition can be read in one round, some subpartition readers will always be ahead of others, which causes random IO. This patch fixes the case by polling one subpartition reader at a time, as sketched below. -- This message was sent by Atlassian Jira (v8.20.10#820010)
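A minimal sketch of the one-reader-at-a-time polling, using hypothetical simplified types rather than the actual scheduler classes: take one reader from the head of the queue, serve it, and re-enqueue it if it still has data, so all readers advance at roughly the same file position.
{code:java}
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical scheduling loop: serving one reader per iteration keeps
// all subpartition readers close together in the file, which preserves
// sequential IO.
final class OneReaderAtATimeScheduler {

    interface SubpartitionReader {
        /** Reads at most one data region; returns true if more data remains. */
        boolean readOneRegion();
    }

    private final Queue<SubpartitionReader> activeReaders = new ArrayDeque<>();

    void register(SubpartitionReader reader) {
        activeReaders.add(reader);
    }

    void run() {
        SubpartitionReader reader;
        while ((reader = activeReaders.poll()) != null) {
            if (reader.readOneRegion()) {
                activeReaders.add(reader); // still has data: go to the back
            }
        }
    }
}
{code}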
[jira] [Created] (FLINK-28514) Remove data flush in SortMergeResultPartition
Yingjie Cao created FLINK-28514: --- Summary: Remove data flush in SortMergeResultPartition Key: FLINK-28514 URL: https://issues.apache.org/jira/browse/FLINK-28514 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.16.0 This patch removes the data flush in SortMergeResultPartition because it is unnecessary. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28512) Select HashBasedDataBuffer and SortBasedDataBuffer dynamically based on the number of network buffers that can be allocated
Yingjie Cao created FLINK-28512: --- Summary: Select HashBasedDataBuffer and SortBasedDataBuffer dynamically based on the number of network buffers that can be allocated Key: FLINK-28512 URL: https://issues.apache.org/jira/browse/FLINK-28512 Project: Flink Issue Type: Sub-task Reporter: Yingjie Cao -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28382) Introduce new compression algorithms of higher compression ratio
Yingjie Cao created FLINK-28382: --- Summary: Introduce new compression algorithms of higher compression ratio Key: FLINK-28382 URL: https://issues.apache.org/jira/browse/FLINK-28382 Project: Flink Issue Type: Bug Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.16.0 Currently, we use lz4 for shuffle data compression, which is a good balance between IO reduction and CPU consumption. But in some scenarios, IO becomes the bottleneck and the storage space is limited (especially in Kubernetes environments). For these cases, we need compression algorithms with higher compression ratios to further reduce IO. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28380) Produce one intermediate dataset for multiple consumers consuming the same data
Yingjie Cao created FLINK-28380: --- Summary: Produce one intermediate dataset for multiple consumers consuming the same data Key: FLINK-28380 URL: https://issues.apache.org/jira/browse/FLINK-28380 Project: Flink Issue Type: Sub-task Components: Client / Job Submission, Runtime / Coordination, Runtime / Network Reporter: Yingjie Cao Fix For: 1.16.0 Currently, if one output of an upstream job vertex is consumed by multiple downstream job vertices, the upstream vertex will produce multiple datasets. For blocking shuffle, this means serializing and persisting the same data multiple times. This ticket aims to optimize this behavior and make the upstream job vertex produce one dataset which will be read by multiple downstream vertices. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28378) Use larger data reading buffer size for sort-shuffle
Yingjie Cao created FLINK-28378: --- Summary: Use larger data reading buffer size for sort-shuffle Key: FLINK-28378 URL: https://issues.apache.org/jira/browse/FLINK-28378 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.16.0 Currently, for sort-shuffle, we always use the network buffer size as the data reading buffer size, which is 32K by default. We can increase this buffer size for better performance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28377) Enable requesting fewer data reading buffers for sort-shuffle if there is not enough data
Yingjie Cao created FLINK-28377: --- Summary: Enable requesting fewer data reading buffers for sort-shuffle if there is not enough data Key: FLINK-28377 URL: https://issues.apache.org/jira/browse/FLINK-28377 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.16.0 Currently, for sort-based blocking shuffle, the corresponding data reader always allocates a fixed number of buffers for shuffle data reading, even when there is not much data to read, which wastes buffers. We can optimize this by allocating fewer reading buffers when there is not enough data. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28376) Reduce the number of IO threads for sort-shuffle
Yingjie Cao created FLINK-28376: --- Summary: Reduce the number of IO threads for sort-shuffle Key: FLINK-28376 URL: https://issues.apache.org/jira/browse/FLINK-28376 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.16.0 Currently, the number of IO threads for shuffle data reading depends on the size of the reading memory and the number of CPU cores. We should also consider the number of slots and the number of disks. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28374) Some further improvements of sort-shuffle
Yingjie Cao created FLINK-28374: --- Summary: Some further improvements of sort-shuffle Key: FLINK-28374 URL: https://issues.apache.org/jira/browse/FLINK-28374 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.16.0 This is an umbrella issue for sort-shuffle improvements. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-28373) Read larger size of data sequentially for sort-shuffle
Yingjie Cao created FLINK-28373: --- Summary: Read larger size of data sequentially for sort-shuffle Key: FLINK-28373 URL: https://issues.apache.org/jira/browse/FLINK-28373 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.16.0 Currently, for sort-based blocking shuffle, the corresponding data readers read shuffle data at buffer granularity. Before compression, each buffer is 32K by default; after compression the size becomes smaller (often less than 10K). For file IO, this is quite small. To achieve better performance and reduce IOPS, we can merge consecutive data requests on the same file together and serve them in one IO request. More specifically: 1) if multiple data requests are reading the same data, for example when reading broadcast data, the reader will read the data only once and send the same piece of data to multiple downstream consumers; 2) if multiple data requests are reading consecutive data in one file, we will merge those data requests together into one large request and read a larger amount of data sequentially, which is good for file IO performance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-26347) Should use Flink system Classloader (AppClassloader) when deserializing RPC message
Yingjie Cao created FLINK-26347: --- Summary: Should use Flink system Classloader (AppClassloader) when deserializing RPC message Key: FLINK-26347 URL: https://issues.apache.org/jira/browse/FLINK-26347 Project: Flink Issue Type: Bug Affects Versions: 1.15.0 Reporter: Yingjie Cao Fix For: 1.15.0 FLINK-25742 removed the redundant serialization of RPC invocations on the Flink side. However, it accidentally changed the class loading behavior. Before FLINK-25742, the Flink system ClassLoader was used to load RPC message classes, but after FLINK-25742, the RpcSystem ClassLoader (whose parent ClassLoader is not the Flink system ClassLoader) is used, which can cause ClassNotFoundException. I encountered this exception when trying to run flink-remote-shuffle on the latest Flink 1.15-SNAPSHOT: the remote shuffle class (the shuffle descriptor class) could not be found even though the corresponding jar file was in Flink's lib/ directory. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25960) Distribute the data read buffers more fairly among result partitions for sort-shuffle
Yingjie Cao created FLINK-25960: --- Summary: Distribute the data read buffers more fairly among result partitions for sort-shuffle Key: FLINK-25960 URL: https://issues.apache.org/jira/browse/FLINK-25960 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.15.0 Currently, the data read buffers for sort-shuffle are allocated in a random way, and some result partitions may occupy too many buffers, which leads to the starvation of other result partitions. This ticket aims to improve the situation by not reading data for result partitions which already occupy more than the average number of read buffers per result partition. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25959) Add a micro-benchmark for the sort-based blocking shuffle
Yingjie Cao created FLINK-25959: --- Summary: Add a micro-benchmark for the sort-based blocking shuffle Key: FLINK-25959 URL: https://issues.apache.org/jira/browse/FLINK-25959 Project: Flink Issue Type: Sub-task Reporter: Yingjie Cao -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25860) Move read buffer allocation and output file creation to setup method for sort-shuffle result partition to avoid blocking main thread
Yingjie Cao created FLINK-25860: --- Summary: Move read buffer allocation and output file creation to setup method for sort-shuffle result partition to avoid blocking main thread Key: FLINK-25860 URL: https://issues.apache.org/jira/browse/FLINK-25860 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.15.0 Currently, the read buffer allocation and output file creation of sort-shuffle are performed by the main thread. These operations are a little heavy and can block the main thread for a while, which may influence other RPC calls, including follow-up task deployments. This ticket solves the issue by moving read buffer allocation and output file creation to the setup method. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25796) Avoid record copy for result partition of sort-shuffle if there are enough buffers for better performance
Yingjie Cao created FLINK-25796: --- Summary: Avoid record copy for result partition of sort-shuffle if there are enough buffers for better performance Key: FLINK-25796 URL: https://issues.apache.org/jira/browse/FLINK-25796 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.15.0 Currently, for the result partition of sort-shuffle, there is extra record copy overhead introduced by clustering records by subpartition index. For small records, this overhead can cause up to a 20% performance regression. This ticket aims to solve the problem. In fact, the hash-based implementation is a natural way to achieve the goal of sorting records by subpartition index. However, it has some serious weaknesses: for example, when there are not enough buffers or there is data skew, it can waste buffers and hurt compression efficiency, which can cause performance regression. This ticket solves the issue by dynamically switching between the two implementations, that is, if there are enough buffers, the hash-based implementation is used, and if there are not enough buffers, the sort-based implementation is used. -- This message was sent by Atlassian Jira (v8.20.1#820001)
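An illustrative sketch of the switching rule; the threshold and names below are assumptions for illustration, not the actual Flink criterion: with roughly one buffer per subpartition available, the hash-based buffer avoids the extra record copy, otherwise the sort-based buffer is the safe fallback.
{code:java}
// Illustrative selection logic only; the real criterion in Flink may differ.
final class DataBufferSelector {

    enum BufferKind { HASH_BASED, SORT_BASED }

    static BufferKind select(int availableBuffers, int numSubpartitions) {
        // Hash-based clustering needs roughly one buffer per subpartition,
        // otherwise it wastes memory under data skew; if that cannot be
        // met, fall back to sort-based clustering, which copies each
        // record once to cluster it by subpartition index.
        return availableBuffers >= numSubpartitions
                ? BufferKind.HASH_BASED
                : BufferKind.SORT_BASED;
    }
}
{code}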
[jira] [Created] (FLINK-25786) Adjust the generation of subpartition data storage order for sort-shuffle from random shuffle to random shift
Yingjie Cao created FLINK-25786: --- Summary: Adjust the generation of subpartition data storage order for sort-shuffle from random shuffle to random shift Key: FLINK-25786 URL: https://issues.apache.org/jira/browse/FLINK-25786 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.15.0 Currently, for sort-shuffle, the subpartition data storage order is generated by random shuffle. However, if there are not enough resources to run the downstream consumer tasks in parallel, performance can suffer because of the random disk IO caused by the random subpartition data storage order. This ticket aims to improve this scenario. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25781) Adjust the maximum number of buffers that can be used per result partition for shuffle data reading
Yingjie Cao created FLINK-25781: --- Summary: Adjust the maximum number of buffers that can be used per result partition for shuffle data reading Key: FLINK-25781 URL: https://issues.apache.org/jira/browse/FLINK-25781 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.15.0 In the current sort-shuffle implementation, the maximum amount of buffer memory that can be used per result partition for shuffle data reading is 32M. However, for large parallelism jobs, 32M is not enough, and for small parallelism jobs, 32M may waste buffers. This ticket adjusts the maximum number of buffers that can be used per result partition so that it scales with parallelism; the selected value is an empirical one based on TPC-DS test results. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25780) Reduce the maximum number of data output buffers for sort-shuffle
Yingjie Cao created FLINK-25780: --- Summary: Reduce the maximum number of data output buffers for sort-shuffle Key: FLINK-25780 URL: https://issues.apache.org/jira/browse/FLINK-25780 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.15.0 The data output buffers of sort-shuffle exist for better disk IO performance, and currently the total data output buffer size is 16M, which is pretty big. However, blocking on requests for too many buffers may hurt performance. This ticket reduces the maximum number of data output buffers to reduce the buffer request time. The selected value is an empirical one based on TPC-DS test results. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25774) Restrict the maximum number of buffers that can be used per result partition for blocking shuffle
Yingjie Cao created FLINK-25774: --- Summary: Restrict the maximum number of buffers that can be used per result partition for blocking shuffle Key: FLINK-25774 URL: https://issues.apache.org/jira/browse/FLINK-25774 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.15.0 Currently, for blocking shuffle, the maximum number of buffers that can be used per result partition is Integer.MAX_VALUE. For hash-shuffle, the maximum number of buffers actually used is (numSubpartitions + 1), because the hash-shuffle implementation always flushes the previous buffer after a new buffer is added, so setting the maximum to Integer.MAX_VALUE is meaningless. For sort-shuffle, if too many buffers are taken by one result partition, other result partitions and input gates may spend too much time waiting for buffers, which can hurt performance. This ticket restricts the maximum number of buffers that can be used per result partition; the selected value is an empirical one based on TPC-DS test results. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25654) Remove the redundant lock in SortMergeResultPartition
Yingjie Cao created FLINK-25654: --- Summary: Remove the redundant lock in SortMergeResultPartition Key: FLINK-25654 URL: https://issues.apache.org/jira/browse/FLINK-25654 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.14.3 Reporter: Yingjie Cao Fix For: 1.15.0, 1.14.4 After FLINK-2372, the task canceler will never call the close method of ResultPartition, which reduces some race conditions and simplifies the code. This ticket removes some redundant locks in SortMergeResultPartition. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25653) Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock
Yingjie Cao created FLINK-25653: --- Summary: Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock Key: FLINK-25653 URL: https://issues.apache.org/jira/browse/FLINK-25653 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.13.5, 1.14.3 Reporter: Yingjie Cao Fix For: 1.15.0, 1.13.6, 1.14.4 In the current sort-shuffle implementation, the different lock acquisition orders in SortMergeSubpartitionReader and SortMergeResultPartitionReadScheduler can cause deadlock. To solve the problem, we can move the buffer recycling in SortMergeSubpartitionReader out of the lock. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25640) Enhance the document for blocking shuffle
Yingjie Cao created FLINK-25640: --- Summary: Enhance the document for blocking shuffle Key: FLINK-25640 URL: https://issues.apache.org/jira/browse/FLINK-25640 Project: Flink Issue Type: Sub-task Components: Documentation Reporter: Yingjie Cao Fix For: 1.15.0 As discussed in [https://lists.apache.org/thread/pt2b1f17x2l5rlvggwxs6m265lo4ly7p], this ticket aims to enhance the documentation for blocking shuffle and add more operational guidelines. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25639) Increase the default read buffer size of sort-shuffle to 64M
Yingjie Cao created FLINK-25639: --- Summary: Increase the default read buffer size of sort-shuffle to 64M Key: FLINK-25639 URL: https://issues.apache.org/jira/browse/FLINK-25639 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.15.0 As discussed in [https://lists.apache.org/thread/pt2b1f17x2l5rlvggwxs6m265lo4ly7p], this ticket aims to increase the default read buffer size of sort-shuffle to 64M. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25638) Increase the default write buffer size of sort-shuffle to 16M
Yingjie Cao created FLINK-25638: --- Summary: Increase the default write buffer size of sort-shuffle to 16M Key: FLINK-25638 URL: https://issues.apache.org/jira/browse/FLINK-25638 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.15.0 As discussed in [https://lists.apache.org/thread/pt2b1f17x2l5rlvggwxs6m265lo4ly7p], this ticket aims to increase the default write buffer size of sort-shuffle to 16M. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25637) Make sort-shuffle the default shuffle implementation for batch jobs
Yingjie Cao created FLINK-25637: --- Summary: Make sort-shuffle the default shuffle implementation for batch jobs Key: FLINK-25637 URL: https://issues.apache.org/jira/browse/FLINK-25637 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.15.0 As discussed in [https://lists.apache.org/thread/pt2b1f17x2l5rlvggwxs6m265lo4ly7p], this ticket aims to make sort-shuffle the default shuffle implementation for batch jobs. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25636) FLIP-199: Change some default config values of blocking shuffle for better usability
Yingjie Cao created FLINK-25636: --- Summary: FLIP-199: Change some default config values of blocking shuffle for better usability Key: FLINK-25636 URL: https://issues.apache.org/jira/browse/FLINK-25636 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.15.0 This is the umbrella issue for FLIP-199; we will change several default config values for batch shuffle and update the documentation accordingly. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25607) Sorting by duration on Flink Web UI does not work correctly
Yingjie Cao created FLINK-25607: --- Summary: Sorting by duration on Flink Web UI does not work correctly Key: FLINK-25607 URL: https://issues.apache.org/jira/browse/FLINK-25607 Project: Flink Issue Type: Bug Reporter: Yingjie Cao Attachments: image-2022-01-11-16-44-10-709.png The Flink version used is 1.14. !image-2022-01-11-16-44-10-709.png! -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-25606) Requesting exclusive buffers timeout when recovering from unaligned checkpoint under fine-grained resource mode
Yingjie Cao created FLINK-25606: --- Summary: Requesting exclusive buffers timeout when recovering from unaligned checkpoint under fine-grained resource mode Key: FLINK-25606 URL: https://issues.apache.org/jira/browse/FLINK-25606 Project: Flink Issue Type: Bug Reporter: Yingjie Cao When converting a RecoveredInputChannel to a RemoteInputChannel, there are not enough network buffers to initialize the input channel's exclusive buffers. Here is the exception stack: {code:java} java.io.IOException: Timeout triggered when requesting exclusive buffers: The total number of network buffers is currently set to 6144 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max', or you may increase the timeout which is 3ms by setting the key 'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'. at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:205) at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:60) at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.requestExclusiveBuffers(BufferManager.java:133) at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.setup(RemoteInputChannel.java:157) at org.apache.flink.runtime.io.network.partition.consumer.RemoteRecoveredInputChannel.toInputChannelInternal(RemoteRecoveredInputChannel.java:77) at org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel.toInputChannel(RecoveredInputChannel.java:106) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.convertRecoveredInputChannels(SingleInputGate.java:307) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:290) at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:94) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:359) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:323) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684) at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639) at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) at java.lang.Thread.run(Thread.java:834) {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
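For reference, the configuration keys named in the exception message can also be set programmatically. A sketch using Flink's Configuration API; the values below are placeholders, not recommendations:
{code:java}
import org.apache.flink.configuration.Configuration;

// Example only: raise the network memory bounds and the exclusive-buffer
// request timeout named in the exception message above.
public class NetworkMemoryConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("taskmanager.memory.network.fraction", "0.2");
        conf.setString("taskmanager.memory.network.min", "128mb");
        conf.setString("taskmanager.memory.network.max", "1gb");
        conf.setString(
                "taskmanager.network.memory.exclusive-buffers-request-timeout-ms",
                "60000");
        System.out.println(conf);
    }
}
{code}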
[jira] [Created] (FLINK-24954) Reset read buffer request timeout on buffer recycling for sort-shuffle
Yingjie Cao created FLINK-24954: --- Summary: Reset read buffer request timeout on buffer recycling for sort-shuffle Key: FLINK-24954 URL: https://issues.apache.org/jira/browse/FLINK-24954 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.15.0 Currently, the read buffer request timeout implementation of sort-shuffle is a little aggressive. As reported on the mailing list ([https://lists.apache.org/thread/bd3s5bqfg9oxlb1g1gg3pxs3577lhf88]), the TimeoutException may be triggered if there is data skew and the downstream task is slow. We can further improve this case by resetting the request timeout on buffer recycling. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24901) Some further improvements of the pluggable shuffle framework
Yingjie Cao created FLINK-24901: --- Summary: Some further improvements of the pluggable shuffle framework Key: FLINK-24901 URL: https://issues.apache.org/jira/browse/FLINK-24901 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.15.0 This is an umbrella issue including several further improvements of the pluggable shuffle framework. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24900) Support running multiple shuffle plugins in one session cluster
Yingjie Cao created FLINK-24900: --- Summary: Support running multiple shuffle plugins in one session cluster Key: FLINK-24900 URL: https://issues.apache.org/jira/browse/FLINK-24900 Project: Flink Issue Type: Improvement Reporter: Yingjie Cao Currently, one Flink cluster can only use one shuffle plugin. However, there are cases where different jobs may need different shuffle implementations. By loading shuffle plugins with the plugin manager and letting jobs select their shuffle service freely, Flink can support running multiple shuffle plugins in one session cluster. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24899) Enable data compression for blocking shuffle by default
Yingjie Cao created FLINK-24899: --- Summary: Enable data compression for blocking shuffle by default Key: FLINK-24899 URL: https://issues.apache.org/jira/browse/FLINK-24899 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.15.0 Currently, shuffle data compression is not enabled by default. Shuffle data compression is important for blocking data shuffle, and enabling it by default can improve usability. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24898) Some further improvements of sort-shuffle
Yingjie Cao created FLINK-24898: --- Summary: Some further improvements of sort-shuffle Key: FLINK-24898 URL: https://issues.apache.org/jira/browse/FLINK-24898 Project: Flink Issue Type: Bug Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.15.0 This is an umbrella issue including several further improvements of sort-shuffle. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (FLINK-24035) Notify the buffer listeners when the local buffer pool receives available notification from the global pool
Yingjie Cao created FLINK-24035: --- Summary: Notify the buffer listeners when the local buffer pool receives available notification from the global pool Key: FLINK-24035 URL: https://issues.apache.org/jira/browse/FLINK-24035 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Yingjie Cao Fix For: 1.14.0 The buffer listeners are not notified when the local buffer pool receives an available notification from the global pool. This may cause a potential deadlock: # A LocalBufferPool is created, but there are no available buffers in the global NetworkBufferPool. # The LocalBufferPool registers an available buffer listener with the global NetworkBufferPool. # The BufferManager requests buffers from the LocalBufferPool but no buffer is available. As a result, it registers an available buffer listener with the LocalBufferPool. # A buffer is recycled to the global pool and the local buffer pool is notified about the available buffer. # The local buffer pool requests the available buffer from the global pool, but the registered available buffer listener of the BufferManager is not notified and never gets a chance to be notified, so deadlock occurs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23724) Network buffer leak when ResultPartition is released (failover)
Yingjie Cao created FLINK-23724: --- Summary: Network buffer leak when ResultPartition is released (failover) Key: FLINK-23724 URL: https://issues.apache.org/jira/browse/FLINK-23724 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Yingjie Cao Fix For: 1.14.0 The BufferBuilders in BufferWritingResultPartition are not properly released when ResultPartition is released. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23275) Support to release cluster partitions stored externally
Yingjie Cao created FLINK-23275: --- Summary: Support to release cluster partitions stored externally Key: FLINK-23275 URL: https://issues.apache.org/jira/browse/FLINK-23275 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Runtime / Network Reporter: Yingjie Cao Fix For: 1.14.0 Currently, if a cluster partition is stored externally, it cannot be released by the partition tracker; one of the reasons is that the ShuffleMaster is not a cluster-level service. After FLINK-23214, we can release cluster partitions through the ShuffleMaster. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23249) Introduce ShuffleMasterContext to ShuffleMaster
Yingjie Cao created FLINK-23249: --- Summary: Introduce ShuffleMasterContext to ShuffleMaster Key: FLINK-23249 URL: https://issues.apache.org/jira/browse/FLINK-23249 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination, Runtime / Network Reporter: Yingjie Cao Fix For: 1.14.0 Introduce a ShuffleMasterContext to the ShuffleMaster. Just like the ShuffleEnvironmentContext on the TaskManager side, the ShuffleMasterContext can act as a proxy between the ShuffleMaster and other components of Flink, like the ResourceManagerPartitionTracker. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23214) Make ShuffleMaster a cluster level shared service
Yingjie Cao created FLINK-23214: --- Summary: Make ShuffleMaster a cluster level shared service Key: FLINK-23214 URL: https://issues.apache.org/jira/browse/FLINK-23214 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.14.0 Reporter: Yingjie Cao This ticket tries to make the ShuffleMaster a cluster-level shared service, which makes it consistent with the ShuffleEnvironment on the TM side. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22910) ShuffleMaster enhancement for pluggable shuffle service framework
Yingjie Cao created FLINK-22910: --- Summary: ShuffleMaster enhancement for pluggable shuffle service framework Key: FLINK-22910 URL: https://issues.apache.org/jira/browse/FLINK-22910 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Yingjie Cao Fix For: 1.14.0 The current _ShuffleMaster_ has an unclear lifecycle which is inconsistent with the _ShuffleEnvironment_ at the _TM_ side. Besides, it is hard to implement some important capabilities for remote shuffle services, for example: 1) releasing external resources when a job finishes; 2) stopping or starting to track some partitions depending on the status of the external service or system. We drafted a document[1] which proposes some simple changes to solve these issues. The document is not wholly complete yet; we will start a discussion once it is finished. [1] https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit?usp=sharing -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22307) Increase the data writing cache size of sort-merge blocking shuffle
Yingjie Cao created FLINK-22307: --- Summary: Increase the data writing cache size of sort-merge blocking shuffle Key: FLINK-22307 URL: https://issues.apache.org/jira/browse/FLINK-22307 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.13.0 Currently, the data writing cache is 8M, which is not enough if data compression is enabled. By increasing the cache size to 16M, the performance of our benchmark job can be improved by about 20%. (We may make it configurable in the future.) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22305) Increase the default value of taskmanager.network.sort-shuffle.min-buffers
Yingjie Cao created FLINK-22305: --- Summary: Increase the default value of taskmanager.network.sort-shuffle.min-buffers Key: FLINK-22305 URL: https://issues.apache.org/jira/browse/FLINK-22305 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.13.0 Currently, the default value of taskmanager.network.sort-shuffle.min-buffers is 64, which is pretty small. As suggested, we'd like to increase this default value. By increasing it, the corner case of very small in-memory sort buffers and write buffers can be avoided, which is better for performance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22156) HiveDialectQueryITCase fails on Azure because of no output for 900 seconds
Yingjie Cao created FLINK-22156: --- Summary: HiveDialectQueryITCase fails on Azure because of no output for 900 seconds Key: FLINK-22156 URL: https://issues.apache.org/jira/browse/FLINK-22156 Project: Flink Issue Type: Bug Components: Table SQL / Runtime, Tests Reporter: Yingjie Cao Fix For: 1.13.0 [https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/16105/logs/139] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22153) Manually test the sort-merge blocking shuffle
Yingjie Cao created FLINK-22153: --- Summary: Manually test the sort-merge blocking shuffle Key: FLINK-22153 URL: https://issues.apache.org/jira/browse/FLINK-22153 Project: Flink Issue Type: Task Components: Runtime / Network Affects Versions: 1.13.0 Reporter: Yingjie Cao Fix For: 1.13.0 In 1.12, we introduced the sort-merge blocking shuffle to Flink, and in 1.13 the feature was optimized to improve usability (fixing a direct memory OOM issue) and performance (introducing IO scheduling and broadcast optimization). The sort-merge blocking shuffle can be tested following the process below (a sketch of the settings follows): # Write a simple batch job using either the SQL/Table or the DataStream API (word count should be enough). # Enable sort-merge blocking shuffle by setting taskmanager.network.sort-shuffle.min-parallelism to 1 in the Flink configuration file. # Submit and run the batch job with different parallelisms and data volumes. # Tune the relevant config options (taskmanager.network.blocking-shuffle.compression.enabled, taskmanager.network.sort-shuffle.min-buffers, taskmanager.memory.framework.off-heap.batch-shuffle.size) and observe the influence. -- This message was sent by Atlassian Jira (v8.3.4#803005)
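A sketch of the settings for steps 2 and 4 using Flink's Configuration API; the keys come from the ticket text above, while the values are illustrative only:
{code:java}
import org.apache.flink.configuration.Configuration;

// Illustrative values for manually testing sort-merge blocking shuffle.
public class SortShuffleTestConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Step 2: enable sort-merge shuffle for all parallelisms.
        conf.setString("taskmanager.network.sort-shuffle.min-parallelism", "1");
        // Step 4: options whose influence should be observed while tuning.
        conf.setString("taskmanager.network.blocking-shuffle.compression.enabled", "true");
        conf.setString("taskmanager.network.sort-shuffle.min-buffers", "512");
        conf.setString("taskmanager.memory.framework.off-heap.batch-shuffle.size", "64m");
        System.out.println(conf);
    }
}
{code}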
[jira] [Created] (FLINK-22127) Enrich error message of read buffer request timeout exception
Yingjie Cao created FLINK-22127: --- Summary: Enrich error message of read buffer request timeout exception Key: FLINK-22127 URL: https://issues.apache.org/jira/browse/FLINK-22127 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.13.0 Reporter: Yingjie Cao Fix For: 1.13.0 Enrich the error message of the read buffer request timeout exception to tell users how to resolve the timeout. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21951) Fix wrong if condition in BufferReaderWriterUtil#writeBuffers
Yingjie Cao created FLINK-21951: --- Summary: Fix wrong if condition in BufferReaderWriterUtil#writeBuffers Key: FLINK-21951 URL: https://issues.apache.org/jira/browse/FLINK-21951 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.13.0 The wrong if condition in BufferReaderWriterUtil#writeBuffers may lead to data loss when bulk-writing a large amount of data into a file. This bug was introduced in 1.9, but only a small amount of data was written, so it never surfaced. In 1.13, the sort-merge shuffle uses this method to write more data, which triggers the bug. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21859) Batch job fails due to "Could not mark slot 61a637e3977c58a0e6b73533c419297d active"
Yingjie Cao created FLINK-21859: --- Summary: Batch job fails due to "Could not mark slot 61a637e3977c58a0e6b73533c419297d active" Key: FLINK-21859 URL: https://issues.apache.org/jira/browse/FLINK-21859 Project: Flink Issue Type: Bug Components: Runtime / Task Affects Versions: 1.13.0 Reporter: Yingjie Cao Here is the error stack: {code:java} 2021-03-18 19:05:31 org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:130) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:81) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:221) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:212) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:203) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:701) at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51) at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1449) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1105) at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1045) at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:754) at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195) at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182) at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271) at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:669) at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:1997) at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271) at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152) at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385) at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$releaseSlot$1(DefaultDeclarativeSlotPool.java:376) at java.util.Optional.ifPresent(Optional.java:159) at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlot(DefaultDeclarativeSlotPool.java:374) at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.failAllocation(DeclarativeSlotPoolService.java:198) at org.apache.flink.runtime.jobmaster.JobMaster.internalFailAllocation(JobMaster.java:650) at org.apache.flink.runtime.jobmaster.JobMaster.failSlot(JobMaster.java:636) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:301) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at
[jira] [Created] (FLINK-21857) StackOverflow for large parallelism jobs when processing EndOfChannelStateEvent
Yingjie Cao created FLINK-21857: --- Summary: StackOverflow for large parallelism jobs when processing EndOfChannelStateEvent Key: FLINK-21857 URL: https://issues.apache.org/jira/browse/FLINK-21857 Project: Flink Issue Type: Bug Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.13.0 CheckpointedInputGate#handleEvent calls pollNext recursively when processing EndOfChannelStateEvent; for large parallelism jobs with a large number of input channels, the stack can become really deep, causing StackOverflow. The following is the stack: {code:java} 12:11 java.lang.StackOverflowError at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:650) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:625) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:611) at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109) at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:149) at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202) at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158) at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202) at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158) at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202) at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158) at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202) at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158) at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202) at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158) at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202){code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
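A minimal sketch of the standard fix for this recursion pattern, using hypothetical simplified types (not the actual CheckpointedInputGate code): the recursion is replaced by a loop, so the stack depth stays constant regardless of the number of input channels.
{code:java}
import java.util.Optional;

// Simplified illustration: recursion depth proportional to the number of
// input channels is replaced by an iterative loop with constant stack depth.
final class IterativePollExample {

    interface Gate {
        /** Returns a data buffer, the end-of-channel-state marker, or empty. */
        Optional<Object> pollNext();
    }

    static final Object END_OF_CHANNEL_STATE_EVENT = new Object();

    static Optional<Object> pollNextIteratively(Gate inputGate) {
        while (true) {
            Optional<Object> next = inputGate.pollNext();
            if (next.isPresent() && next.get() == END_OF_CHANNEL_STATE_EVENT) {
                // Handle the event inline and poll again, instead of
                // recursing once per channel as in the stack trace above.
                continue;
            }
            return next;
        }
    }
}
{code}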
[jira] [Created] (FLINK-21850) Improve document and config description of sort-merge blocking shuffle
Yingjie Cao created FLINK-21850: --- Summary: Improve document and config description of sort-merge blocking shuffle Key: FLINK-21850 URL: https://issues.apache.org/jira/browse/FLINK-21850 Project: Flink Issue Type: Sub-task Components: Documentation Reporter: Yingjie Cao Fix For: 1.13.0 After the improvements of FLINK-19614, some of the previous documentation for sort-merge blocking shuffle is no longer accurate; we need to improve the corresponding documentation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21790) Shuffle data directories to make data directory selection of different TaskManagers fairer
Yingjie Cao created FLINK-21790: --- Summary: Shuffle data directories to make data directory selection of different TaskManagers fairer Key: FLINK-21790 URL: https://issues.apache.org/jira/browse/FLINK-21790 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.13.0 Currently, different TaskManagers select data directories in the same order, and if there are multiple disks, some disks may store more data than others, which is bad for performance. A simple improvement is for each TaskManager to shuffle the given data directories randomly and select data directories in a different order, as sketched below. -- This message was sent by Atlassian Jira (v8.3.4#803005)
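A minimal sketch of the proposed randomization, assuming the configured directories arrive as a list: each TaskManager shuffles its own copy once at startup, so different processes walk the disks in different orders.
{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Each TaskManager randomizes its local view of the configured data
// directories so the first-listed disk is not preferred cluster-wide.
public class DataDirShuffler {

    static List<String> shuffledDataDirs(List<String> configuredDirs) {
        List<String> dirs = new ArrayList<>(configuredDirs);
        Collections.shuffle(dirs); // different order per TaskManager process
        return dirs;
    }

    public static void main(String[] args) {
        System.out.println(shuffledDataDirs(List.of("/disk1", "/disk2", "/disk3")));
    }
}
{code}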
[jira] [Created] (FLINK-21789) Make FileChannelManagerImpl#getNextPathNum select data directory fairly
Yingjie Cao created FLINK-21789: --- Summary: Make FileChannelManagerImpl#getNextPathNum select data directory fairly Key: FLINK-21789 URL: https://issues.apache.org/jira/browse/FLINK-21789 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.13.0 The get and increment of the next path index in FileChannelManagerImpl#getNextPathNum is not atomic, which may cause unfair data directory selection (bad for performance if multiple disks are configured). -- This message was sent by Atlassian Jira (v8.3.4#803005)
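A sketch of the atomic fix; the class below is a simplified stand-in for FileChannelManagerImpl, and the real change may differ in detail. A single AtomicInteger getAndIncrement makes the read-then-advance step indivisible, so concurrent callers cannot observe the same index twice.
{code:java}
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Fair round-robin path selection: getAndIncrement is one atomic step,
// unlike a separate read followed by a separate increment.
final class FairPathSelector {

    private final AtomicInteger nextPath = new AtomicInteger(0);
    private final List<String> paths;

    FairPathSelector(List<String> paths) {
        this.paths = paths;
    }

    String nextPathName() {
        // floorMod keeps the index non-negative even after int overflow.
        int index = Math.floorMod(nextPath.getAndIncrement(), paths.size());
        return paths.get(index);
    }
}
{code}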
[jira] [Created] (FLINK-21788) Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle
Yingjie Cao created FLINK-21788: --- Summary: Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle Key: FLINK-21788 URL: https://issues.apache.org/jira/browse/FLINK-21788 Project: Flink Issue Type: Bug Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.13.0 Currently, if the partition file has been lost for blocking shuffle, a FileNotFoundException is thrown and the partition data is not regenerated, so failover cannot recover the job. It should throw PartitionNotFoundException instead. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21778) Use heap memory instead of direct memory as index entry cache for sort-merge shuffle
Yingjie Cao created FLINK-21778: --- Summary: Use heap memory instead of direct memory as index entry cache for sort-merge shuffle Key: FLINK-21778 URL: https://issues.apache.org/jira/browse/FLINK-21778 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.12.0 Reporter: Yingjie Cao Fix For: 1.13.0 Currently, the sort-merge shuffle implementation uses a piece of direct memory as an index entry cache for acceleration. We can use heap memory instead to reduce the usage of direct memory, which further reduces the possibility of OutOfMemoryError. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21777) Replace the 4M data writing cache of sort-merge shuffle with writev system call
Yingjie Cao created FLINK-21777: --- Summary: Replace the 4M data writing cache of sort-merge shuffle with writev system call Key: FLINK-21777 URL: https://issues.apache.org/jira/browse/FLINK-21777 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.12.0 Reporter: Yingjie Cao Fix For: 1.13.0 Currently, the sort-merge shuffle implementation uses 4M of unmanaged direct memory as a cache for data writing. It can be replaced by the writev system call, which can reduce the unmanaged direct memory usage without any performance loss. -- This message was sent by Atlassian Jira (v8.3.4#803005)
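For reference, a sketch of how the writev idea maps onto the JDK: FileChannel implements GatheringByteChannel, so several buffers can be handed to the kernel in one call (writev underneath on POSIX systems) without first copying them into a staging cache. The file name below is illustrative.
{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Gathering write: multiple buffers go to disk in one system call,
// removing the need for an intermediate copy cache.
public class GatheringWriteExample {
    public static void main(String[] args) throws IOException {
        ByteBuffer[] buffers = {
            ByteBuffer.wrap("header".getBytes()),
            ByteBuffer.wrap("payload".getBytes())
        };
        try (FileChannel channel = FileChannel.open(
                Path.of("shuffle.data"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            long written = channel.write(buffers); // single gathering write
            System.out.println("wrote " + written + " bytes");
        }
    }
}
{code}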
[jira] [Created] (FLINK-20758) Use region file mechanism for shuffle data reading before we switch to managed memory
Yingjie Cao created FLINK-20758: --- Summary: Use region file mechanism for shuffle data reading before we switch to managed memory Key: FLINK-20758 URL: https://issues.apache.org/jira/browse/FLINK-20758 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.12.0 Reporter: Yingjie Cao Fix For: 1.12.2 FLINK-15981 implemented a region-file-based data reader to solve the direct memory OOM issue introduced by the usage of unmanaged direct memory, but only for BoundedBlockingResultPartition. We can introduce it to the sort-merge based blocking shuffle to avoid the similar direct memory OOM problem, which can improve usability a lot. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20757) Optimize data broadcast for sort-merge shuffle
Yingjie Cao created FLINK-20757: --- Summary: Optimize data broadcast for sort-merge shuffle Key: FLINK-20757 URL: https://issues.apache.org/jira/browse/FLINK-20757 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.12.0 Reporter: Yingjie Cao Fix For: 1.13.0 For data broadcast, we need to copy the record only once when writing data into the SortBuffer. Besides, we can write only one copy of the data when spilling to disk. These optimizations can improve the performance of data broadcast. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20740) Use managed memory to avoid direct memory OOM error
Yingjie Cao created FLINK-20740: --- Summary: Use managed memory to avoid direct memory OOM error Key: FLINK-20740 URL: https://issues.apache.org/jira/browse/FLINK-20740 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.12.0 Reporter: Yingjie Cao Fix For: 1.13.0 Currently, sort-merge blocking shuffle uses some unmanaged memory for data writing and reading, which means users must increase the size of direct memory; otherwise, they may encounter a direct memory OOM error, which is really bad for usability. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20013) BoundedBlockingSubpartition may leak network buffer if task is failed or canceled
Yingjie Cao created FLINK-20013: --- Summary: BoundedBlockingSubpartition may leak network buffer if task is failed or canceled Key: FLINK-20013 URL: https://issues.apache.org/jira/browse/FLINK-20013 Project: Flink Issue Type: Bug Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.12.0 BoundedBlockingSubpartition may leak network buffers if a task fails or is canceled. We need to recycle the current BufferConsumer when the task fails or is canceled. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-20010) SinkITCase.writerAndCommitterAndGlobalCommitterExecuteInStreamingMode fails on Azure Pipeline
Yingjie Cao created FLINK-20010: --- Summary: SinkITCase.writerAndCommitterAndGlobalCommitterExecuteInStreamingMode fails on Azure Pipeline Key: FLINK-20010 URL: https://issues.apache.org/jira/browse/FLINK-20010 Project: Flink Issue Type: Bug Components: Tests Reporter: Yingjie Cao Fix For: 1.12.0 SinkITCase.writerAndCommitterAndGlobalCommitterExecuteInStreamingMode fails on Azure Pipeline:
{code:java}
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19991) UnalignedCheckpointITCase#shouldPerformUnalignedCheckpointMassivelyParallel fails on Azure Pipeline
Yingjie Cao created FLINK-19991: --- Summary: UnalignedCheckpointITCase#shouldPerformUnalignedCheckpointMassivelyParallel fails on Azure Pipeline Key: FLINK-19991 URL: https://issues.apache.org/jira/browse/FLINK-19991 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing, Runtime / Network Reporter: Yingjie Cao Fix For: 1.12.0 UnalignedCheckpointITCase#shouldPerformUnalignedCheckpointMassivelyParallel fails on Azure Pipeline:
{code:java}
java.lang.AssertionError:
Expected: <0L>
    but: was <1809L>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.junit.Assert.assertThat(Assert.java:956)
at org.junit.rules.ErrorCollector$1.call(ErrorCollector.java:65)
at org.junit.rules.ErrorCollector.checkSucceeds(ErrorCollector.java:78)
at org.junit.rules.ErrorCollector.checkThat(ErrorCollector.java:63)
at org.junit.rules.ErrorCollector.checkThat(ErrorCollector.java:54)
at org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.execute(UnalignedCheckpointITCase.java:189)
at org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.shouldPerformUnalignedCheckpointMassivelyParallel(UnalignedCheckpointITCase.java:179)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.Verifier$1.evaluate(Verifier.java:35)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19938) Implement shuffle data read scheduling for sort-merge blocking shuffle
Yingjie Cao created FLINK-19938: --- Summary: Implement shuffle data read scheduling for sort-merge blocking shuffle Key: FLINK-19938 URL: https://issues.apache.org/jira/browse/FLINK-19938 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.12.0 Reporter: Yingjie Cao Fix For: 1.12.0 As described in FLIP-148 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink), shuffle IO scheduling is important for performance. We'd like to introduce it to sort-merge shuffle first. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19614) Further optimization of sort-merge based blocking shuffle
Yingjie Cao created FLINK-19614: --- Summary: Further optimization of sort-merge based blocking shuffle Key: FLINK-19614 URL: https://issues.apache.org/jira/browse/FLINK-19614 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Yingjie Cao FLINK-19582 introduces a basic sort-merge based blocking shuffle implementation. We can further optimize it based on the approaches proposed in https://docs.google.com/document/d/1mpekX6aAHJhBsQ0pS9MxDiFQjHQIuaJH0GAQHh0GlJ0/edit?usp=sharing. This is the umbrella ticket for the optimizations. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19603) Introduce shuffle data compression to sort-merge based blocking shuffle
Yingjie Cao created FLINK-19603: --- Summary: Introduce shuffle data compression to sort-merge based blocking shuffle Key: FLINK-19603 URL: https://issues.apache.org/jira/browse/FLINK-19603 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.12.0 Reporter: Yingjie Cao Fix For: 1.12.0 Shuffle data compression can reduce the storage overhead and improve the disk/network IO performance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
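As a rough standalone illustration of block compression for shuffle buffers, here is a round trip with lz4-java (an assumption: this uses the lz4-java library directly rather than Flink's own compressor wrappers, and assumes lz4-java is on the classpath; the 32K buffer size is only an example):
{code:java}
import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;

public class ShuffleCompressionSketch {
    public static void main(String[] args) {
        byte[] data = new byte[32 * 1024]; // e.g. one 32K shuffle buffer

        LZ4Factory factory = LZ4Factory.fastestInstance();
        LZ4Compressor compressor = factory.fastCompressor();
        byte[] compressed = compressor.compress(data);

        // Decompression needs the original length, so it must be stored
        // alongside the compressed block (e.g. in the buffer header).
        LZ4FastDecompressor decompressor = factory.fastDecompressor();
        byte[] restored = decompressor.decompress(compressed, data.length);

        System.out.println("compressed " + data.length + " -> " + compressed.length + " bytes");
        assert java.util.Arrays.equals(data, restored);
    }
}
{code}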
[jira] [Created] (FLINK-19602) Introduce new config options to enable sort-merge based blocking shuffle
Yingjie Cao created FLINK-19602: --- Summary: Introduce new config options to enable sort-merge based blocking shuffle Key: FLINK-19602 URL: https://issues.apache.org/jira/browse/FLINK-19602 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.12.0 Reporter: Yingjie Cao Fix For: 1.12.0
1. taskmanager.network.sort-merge-blocking-shuffle.enabled: Boolean flag to enable/disable sort-merge based blocking shuffle.
2. taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition: Number of network buffers required for each sort-merge blocking result partition.
3. taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: Minimum parallelism to enable the sort-merge based blocking shuffle.
With these new config options, the default behavior of blocking shuffle stays unchanged. -- This message was sent by Atlassian Jira (v8.3.4#803005)
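For illustration, the options could be set programmatically on a Configuration using the string keys as proposed in this ticket (the numeric values below are arbitrary examples, not recommended defaults):
{code:java}
import org.apache.flink.configuration.Configuration;

public class SortShuffleConfigExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Keys taken from the ticket text; values are illustrative only.
        config.setBoolean("taskmanager.network.sort-merge-blocking-shuffle.enabled", true);
        config.setInteger("taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition", 2048);
        config.setInteger("taskmanager.network.sort-merge-blocking-shuffle.min-parallelism", 128);
    }
}
{code}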
[jira] [Created] (FLINK-19601) Introduce sort-merge based blocking result partition SortMergeResultPartition and the corresponding subpartition reader
Yingjie Cao created FLINK-19601: --- Summary: Introduce sort-merge based blocking result partition SortMergeResultPartition and the corresponding subpartition reader Key: FLINK-19601 URL: https://issues.apache.org/jira/browse/FLINK-19601 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.12.0 Reporter: Yingjie Cao Fix For: 1.12.0 SortMergeResultPartition appends all added records and events to a SortBuffer, and once the SortBuffer is full, all data in it is copied and spilled to a PartitionedFile in subpartition index order. Unlike the hash-based blocking shuffle implementation, SortMergeResultPartition writes at most one file concurrently. -- This message was sent by Atlassian Jira (v8.3.4#803005)
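A condensed sketch of the writing path just described, with hypothetical names (a SortBuffer whose append returns false when full, and a spillSortBuffer step); this is not the actual implementation:
{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;

public abstract class SortMergeWriterSketch {

    interface SortBuffer {
        boolean append(ByteBuffer record, int targetSubpartition) throws IOException;
    }

    private final SortBuffer sortBuffer;

    protected SortMergeWriterSketch(SortBuffer sortBuffer) {
        this.sortBuffer = sortBuffer;
    }

    public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
        // When the in-memory SortBuffer is full, spill all of its data to the
        // single PartitionedFile in subpartition index order, then retry.
        while (!sortBuffer.append(record, targetSubpartition)) {
            spillSortBuffer();
        }
    }

    /** Copies data out of the SortBuffer in subpartition order and appends it to the file. */
    protected abstract void spillSortBuffer() throws IOException;
}
{code}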
[jira] [Created] (FLINK-19600) Introduce PartitionedFile and the corresponding writer/reader for sort-merge based blocking shuffle
Yingjie Cao created FLINK-19600: --- Summary: Introduce PartitionedFile and the corresponding writer/reader for sort-merge based blocking shuffle Key: FLINK-19600 URL: https://issues.apache.org/jira/browse/FLINK-19600 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.12.0 Reporter: Yingjie Cao Fix For: 1.12.0 PartitionedFile is the persistent file type of sort-merge based blocking shuffle. It stores data of all subpartitions in subpartition index order. PartitionedFile can be produced by PartitionedFileWriter and consumed by PartitionedFileReader. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19597) Introduce SortBuffer and its implementation PartitionSortedBuffer for sort-merge based blocking shuffle
Yingjie Cao created FLINK-19597: --- Summary: Introduce SortBuffer and its implementation PartitionSortedBuffer for sort-merge based blocking shuffle Key: FLINK-19597 URL: https://issues.apache.org/jira/browse/FLINK-19597 Project: Flink Issue Type: Sub-task Components: Runtime / Network Affects Versions: 1.12.0 Reporter: Yingjie Cao Fix For: 1.12.0 Data of different channels can be appended to a SortBuffer, and after the SortBuffer is finished, the appended data can be copied from it in channel index order. PartitionSortedBuffer is an implementation of SortBuffer which sorts all appended records only by subpartition index; records of the same subpartition keep their appended order. -- This message was sent by Atlassian Jira (v8.3.4#803005)
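A hedged sketch of the SortBuffer contract described above; the method names are illustrative and not necessarily the exact Flink signatures:
{code:java}
import java.nio.ByteBuffer;

public interface SortBuffer {

    /** Appends a record for the given subpartition; returns false if the buffer is full. */
    boolean append(ByteBuffer record, int targetSubpartition);

    /** Marks the buffer as finished so the appended data can be read back. */
    void finish();

    /**
     * Copies the appended data back out, grouped by ascending subpartition index;
     * records of the same subpartition keep their append order.
     */
    ByteBuffer copyNextChunk();

    /** True while there is still appended data left to copy out. */
    boolean hasRemaining();
}
{code}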
[jira] [Created] (FLINK-19582) Introduce sort-merge based blocking shuffle to Flink
Yingjie Cao created FLINK-19582: --- Summary: Introduce sort-merge based blocking shuffle to Flink Key: FLINK-19582 URL: https://issues.apache.org/jira/browse/FLINK-19582 Project: Flink Issue Type: Improvement Components: Runtime / Network Affects Versions: 1.12.0 Reporter: Yingjie Cao Fix For: 1.12.0
*Motivation*
Hash-based blocking shuffle and sort-merge based blocking shuffle are the two main blocking shuffle implementations widely adopted by existing distributed data processing frameworks. A hash-based implementation writes the data sent to different reducer tasks into separate files concurrently, while a sort-merge based approach writes that data together into a single file and merges small files into bigger ones. Compared to the sort-merge based approach, the hash-based approach has several weak points when it comes to running large scale batch jobs:
*1. Stability* For high parallelism (tens of thousands) batch jobs, the current hash-based blocking shuffle implementation writes too many files concurrently, which puts high pressure on the file system: maintenance of too many file metas, high system CPU consumption, and exhaustion of inodes or file descriptors. All of these are potential stability issues, which we encountered in our production environment before we switched to sort-merge based blocking shuffle. Sort-merge based blocking shuffle does not have this problem because, for one result partition, only one file is written at a time.
*2. Performance* Large numbers of small shuffle files and random IO can influence shuffle performance a lot, especially on HDDs (for SSDs, sequential read is also important because of read-ahead and caching). For batch jobs processing massive data, a small amount of data per subpartition is common, because to reduce the job completion time we usually increase the job parallelism, which reduces the amount of data processed per task; the average data amount per subpartition is (the amount of data per task) / (parallelism) = (total amount of data) / (parallelism^2), so increasing parallelism decreases the amount of data per subpartition rapidly. Besides, data skew is another cause of small subpartition files. By merging the data of all subpartitions into one file, more sequential reads can be achieved.
*3. Resource* For the current hash-based implementation, each subpartition needs at least one buffer. For large scale batch shuffles, the memory consumption can be huge: for example, with a parallelism of 10000 we need at least 320M of network memory per result partition (10000 subpartitions x 32K per buffer). Because of this huge network memory consumption, it is hard to configure the network memory for large scale batch jobs, and sometimes the parallelism cannot be increased just because of insufficient network memory, which leads to a bad user experience.
By introducing the sort-merge based approach to Flink, we can improve Flink's capability of running large scale batch jobs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19344) DispatcherResourceCleanupTest#testJobSubmissionUnderSameJobId is unstable on Azure Pipeline
Yingjie Cao created FLINK-19344: --- Summary: DispatcherResourceCleanupTest#testJobSubmissionUnderSameJobId is unstable on Azure Pipeline Key: FLINK-19344 URL: https://issues.apache.org/jira/browse/FLINK-19344 Project: Flink Issue Type: Bug Components: Tests Reporter: Yingjie Cao Here is the log and stack: https://dev.azure.com/kevin-flink/flink/_build/results?buildId=88=logs=6e58d712-c5cc-52fb-0895-6ff7bd56c46b=f30a8e80-b2cf-535c-9952-7f521a4ae374
{code:java}
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19323) Small optimization of network layer record serialization
Yingjie Cao created FLINK-19323: --- Summary: Small optimization of network layer record serialization Key: FLINK-19323 URL: https://issues.apache.org/jira/browse/FLINK-19323 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.12.0 Currently, when serializing a record, the SpanningRecordSerializer first skips 4 bytes for the length field and serializes the record. Then it gets the serialized record length, skips back to position 0, and writes the length field. After that, it skips again to the tail of the serialized data. In fact, the last two skips can be avoided by writing the length field to position 0 directly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
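The fix boils down to using an absolute write for the length field. A standalone sketch with java.nio.ByteBuffer (illustrative, not the SpanningRecordSerializer code):
{code:java}
import java.nio.ByteBuffer;

public class LengthPrefixedSerializer {

    /** Serializes a record with a 4-byte length prefix, without repositioning twice. */
    public static ByteBuffer serialize(byte[] record) {
        ByteBuffer buffer = ByteBuffer.allocate(4 + record.length);
        buffer.position(4);              // skip the length field once
        buffer.put(record);              // serialize the record
        buffer.putInt(0, record.length); // absolute write: no skip back, no skip forward
        buffer.flip();
        return buffer;
    }
}
{code}
The absolute putInt(index, value) overload leaves the buffer position untouched, which is exactly what removes the two extra skips.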
[jira] [Created] (FLINK-19320) Remove clearBuffers from the public interfaces of RecordWriter
Yingjie Cao created FLINK-19320: --- Summary: Remove clearBuffers from the public interfaces of RecordWriter Key: FLINK-19320 URL: https://issues.apache.org/jira/browse/FLINK-19320 Project: Flink Issue Type: Sub-task Components: Runtime / Task Reporter: Yingjie Cao Fix For: 1.12.0 Currently, RecordWriter#clearBuffers is only used to finish the current BufferBuilder and does not clear buffers any more. Previously, it was used to recycle the partially filled buffer in the serializer, but the serializer no longer contains any network buffer. For now, only some tests and DataSet classes use it, and all of these usages should be replaced by RecordWriter#close, which does the same thing. Besides, for FLINK-19297, we do not want to keep this method in the new ResultPartitionWriter interface. So in this ticket, we propose to remove clearBuffers from the public interfaces of RecordWriter. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19312) Introduce BufferWritingResultPartition which wraps the logic of writing buffers to ResultSubpartition
Yingjie Cao created FLINK-19312: --- Summary: Introduce BufferWritingResultPartition which wraps the logic of writing buffers to ResultSubpartition Key: FLINK-19312 URL: https://issues.apache.org/jira/browse/FLINK-19312 Project: Flink Issue Type: Bug Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.12.0 In the current abstraction, buffers are written to and read from ResultSubpartition, which is a hash-style data writing and reading implementation. In the future, a sort-merge based ResultPartitionWriter will be implemented which cannot share the current hash-style ResultSubpartition related logic. This ticket introduces BufferWritingResultPartition, which wraps the logic related to ResultSubpartition; afterwards, ResultPartition is free of ResultSubpartition specifics and can be reused by the future sort-merge based ResultPartitionWriter implementation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19302) Flushing of BoundedBlockingResultPartition should finish current BufferBuilder
Yingjie Cao created FLINK-19302: --- Summary: Flushing of BoundedBlockingResultPartition should finish current BufferBuilder Key: FLINK-19302 URL: https://issues.apache.org/jira/browse/FLINK-19302 Project: Flink Issue Type: Bug Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.12.0 Currently, flushing of BoundedBlockingResultPartition flushes and closes the current BufferConsumer but does not finish the corresponding BufferBuilder. As a result, records coming later can be appended to an already recycled buffer. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19299) NettyShuffleEnvironmentBuilder#setBufferSize does not take effect
Yingjie Cao created FLINK-19299: --- Summary: NettyShuffleEnvironmentBuilder#setBufferSize does not take effect Key: FLINK-19299 URL: https://issues.apache.org/jira/browse/FLINK-19299 Project: Flink Issue Type: Bug Components: Tests Reporter: Yingjie Cao Fix For: 1.12.0 Currently, NettyShuffleEnvironmentBuilder#setBufferSize does not take effect because the set value is never used when building the NettyShuffleEnvironment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-19297) Make ResultPartitionWriter record-oriented
Yingjie Cao created FLINK-19297: --- Summary: Make ResultPartitionWriter record-oriented Key: FLINK-19297 URL: https://issues.apache.org/jira/browse/FLINK-19297 Project: Flink Issue Type: Improvement Components: Runtime / Network Affects Versions: 1.12.0 Reporter: Yingjie Cao Fix For: 1.12.0 Currently, ResultPartitionWriter is buffer-oriented, that is, RecordWriter adds buffers of different channels to the ResultPartitionWriter, and the buffer boundary serves as a natural boundary of data belonging to different channels. However, this abstraction is not flexible enough to handle cases where records are appended to a joint structure shared by all channels and sorting is used to cluster data belonging to different channels. In this ticket, we propose to make ResultPartitionWriter record-oriented, which offers more flexibility to implementations of ResultPartitionWriter. Based on the new record-oriented interface, we will introduce the sort-merge based blocking shuffle to Flink in the future. -- This message was sent by Atlassian Jira (v8.3.4#803005)
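To make the contrast concrete, here are two illustrative interface sketches (hypothetical, not the exact Flink signatures):
{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;

// Buffer-oriented: the caller hands over per-channel buffers, and the buffer
// boundary itself defines which channel the data belongs to.
interface BufferOrientedWriter {
    // "Object" stands in for the real BufferConsumer type.
    void addBufferConsumer(Object bufferConsumer, int targetChannel) throws IOException;
}

// Record-oriented: the caller hands over raw records, and the implementation
// decides how to organize them (per-channel buffers, or a shared structure
// that is later sorted by subpartition index).
interface RecordOrientedWriter {
    void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException;
}
{code}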
[jira] [Created] (FLINK-18762) Make network buffers per incoming/outgoing channel can be configured separately
Yingjie Cao created FLINK-18762: --- Summary: Make network buffers per incoming/outgoing channel can be configured separately Key: FLINK-18762 URL: https://issues.apache.org/jira/browse/FLINK-18762 Project: Flink Issue Type: Sub-task Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.12.0 In FLINK-16012, we want to decrease the default number of exclusive buffers at the receiver side from 2 to 1 to accelerate checkpoints in cases of backpressure. However, the numbers of buffers per outgoing and incoming channel are configured by a single configuration key. It is better to make the network buffers per incoming/outgoing channel separately configurable, which is more flexible. At the same time, we can keep the default behavior compatible. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18728) Make initialCredit of RemoteInputChannel final
Yingjie Cao created FLINK-18728: --- Summary: Make initialCredit of RemoteInputChannel final Key: FLINK-18728 URL: https://issues.apache.org/jira/browse/FLINK-18728 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.12.0 The field initialCredit of RemoteInputChannel is set only once and can be accessed by multiple threads. We can make the field final and move the initialization to the constructor of RemoteInputChannel to avoid potential thread safety issues. -- This message was sent by Atlassian Jira (v8.3.4#803005)
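A tiny sketch of the change (hypothetical class name): a final field initialized in the constructor is safely published to all threads by the Java memory model, so readers need no further synchronization.
{code:java}
public class RemoteInputChannelSketch {

    // Was a mutable field assigned after construction; as final it is
    // guaranteed visible to every thread once the constructor returns.
    private final int initialCredit;

    public RemoteInputChannelSketch(int initialCredit) {
        this.initialCredit = initialCredit;
    }

    public int getInitialCredit() {
        return initialCredit;
    }
}
{code}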
[jira] [Created] (FLINK-18727) Remove the previous finished empty Buffer in PipelinedSubpartition when adding a new Buffer
Yingjie Cao created FLINK-18727: --- Summary: Remove the previous finished empty Buffer in PipelinedSubpartition when adding a new Buffer Key: FLINK-18727 URL: https://issues.apache.org/jira/browse/FLINK-18727 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Yingjie Cao Fix For: 1.12.0 In the current implementation of PipelinedSubpartition, an empty Buffer consumes credit, which means we need at least one credit to handle a finished empty Buffer without any data. We can remove and recycle the finished empty Buffer in the queue when adding a new Buffer, after which the reader does not need any available credit to handle the finished empty buffer. For example, if the new buffer is an event and the previous buffer is finished but contains no data, then after the empty buffer is removed, the reader is able to process the newly added event without any available credit. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18453) Stack overflow of AggregateITCase#testAggregationCodeSplit on Azure
Yingjie Cao created FLINK-18453: --- Summary: Stack overflow of AggregateITCase#testAggregationCodeSplit on Azure Key: FLINK-18453 URL: https://issues.apache.org/jira/browse/FLINK-18453 Project: Flink Issue Type: Bug Reporter: Yingjie Cao Here is some log:
{code:java}
[ERROR] testAggregationCodeSplit[LocalGlobal=OFF, MiniBatch=OFF, StateBackend=HEAP](org.apache.flink.table.planner.runtime.stream.sql.AggregateITCase) Time elapsed: 8.167 s <<< ERROR!
Caused by: java.lang.StackOverflowError
at org.codehaus.janino.CodeContext.extract16BitValue(CodeContext.java:720)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:561)
at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:557)
{code}
The whole log: https://dev.azure.com/kevin-flink/flink/_build/results?buildId=30=logs=a1590513-d0ea-59c3-3c7b-aad756c48f25=5129dea2-618b-5c74-1b8f-9ec63a37a8a6 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-18408) BinaryRowData should use de-/serialization method of MemorySegment with explicit endianness
Yingjie Cao created FLINK-18408: --- Summary: BinaryRowData should use de-/serialization method of MemorySegment with explicit endianness Key: FLINK-18408 URL: https://issues.apache.org/jira/browse/FLINK-18408 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Reporter: Yingjie Cao I noticed that classes like BinaryRowData use de-/serialization methods of MemorySegment without explicit endianness. I think this is not safe if we serialize some data on one machine and deserialize it on another machine with different endianness. To be on the safe side, I think we should use the de-/serialization methods of MemorySegment with explicit endianness. What do you think? (BinaryRowWriter may have the same issue, and there may be other classes that also suffer from this which I did not notice.) -- This message was sent by Atlassian Jira (v8.3.4#803005)
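The general principle, shown here with java.nio.ByteBuffer for self-containment (MemorySegment's explicit-endianness method variants serve the same purpose): pin the byte order on both the write and the read side rather than relying on any platform-dependent order.
{code:java}
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class EndiannessExample {

    /** Writes a long with an explicitly pinned, platform-independent byte order. */
    public static byte[] serializeLong(long value) {
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.order(ByteOrder.LITTLE_ENDIAN); // explicit choice, same on every machine
        buffer.putLong(value);
        return buffer.array();
    }

    /** Reads the long back; the order must match the writer's. */
    public static long deserializeLong(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        buffer.order(ByteOrder.LITTLE_ENDIAN);
        return buffer.getLong();
    }
}
{code}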
[jira] [Created] (FLINK-17665) Serialize buffer data type of Buffer into BufferResponse
Yingjie Cao created FLINK-17665: --- Summary: Serialize buffer data type of Buffer into BufferResponse Key: FLINK-17665 URL: https://issues.apache.org/jira/browse/FLINK-17665 Project: Flink Issue Type: Improvement Components: Runtime / Network Affects Versions: 1.11.0 Reporter: Yingjie Cao Fix For: 1.11.0 After serializing the DataType of a Buffer into BufferResponse, we can get the DataType without deserializing the Buffer at the downstream side, which is more performance-friendly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17572) Remove checkpoint alignment buffered metric from webui
Yingjie Cao created FLINK-17572: --- Summary: Remove checkpoint alignment buffered metric from webui Key: FLINK-17572 URL: https://issues.apache.org/jira/browse/FLINK-17572 Project: Flink Issue Type: Sub-task Components: Runtime / Web Frontend Affects Versions: 1.11.0 Reporter: Yingjie Cao After FLINK-16404, we never cache buffers during checkpoint barrier alignment, so the checkpoint alignment buffered metric will always be 0; we should remove it directly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17568) Task may consume data after checkpoint barrier before performing checkpoint for unaligned checkpoint
Yingjie Cao created FLINK-17568: --- Summary: Task may consume data after checkpoint barrier before performing checkpoint for unaligned checkpoint Key: FLINK-17568 URL: https://issues.apache.org/jira/browse/FLINK-17568 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.11.0 Reporter: Yingjie Cao Fix For: 1.11.0 For unaligned checkpoint, a task may consume data after the checkpoint barrier before performing the checkpoint, which leads to consumption of duplicated data and corruption of the data stream. More specifically, when the Netty thread notifies the checkpoint barrier for the first time and enqueues a checkpointing task in the mailbox, the task thread may still be in the data consumption loop. If it reads a new checkpoint barrier from another channel, it will not return to the mailbox; instead, it continues to read data until all data is consumed or a full record is available. Meanwhile, data after the checkpoint barrier may be read and consumed, which leads to inconsistency. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17564) Inflight data of incoming channel may be disordered for unaligned checkpoint
Yingjie Cao created FLINK-17564: --- Summary: Inflight data of incoming channel may be disordered for unaligned checkpoint Key: FLINK-17564 URL: https://issues.apache.org/jira/browse/FLINK-17564 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Affects Versions: 1.11.0 Reporter: Yingjie Cao Fix For: 1.11.0 For unaligned checkpoint, when checkpointing the inflight data of an incoming channel, both the task thread and the Netty thread may add data to the channel state writer. More specifically, the task thread will first request the inflight buffers from the input channel and add them to the channel state writer, and then the Netty thread will add the follow-up buffers (if any) to the channel state writer. The buffer adding of the task thread and the Netty thread is not synchronized, so the Netty thread may add buffers before the task thread does, which leads to disorder of the data and corruption of the data stream. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17299) Add remote channel throughput benchmark
Yingjie Cao created FLINK-17299: --- Summary: Add remote channel throughput benchmark Key: FLINK-17299 URL: https://issues.apache.org/jira/browse/FLINK-17299 Project: Flink Issue Type: Sub-task Components: Benchmarks Affects Versions: 1.11.0 Reporter: Yingjie Cao Fix For: 1.11.0 To guarantee that the implementation of FLINK-16404 does not introduce any regression for the fast checkpoint scenario (1s checkpoint interval and checkpoints completing within 1s), we introduce this benchmark. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17208) Reduce redundant data available notification of PipelinedSubpartition
Yingjie Cao created FLINK-17208: --- Summary: Reduce redundant data available notification of PipelinedSubpartition Key: FLINK-17208 URL: https://issues.apache.org/jira/browse/FLINK-17208 Project: Flink Issue Type: Improvement Components: Runtime / Network Affects Versions: 1.11.0 Reporter: Yingjie Cao Fix For: 1.11.0 There are three scenarios that may lead to redundant data available notifications for PipelinedSubpartition:
# Adding a new BufferConsumer after an event;
# Finishing the current subpartition;
# Flushing an event buffer.
We can avoid these redundant data available notifications. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-17107) CheckpointCoordinatorConfiguration#isExactlyOnce() is inconsistent with StreamConfig#getCheckpointMode()
Yingjie Cao created FLINK-17107: --- Summary: CheckpointCoordinatorConfiguration#isExactlyOnce() is inconsistent with StreamConfig#getCheckpointMode() Key: FLINK-17107 URL: https://issues.apache.org/jira/browse/FLINK-17107 Project: Flink Issue Type: Bug Components: Runtime / Checkpointing Reporter: Yingjie Cao CheckpointCoordinatorConfiguration#isExactlyOnce() is inconsistent with StreamConfig#getCheckpointMode() when checkpointing is disabled: the former returns true if the checkpoint mode is EXACTLY_ONCE and false if it is AT_LEAST_ONCE, while the latter will always return AT_LEAST_ONCE, which means never exactly-once. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-16298) GroupWindowTableAggregateITCase.testEventTimeTumblingWindow fails on Travis
Yingjie Cao created FLINK-16298: --- Summary: GroupWindowTableAggregateITCase.testEventTimeTumblingWindow fails on Travis Key: FLINK-16298 URL: https://issues.apache.org/jira/browse/FLINK-16298 Project: Flink Issue Type: Test Components: Tests Reporter: Yingjie Cao GroupWindowTableAggregateITCase.testEventTimeTumblingWindow fails on Travis. Link: https://api.travis-ci.com/v3/job/291610383/log.txt Stack:
{code:java}
05:38:01.976 [ERROR] Tests run: 18, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 7.537 s <<< FAILURE! - in org.apache.flink.table.planner.runtime.stream.table.GroupWindowTableAggregateITCase
05:38:01.976 [ERROR] testEventTimeTumblingWindow[StateBackend=HEAP](org.apache.flink.table.planner.runtime.stream.table.GroupWindowTableAggregateITCase) Time elapsed: 0.459 s <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.table.planner.runtime.stream.table.GroupWindowTableAggregateITCase.testEventTimeTumblingWindow(GroupWindowTableAggregateITCase.scala:151)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, backoffTimeMS=0)
Caused by: java.lang.Exception: Artificial Failure
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15455) Enable TCP connection reuse across multiple jobs.
Yingjie Cao created FLINK-15455: --- Summary: Enable TCP connection reuse across multiple jobs. Key: FLINK-15455 URL: https://issues.apache.org/jira/browse/FLINK-15455 Project: Flink Issue Type: Improvement Components: Runtime / Network Reporter: Yingjie Cao Currently, TCP connections can only be reused by tasks residing in the same TaskManager and consuming the same IntermediateResult. After a job finishes or fails over, the TCP connections are closed and new connections must be set up later. As an improvement, we can make TCP connections a cluster-level resource which can be reused by multiple jobs. The advantages are as follows:
# Reduce the number of TCP connections, so we can save some resources.
# Reduce the overhead of connection setup and teardown, so jobs restarted after failover and later jobs submitted to the same session cluster can reuse the previous connections.
We use a Flink session cluster as a service for ad-hoc queries, and users can produce statistics or create statements and reports at any time. Most of the queries finish in 2s, and we found that TCP connection reuse helps a lot to reduce the average execution time, which means more queries can be processed using the same resources and time, with an even better user experience. -- This message was sent by Atlassian Jira (v8.3.4#803005)
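A hypothetical sketch of the cluster-level reuse idea: one pool shared by all jobs on a TaskManager, handing out at most one physical connection per remote address (the real Netty Channel and ConnectionID types are elided behind generics):
{code:java}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Function;

public class SharedConnectionPool<K, C> {

    private final ConcurrentMap<K, C> channels = new ConcurrentHashMap<>();
    private final Function<K, C> connector;

    public SharedConnectionPool(Function<K, C> connector) {
        this.connector = connector;
    }

    /** Returns the existing channel for the address, or connects exactly once. */
    public C getOrCreate(K address) {
        // computeIfAbsent guarantees a single connection per address even
        // under concurrent requests from tasks of different jobs.
        return channels.computeIfAbsent(address, connector);
    }

    /** Connections are no longer closed per job; they are released with the cluster. */
    public void closeAll(Consumer<C> closer) {
        channels.values().forEach(closer);
        channels.clear();
    }
}
{code}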
[jira] [Created] (FLINK-15166) Shuffle data compression wrongly decrease the buffer reference count.
Yingjie Cao created FLINK-15166: --- Summary: Shuffle data compression wrongly decrease the buffer reference count. Key: FLINK-15166 URL: https://issues.apache.org/jira/browse/FLINK-15166 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.10.0 Reporter: Yingjie Cao Fix For: 1.10.0 FLINK-15140 reports two related problems which are both triggered by the broadcast partitioner; to make it clearer, I created this Jira to address the problems separately. For blocking shuffle compression, we recycle the compressed intermediate buffer each time after writing data out; however, when the data is not compressed, the returned buffer is the original buffer and should not be recycled, but we wrongly recycled it. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15140) Shuffle data compression does not work with BroadcastRecordWriter.
Yingjie Cao created FLINK-15140: --- Summary: Shuffle data compression does not work with BroadcastRecordWriter. Key: FLINK-15140 URL: https://issues.apache.org/jira/browse/FLINK-15140 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.10.0 Reporter: Yingjie Cao Fix For: 1.10.0 I tested the newest code of the master branch last weekend with more test cases. Unfortunately, several problems were encountered, including a compression bug. For pipelined mode, the compressor copies the compressed data back to the input buffer; however, when BroadcastRecordWriter is used, the underlying buffer is shared, so we cannot copy the compressed buffer back to the input buffer. For blocking mode, we wrongly recycle the buffer when the buffer is not compressed, and this problem is also triggered when BroadcastRecordWriter is used. To fix the problems: for blocking shuffle, the reference counter should be maintained correctly; for pipelined shuffle, the simplest way may be to disable compression if the underlying buffer is shared. I will open a PR to fix the problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-15030) Potential deadlock for bounded blocking ResultPartition.
Yingjie Cao created FLINK-15030: --- Summary: Potential deadlock for bounded blocking ResultPartition. Key: FLINK-15030 URL: https://issues.apache.org/jira/browse/FLINK-15030 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.9.1, 1.9.0 Reporter: Yingjie Cao Currently, the BoundedBlockingSubpartition relies on the addition of the next BufferConsumer to flush and recycle the previous one, which means we need at least (numSubpartitions + 1) buffers to make the bounded blocking ResultPartition work. However, the ResultPartitionFactory gives only (numSubpartitions) required buffers to the BoundedBlockingSubpartition, which may lead to deadlock. This problem exists only in version 1.9; in version 1.10 (master), it has been fixed by commit 2c8b4ef572f05bf4740b7e204af1e5e709cd945c. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-14872) Potential deadlock for task reading from blocking ResultPartition.
Yingjie Cao created FLINK-14872: --- Summary: Potential deadlock for task reading from blocking ResultPartition. Key: FLINK-14872 URL: https://issues.apache.org/jira/browse/FLINK-14872 Project: Flink Issue Type: Bug Reporter: Yingjie Cao Currently, the buffer pool size of an InputGate reading from a blocking ResultPartition is unbounded, so the InputGate may use too many buffers, which can prevent the ResultPartition of the same task from acquiring enough core buffers and finally lead to deadlock. Consider the following case:
# Core buffers are reserved for the InputGate and the ResultPartition;
# The InputGate consumes lots of Buffers (not including the buffers reserved for the ResultPartition);
# Other tasks acquire exclusive buffers for their InputGates and trigger a redistribution of Buffers (the Buffers taken by the previous InputGate cannot be released);
# The first task, whose InputGate uses lots of buffers, begins to emit records but cannot acquire enough core Buffers (some operators may not emit records immediately, or there is just nothing to emit);
# Deadlock.
I think we can fix this problem by limiting the number of Buffers that can be allocated by an InputGate which reads from a blocking ResultPartition. -- This message was sent by Atlassian Jira (v8.3.4#803005)