[jira] [Created] (FLINK-31386) Fix the potential deadlock issue of blocking shuffle

2023-03-09 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-31386:
---

 Summary: Fix the potential deadlock issue of blocking shuffle
 Key: FLINK-31386
 URL: https://issues.apache.org/jira/browse/FLINK-31386
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.17.0


Currently, the SortMergeResultPartition may allocate more network buffers than 
the guaranteed size of its LocalBufferPool. As a result, some result partitions 
may need to wait for other result partitions to release their over-allocated 
network buffers before they can continue. However, a result partition that has 
allocated more than its guaranteed number of buffers relies on the processing 
of input data to trigger data spilling and buffer recycling, and the input data 
in turn relies on the batch reading buffers used by the 
SortMergeResultPartitionReadScheduler, which may already be taken by the 
blocked result partitions waiting for buffers. A deadlock then occurs. We can 
fix this deadlock by reserving the guaranteed buffers at initialization.
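The reservation idea can be sketched as follows. This is a minimal illustrative model, not Flink's actual API: the class and method names (`GlobalPool`, `LocalBufferPool`, `take`) are hypothetical. The point is that each local pool claims its guaranteed minimum up front, so the circular wait described above cannot start.

```python
# Hypothetical sketch of the fix: every LocalBufferPool reserves its
# guaranteed number of buffers at initialization, so no partition can be
# starved below its guaranteed minimum by over-allocating peers.

class GlobalPool:
    def __init__(self, total):
        self.available = total

    def take(self, n):
        # Hand out at most the requested number of buffers.
        n = min(n, self.available)
        self.available -= n
        return n

class LocalBufferPool:
    def __init__(self, global_pool, guaranteed):
        self.guaranteed = guaranteed
        # Reserve the guaranteed buffers eagerly (the fix): initialization
        # fails fast if the global pool cannot cover all guarantees.
        reserved = global_pool.take(guaranteed)
        assert reserved == guaranteed, "global pool must cover all guarantees"
        self.held = reserved

g = GlobalPool(total=8)
pools = [LocalBufferPool(g, guaranteed=4) for _ in range(2)]
print([p.held for p in pools])  # [4, 4]: every pool holds its minimum
```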



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-29351) Enable input buffer floating for blocking shuffle

2022-09-20 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-29351:
---

 Summary: Enable input buffer floating for blocking shuffle
 Key: FLINK-29351
 URL: https://issues.apache.org/jira/browse/FLINK-29351
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.17.0


At the input gate, Flink needs exclusive buffers for each input channel. For 
jobs with large parallelism, this easily causes an "Insufficient number of 
network buffers" error. This ticket aims to make all input network buffers 
floating for blocking shuffle to reduce the likelihood of the "Insufficient 
number of network buffers" error. The change can also improve the default 
blocking shuffle performance, because buffer floating increases buffer 
utilization.
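A back-of-the-envelope comparison shows why floating buffers help at scale. The per-channel counts and pool size below are illustrative numbers, not Flink's exact configuration defaults or formulas:

```python
# With exclusive buffers, the required buffer count grows linearly with the
# number of input channels; with fully floating buffers, a fixed shared pool
# serves all channels of the gate regardless of parallelism.

def required_exclusive(num_channels, exclusive_per_channel=2, floating=8):
    return num_channels * exclusive_per_channel + floating

def required_floating(num_channels, pool_size=8):
    return pool_size  # independent of the number of channels

print(required_exclusive(1000))  # 2008 buffers for a 1000-channel gate
print(required_floating(1000))   # 8 buffers, shared by all channels
```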





[jira] [Created] (FLINK-29299) Fix the network memory size calculation issue in fine-grained resource mode

2022-09-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-29299:
---

 Summary: Fix the network memory size calculation issue in 
fine-grained resource mode
 Key: FLINK-29299
 URL: https://issues.apache.org/jira/browse/FLINK-29299
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.16.0
Reporter: Yingjie Cao
 Fix For: 1.16.0


After FLINK-28663, one intermediate dataset can be consumed by multiple 
consumers, and one vertex may even consume the same intermediate dataset 
multiple times. However, in fine-grained resource mode, when computing the 
required network buffer size, the intermediate dataset is currently used as 
the key to record the network buffer size per input gate, which means fewer 
network buffers than needed may be allocated if two input gates of the same 
vertex consume the same intermediate dataset.
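The accounting bug can be reduced to a keying problem. This is a hypothetical illustration with made-up numbers, not Flink's actual data structures: keying per-gate buffer requirements by dataset id collapses two gates that consume the same dataset, while keying by the gate itself counts both.

```python
# Two input gates of one vertex, both consuming "dataset-1", each needing
# 100 network buffers.
gates = [("dataset-1", 100), ("dataset-1", 100)]

buggy = {}
for dataset_id, buffers in gates:
    buggy[dataset_id] = buffers        # the second gate overwrites the first
print(sum(buggy.values()))             # 100 (under-counted)

fixed = {}
for gate_index, (dataset_id, buffers) in enumerate(gates):
    fixed[(dataset_id, gate_index)] = buffers  # one entry per gate
print(sum(fixed.values()))             # 200 (correct)
```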





[jira] [Created] (FLINK-28663) Allow multiple downstream consumer job vertices sharing the same intermediate dataset at scheduler side

2022-07-25 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28663:
---

 Summary: Allow multiple downstream consumer job vertices sharing 
the same intermediate dataset at scheduler side
 Key: FLINK-28663
 URL: https://issues.apache.org/jira/browse/FLINK-28663
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Yingjie Cao


Currently, one intermediate dataset can only be consumed by one downstream 
consumer vertex. If multiple consumer vertices consume the same output of the 
same upstream vertex, multiple intermediate datasets are produced. We can 
optimize this behavior to produce only one intermediate dataset that is shared 
by all consumer vertices. As a first step, we should allow multiple downstream 
consumer job vertices to share the same intermediate dataset on the scheduler 
side. (Note that this optimization only works for blocking shuffle, because a 
pipelined shuffle result partition cannot be consumed multiple times.)





[jira] [Created] (FLINK-28561) Merge subpartition shuffle data read request for better sequential IO

2022-07-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28561:
---

 Summary: Merge subpartition shuffle data read request for better 
sequential IO
 Key: FLINK-28561
 URL: https://issues.apache.org/jira/browse/FLINK-28561
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.17.0


Currently, the shuffle data of each subpartition for blocking shuffle is read 
separately. To achieve better performance and reduce IOPS, we can merge 
consecutive data requests for the same file together and serve them in one IO 
request. More specifically:

1) if multiple data requests read the same data, for example broadcast data, 
the reader reads the data only once and sends the same piece of data to 
multiple downstream consumers;

2) if multiple data requests read consecutive data in one file, we merge those 
requests into one large request and read a larger amount of data sequentially, 
which is good for file IO performance.
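Both cases above reduce to merging overlapping or adjacent byte ranges. The sketch below is an illustrative model, not Flink's implementation: requests are hypothetical `(file_offset, length)` pairs, duplicates (case 1) are deduplicated, and adjacent ranges (case 2) are coalesced.

```python
# Merge identical and back-to-back (offset, length) read requests into the
# minimal set of sequential IO requests.

def merge_requests(requests):
    merged = []
    for offset, length in sorted(set(requests)):  # set() dedupes broadcast reads
        if merged and merged[-1][0] + merged[-1][1] >= offset:
            # The new request touches or overlaps the previous one: extend it.
            prev_off, prev_len = merged[-1]
            merged[-1] = (prev_off, max(prev_len, offset + length - prev_off))
        else:
            merged.append((offset, length))
    return merged

# Three consumers reading the same broadcast region, plus two adjacent regions:
reqs = [(0, 32), (0, 32), (0, 32), (32, 32), (64, 32)]
print(merge_requests(reqs))  # [(0, 96)]: one sequential read instead of five
```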





[jira] [Created] (FLINK-28556) Extract header fields of Buffer into a BufferHeader class for blocking shuffle file IO

2022-07-14 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28556:
---

 Summary: Extract header fields of Buffer into a BufferHeader class 
for blocking shuffle file IO
 Key: FLINK-28556
 URL: https://issues.apache.org/jira/browse/FLINK-28556
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


This is a small code refactoring that can also be reused by follow-up PRs.





[jira] [Created] (FLINK-28551) Store the number of bytes instead of the number of buffers in index entry for sort-shuffle

2022-07-14 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28551:
---

 Summary: Store the number of bytes instead of the number of 
buffers in index entry for sort-shuffle
 Key: FLINK-28551
 URL: https://issues.apache.org/jira/browse/FLINK-28551
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, one field of each index entry in the sort-shuffle index file is the 
number of buffers in the current data region. The problem is that it is hard 
to know the data boundary before reading the file. To solve this, we can store 
the number of bytes instead of the number of buffers in the index entry. Based 
on this change, we can apply further optimizations, for example reading a 
larger amount of data than one buffer for better sequential IO, as mentioned 
in FLINK-28373.
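The benefit of a byte count is that the reader knows a region's exact file span up front and can issue one large sequential read. The entry layout below is purely illustrative, not Flink's on-disk format:

```python
# A hypothetical index entry: (file_offset, num_bytes) as two little-endian
# 64-bit integers. With a byte count, the region boundary is known before
# any data is read.

import struct

ENTRY = struct.Struct("<qq")

def write_entry(offset, num_bytes):
    return ENTRY.pack(offset, num_bytes)

def region_span(entry_bytes):
    offset, num_bytes = ENTRY.unpack(entry_bytes)
    return offset, offset + num_bytes  # read [start, end) in one IO request

entry = write_entry(4096, 1 << 20)  # a 1 MiB region starting at offset 4096
print(region_span(entry))  # (4096, 1052672)
```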





[jira] [Created] (FLINK-28550) Remove the unused field in SortMergeSubpartitionReader

2022-07-14 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28550:
---

 Summary: Remove the unused field in SortMergeSubpartitionReader
 Key: FLINK-28550
 URL: https://issues.apache.org/jira/browse/FLINK-28550
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0








[jira] [Created] (FLINK-28519) Fix the bug that SortMergeResultPartitionReadScheduler may not read data sequentially

2022-07-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28519:
---

 Summary: Fix the bug that SortMergeResultPartitionReadScheduler 
may not read data sequentially
 Key: FLINK-28519
 URL: https://issues.apache.org/jira/browse/FLINK-28519
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, the SortMergeResultPartitionReadScheduler always gets all active 
subpartition readers and reads at most one data region for each of them. It is 
common that some subpartitions are requested before others and their region 
indexes run ahead. If all region data of a subpartition can be read in one 
round, some subpartition readers will always be ahead of others, which causes 
random IO. This patch fixes the issue by polling one subpartition reader at a 
time.
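The scheduling change can be modeled as a FIFO round-robin. This is an illustrative sketch, not the actual scheduler code: each reader gets one region per turn, so no reader's file offset races ahead of the others.

```python
# Poll one region per reader per turn, re-queueing readers that still have
# regions left, so all readers advance through the file together.

from collections import deque

def schedule(readers):
    order = []
    queue = deque(readers)  # (reader_name, remaining_regions)
    while queue:
        name, remaining = queue.popleft()
        order.append(name)          # read exactly one region for this reader
        if remaining > 1:
            queue.append((name, remaining - 1))
    return order

print(schedule([("r0", 2), ("r1", 2)]))  # ['r0', 'r1', 'r0', 'r1']
```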





[jira] [Created] (FLINK-28514) Remove data flush in SortMergeResultPartition

2022-07-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28514:
---

 Summary: Remove data flush in SortMergeResultPartition
 Key: FLINK-28514
 URL: https://issues.apache.org/jira/browse/FLINK-28514
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


This patch removes the data flush in SortMergeResultPartition because it is 
unnecessary.





[jira] [Created] (FLINK-28512) Select HashBasedDataBuffer or SortBasedDataBuffer dynamically based on the number of network buffers that can be allocated

2022-07-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28512:
---

 Summary: Select HashBasedDataBuffer or SortBasedDataBuffer 
dynamically based on the number of network buffers that can be allocated
 Key: FLINK-28512
 URL: https://issues.apache.org/jira/browse/FLINK-28512
 Project: Flink
  Issue Type: Sub-task
Reporter: Yingjie Cao








[jira] [Created] (FLINK-28382) Introduce new compression algorithms of higher compression ratio

2022-07-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28382:
---

 Summary: Introduce new compression algorithms of higher 
compression ratio
 Key: FLINK-28382
 URL: https://issues.apache.org/jira/browse/FLINK-28382
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, we use LZ4 for shuffle data compression, which is a good balance 
between IO reduction and CPU consumption. But in some scenarios, IO becomes 
the bottleneck and storage space is limited (especially in Kubernetes 
environments). For these cases, we need compression algorithms with a higher 
compression ratio to further reduce IO.





[jira] [Created] (FLINK-28380) Produce one intermediate dataset for multiple consumers consuming the same data

2022-07-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28380:
---

 Summary: Produce one intermediate dataset for multiple consumers 
consuming the same data
 Key: FLINK-28380
 URL: https://issues.apache.org/jira/browse/FLINK-28380
 Project: Flink
  Issue Type: Sub-task
  Components: Client / Job Submission, Runtime / Coordination, Runtime 
/ Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, if one output of an upstream job vertex is consumed by multiple 
downstream job vertices, the upstream vertex produces multiple datasets. For 
blocking shuffle, this means serializing and persisting the same data multiple 
times. This ticket aims to optimize this behavior and make the upstream job 
vertex produce one dataset that is read by multiple downstream vertices.





[jira] [Created] (FLINK-28378) Use larger data reading buffer size for sort-shuffle

2022-07-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28378:
---

 Summary: Use larger data reading buffer size for sort-shuffle
 Key: FLINK-28378
 URL: https://issues.apache.org/jira/browse/FLINK-28378
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, for sort-shuffle, we always use the network buffer size as the data 
reading buffer size, which is 32K by default. We can increase this buffer size 
for better performance.





[jira] [Created] (FLINK-28377) Enable requesting fewer data reading buffers for sort-shuffle if there is not enough data

2022-07-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28377:
---

 Summary: Enable requesting fewer data reading buffers for 
sort-shuffle if there is not enough data
 Key: FLINK-28377
 URL: https://issues.apache.org/jira/browse/FLINK-28377
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, for sort-based blocking shuffle, the data reader always allocates a 
fixed number of buffers for shuffle data reading, even when there is not much 
data to read, which wastes buffers. We can optimize this to allocate fewer 
reading buffers when there is not enough data.
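The sizing rule amounts to capping the request by the data that actually remains. The buffer size and quota below are illustrative numbers, not Flink's configuration values:

```python
# Request only as many read buffers as the remaining data can fill, up to
# the normal fixed quota.

def buffers_to_request(remaining_bytes, buffer_size=32 * 1024, max_buffers=64):
    needed = -(-remaining_bytes // buffer_size)  # ceiling division
    return min(max_buffers, needed)

print(buffers_to_request(10 * 1024 * 1024))  # 64  (plenty of data: full quota)
print(buffers_to_request(40 * 1024))         # 2   (little data: just enough)
```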





[jira] [Created] (FLINK-28376) Reduce the number of IO threads for sort-shuffle

2022-07-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28376:
---

 Summary: Reduce the number of IO threads for sort-shuffle
 Key: FLINK-28376
 URL: https://issues.apache.org/jira/browse/FLINK-28376
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, the number of IO threads for shuffle data reading depends on the 
size of the reading memory and the number of CPU cores. We should also take 
the number of slots and the number of disks into account.





[jira] [Created] (FLINK-28374) Some further improvements of sort-shuffle

2022-07-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28374:
---

 Summary: Some further improvements of sort-shuffle
 Key: FLINK-28374
 URL: https://issues.apache.org/jira/browse/FLINK-28374
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


This is an umbrella issue for sort-shuffle improvements.





[jira] [Created] (FLINK-28373) Read larger size of data sequentially for sort-shuffle

2022-07-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-28373:
---

 Summary: Read larger size of data sequentially for sort-shuffle
 Key: FLINK-28373
 URL: https://issues.apache.org/jira/browse/FLINK-28373
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.16.0


Currently, for sort-based blocking shuffle, the data readers read shuffle data 
at buffer granularity. Before compression, each buffer is 32K by default; 
after compression, it becomes smaller (possibly less than 10K). For file IO, 
this is quite small. To achieve better performance and reduce IOPS, we can 
merge consecutive data requests for the same file together and serve them in 
one IO request. More specifically:

1) if multiple data requests read the same data, for example broadcast data, 
the reader reads the data only once and sends the same piece of data to 
multiple downstream consumers;

2) if multiple data requests read consecutive data in one file, we merge those 
requests into one large request and read a larger amount of data sequentially, 
which is good for file IO performance.





[jira] [Created] (FLINK-26347) Should use Flink system Classloader (AppClassloader) when deserializing RPC message

2022-02-24 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-26347:
---

 Summary: Should use Flink system Classloader (AppClassloader) when 
deserializing RPC message
 Key: FLINK-26347
 URL: https://issues.apache.org/jira/browse/FLINK-26347
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.15.0
Reporter: Yingjie Cao
 Fix For: 1.15.0


FLINK-25742 removed the redundant serialization of RPC invocations on the 
Flink side. However, it accidentally changed the class loading behavior. 
Before FLINK-25742, the Flink system classloader was used to load RPC message 
classes; after FLINK-25742, the RpcSystem classloader (whose parent is not the 
Flink system classloader) is used, which can cause ClassNotFoundException. I 
encountered this exception when trying to run flink-remote-shuffle on the 
latest Flink 1.15-SNAPSHOT: the remote shuffle class (the shuffle descriptor 
class) could not be found even though the corresponding jar file was in 
Flink's lib/ directory.





[jira] [Created] (FLINK-25960) Distribute the data read buffers more fairly among result partitions for sort-shuffle

2022-02-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25960:
---

 Summary: Distribute the data read buffers more fairly among result 
partitions for sort-shuffle
 Key: FLINK-25960
 URL: https://issues.apache.org/jira/browse/FLINK-25960
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


Currently, the data read buffers for sort-shuffle are allocated in a random 
way, and some result partitions may occupy too many buffers, which leads to 
the starvation of other result partitions. This ticket aims to improve this by 
not reading data for result partitions that already occupy more than the 
average number of read buffers per result partition.
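The fairness rule described above can be sketched as a simple admission check. The function name and numbers are illustrative, not Flink's actual code:

```python
# Skip reading for any result partition that already holds more than the
# average share of read buffers.

def may_read(partition_buffers, total_buffers, num_partitions):
    average = total_buffers / num_partitions
    return partition_buffers <= average

print(may_read(10, 64, 8))  # False: holds more than the average of 8
print(may_read(6, 64, 8))   # True: still below its fair share
```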





[jira] [Created] (FLINK-25959) Add a micro-benchmark for the sort-based blocking shuffle

2022-02-04 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25959:
---

 Summary: Add a micro-benchmark for the sort-based blocking shuffle
 Key: FLINK-25959
 URL: https://issues.apache.org/jira/browse/FLINK-25959
 Project: Flink
  Issue Type: Sub-task
Reporter: Yingjie Cao








[jira] [Created] (FLINK-25860) Move read buffer allocation and output file creation to setup method for sort-shuffle result partition to avoid blocking main thread

2022-01-27 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25860:
---

 Summary: Move read buffer allocation and output file creation to 
setup method for sort-shuffle result partition to avoid blocking main thread
 Key: FLINK-25860
 URL: https://issues.apache.org/jira/browse/FLINK-25860
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


Currently, the read buffer allocation and output file creation of sort-shuffle 
are performed by the main thread. These operations are somewhat heavy and can 
block the main thread for a while, which may delay other RPC calls, including 
follow-up task deployment. This ticket aims to solve the issue by moving read 
buffer allocation and output file creation to the setup method.





[jira] [Created] (FLINK-25796) Avoid record copy for result partition of sort-shuffle if there are enough buffers for better performance

2022-01-24 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25796:
---

 Summary: Avoid record copy for result partition of sort-shuffle if 
there are enough buffers for better performance
 Key: FLINK-25796
 URL: https://issues.apache.org/jira/browse/FLINK-25796
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


Currently, for the sort-shuffle result partition, there is extra record copy 
overhead introduced by clustering records by subpartition index. For small 
records, this overhead can cause up to a 20% performance regression. This 
ticket aims to solve the problem.

In fact, the hash-based implementation is a natural way to achieve the goal of 
sorting records by subpartition index. However, it has some serious 
weaknesses. For example, when there are not enough buffers or the data is 
skewed, it can waste buffers and reduce compression efficiency, which can 
cause performance regression.

This ticket tries to solve the issue by dynamically switching between the two 
implementations: if there are enough buffers, the hash-based implementation is 
used, and if there are not enough buffers, the sort-based implementation is 
used.
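The switching rule can be sketched in a few lines. The threshold (one buffer per subpartition) and the function name are illustrative assumptions, not Flink's exact criterion:

```python
# Pick the copy-free hash-based buffer when the pool can dedicate a buffer
# to every subpartition; otherwise fall back to the sort-based buffer, which
# copies records but tolerates buffer scarcity and data skew.

def choose_data_buffer(num_subpartitions, available_buffers):
    if available_buffers >= num_subpartitions:
        return "HashBasedDataBuffer"  # records land directly in their buffer
    return "SortBasedDataBuffer"      # records are sorted, then copied out

print(choose_data_buffer(100, 512))   # HashBasedDataBuffer
print(choose_data_buffer(1000, 512))  # SortBasedDataBuffer
```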





[jira] [Created] (FLINK-25786) Adjust the generation of subpartition data storage order for sort-shuffle from random shuffle to random shift

2022-01-24 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25786:
---

 Summary: Adjust the generation of subpartition data storage order 
for sort-shuffle from random shuffle to random shift
 Key: FLINK-25786
 URL: https://issues.apache.org/jira/browse/FLINK-25786
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


Currently, for sort-shuffle, the subpartition data storage order is generated 
by random shuffle. However, if there are not enough resources to run the 
downstream consumer tasks in parallel, performance can suffer because of the 
random disk IO caused by the random subpartition data storage order. This 
ticket aims to improve this scenario.





[jira] [Created] (FLINK-25781) Adjust the maximum number of buffers that can be used per result partition for shuffle data read

2022-01-24 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25781:
---

 Summary: Adjust the maximum number of buffers that can be used per 
result partition for shuffle data read
 Key: FLINK-25781
 URL: https://issues.apache.org/jira/browse/FLINK-25781
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


In the current sort-shuffle implementation, the maximum amount of buffer 
memory that can be used per result partition for shuffle data reading is 32M. 
However, for large-parallelism jobs, 32M is not enough, and for 
small-parallelism jobs, 32M may waste buffers. This ticket adjusts the maximum 
number of buffers that can be used per result partition according to the 
parallelism; the selected value is an empirical one based on the TPC-DS test 
results.





[jira] [Created] (FLINK-25780) Reduce the maximum number of data output buffers for sort-shuffle

2022-01-24 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25780:
---

 Summary: Reduce the maximum number of data output buffers for 
sort-shuffle
 Key: FLINK-25780
 URL: https://issues.apache.org/jira/browse/FLINK-25780
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


The data output buffers of sort-shuffle exist for better disk IO performance, 
and currently the total data output buffer size is 16M, which is fairly large. 
However, blocking to request too many buffers can hurt performance. This 
ticket reduces the maximum number of data output buffers to reduce the buffer 
request time. The selected value is an empirical one based on the TPC-DS test 
results.





[jira] [Created] (FLINK-25774) Restrict the maximum number of buffers that can be used per result partition for blocking shuffle

2022-01-24 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25774:
---

 Summary: Restrict the maximum number of buffers that can be used per 
result partition for blocking shuffle
 Key: FLINK-25774
 URL: https://issues.apache.org/jira/browse/FLINK-25774
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


Currently, for blocking shuffle, the maximum number of buffers that can be 
used per result partition is Integer.MAX_VALUE. For hash-shuffle, the maximum 
number of buffers actually used is (numSubpartitions + 1), because the 
hash-shuffle implementation always flushes the previous buffer after a new 
buffer is added, so a maximum of Integer.MAX_VALUE is meaningless. For 
sort-shuffle, if too many buffers are taken by one result partition, other 
result partitions and input gates may spend too much time waiting for buffers, 
which can hurt performance. This ticket restricts the maximum number of 
buffers that can be used per result partition; the selected value is an 
empirical one based on the TPC-DS test results.





[jira] [Created] (FLINK-25654) Remove the redundant lock in SortMergeResultPartition

2022-01-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25654:
---

 Summary: Remove the redundant lock in SortMergeResultPartition
 Key: FLINK-25654
 URL: https://issues.apache.org/jira/browse/FLINK-25654
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.3
Reporter: Yingjie Cao
 Fix For: 1.15.0, 1.14.4


After FLINK-2372, the task canceler never calls the close method of 
ResultPartition, which reduces race conditions and simplifies the code. This 
ticket removes some redundant locks in SortMergeResultPartition.





[jira] [Created] (FLINK-25653) Move buffer recycle in SortMergeSubpartitionReader out of lock to avoid deadlock

2022-01-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25653:
---

 Summary: Move buffer recycle in SortMergeSubpartitionReader out of 
lock to avoid deadlock
 Key: FLINK-25653
 URL: https://issues.apache.org/jira/browse/FLINK-25653
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.13.5, 1.14.3
Reporter: Yingjie Cao
 Fix For: 1.15.0, 1.13.6, 1.14.4


In the current sort-shuffle implementation, the different lock acquisition 
order in SortMergeSubpartitionReader and SortMergeResultPartitionReadScheduler 
can cause deadlock. To solve the problem, we can move the buffer recycling in 
SortMergeSubpartitionReader out of the lock.
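The fix pattern is a standard one for lock-ordering deadlocks. The sketch below is a generic model, not Flink's actual classes: buffers to recycle are collected under the reader's lock, but the recycling itself (which may take another component's lock) happens only after the reader's lock is released, so the two locks are never held nested in reverse order.

```python
import threading

class SubpartitionReader:
    def __init__(self, recycler):
        self.lock = threading.Lock()
        self.buffers = ["b0", "b1"]
        self.recycler = recycler  # may acquire the scheduler's own lock

    def release(self):
        with self.lock:
            # Under the lock: only detach the buffers, do not recycle yet.
            to_recycle, self.buffers = self.buffers, []
        # Outside the lock: safe to call into code that takes other locks.
        for b in to_recycle:
            self.recycler(b)

recycled = []
reader = SubpartitionReader(recycled.append)
reader.release()
print(recycled)  # ['b0', 'b1']
```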





[jira] [Created] (FLINK-25640) Enhance the document for blocking shuffle

2022-01-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25640:
---

 Summary: Enhance the document for blocking shuffle
 Key: FLINK-25640
 URL: https://issues.apache.org/jira/browse/FLINK-25640
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Yingjie Cao
 Fix For: 1.15.0


As discussed in 
[https://lists.apache.org/thread/pt2b1f17x2l5rlvggwxs6m265lo4ly7p], this ticket 
aims to enhance the documentation for blocking shuffle and add more 
operational guidelines.





[jira] [Created] (FLINK-25639) Increase the default read buffer size of sort-shuffle to 64M

2022-01-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25639:
---

 Summary: Increase the default read buffer size of sort-shuffle to 
64M
 Key: FLINK-25639
 URL: https://issues.apache.org/jira/browse/FLINK-25639
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


As discussed in 
[https://lists.apache.org/thread/pt2b1f17x2l5rlvggwxs6m265lo4ly7p], this ticket 
aims to increase the default read buffer size of sort-shuffle to 64M.





[jira] [Created] (FLINK-25638) Increase the default write buffer size of sort-shuffle to 16M

2022-01-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25638:
---

 Summary: Increase the default write buffer size of sort-shuffle to 
16M
 Key: FLINK-25638
 URL: https://issues.apache.org/jira/browse/FLINK-25638
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


As discussed in 
[https://lists.apache.org/thread/pt2b1f17x2l5rlvggwxs6m265lo4ly7p], this ticket 
aims to increase the default write buffer size of sort-shuffle to 16M.





[jira] [Created] (FLINK-25637) Make sort-shuffle the default shuffle implementation for batch jobs

2022-01-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25637:
---

 Summary: Make sort-shuffle the default shuffle implementation for 
batch jobs
 Key: FLINK-25637
 URL: https://issues.apache.org/jira/browse/FLINK-25637
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


As discussed in 
[https://lists.apache.org/thread/pt2b1f17x2l5rlvggwxs6m265lo4ly7p], this ticket 
aims to make sort-shuffle the default shuffle implementation for batch jobs.





[jira] [Created] (FLINK-25636) FLIP-199: Change some default config values of blocking shuffle for better usability

2022-01-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25636:
---

 Summary: FLIP-199: Change some default config values of blocking 
shuffle for better usability
 Key: FLINK-25636
 URL: https://issues.apache.org/jira/browse/FLINK-25636
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


This is the umbrella issue for FLIP-199; we will change several default config 
values for batch shuffle and update the documentation accordingly.





[jira] [Created] (FLINK-25607) Sorting by duration on Flink Web UI does not work correctly

2022-01-11 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25607:
---

 Summary: Sorting by duration on Flink Web UI does not work 
correctly
 Key: FLINK-25607
 URL: https://issues.apache.org/jira/browse/FLINK-25607
 Project: Flink
  Issue Type: Bug
Reporter: Yingjie Cao
 Attachments: image-2022-01-11-16-44-10-709.png

The Flink version used is 1.14.

!image-2022-01-11-16-44-10-709.png!





[jira] [Created] (FLINK-25606) Requesting exclusive buffers timeout when recovering from unaligned checkpoint under fine-grained resource mode

2022-01-11 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-25606:
---

 Summary: Requesting exclusive buffers timeout when recovering from 
unaligned checkpoint under fine-grained resource mode
 Key: FLINK-25606
 URL: https://issues.apache.org/jira/browse/FLINK-25606
 Project: Flink
  Issue Type: Bug
Reporter: Yingjie Cao


When converting a RecoveredInputChannel to a RemoteInputChannel, there were 
not enough network buffers to initialize the input channel's exclusive 
buffers. Here is the exception stack:
{code:java}
java.io.IOException: Timeout triggered when requesting exclusive buffers: The 
total number of network buffers is currently set to 6144 of 32768 bytes each. 
You can increase this number by setting the configuration keys 
'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 
'taskmanager.memory.network.max',  or you may increase the timeout which is 
3ms by setting the key 
'taskmanager.network.memory.exclusive-buffers-request-timeout-ms'.
  at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:205)
  at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:60)
  at 
org.apache.flink.runtime.io.network.partition.consumer.BufferManager.requestExclusiveBuffers(BufferManager.java:133)
  at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.setup(RemoteInputChannel.java:157)
  at 
org.apache.flink.runtime.io.network.partition.consumer.RemoteRecoveredInputChannel.toInputChannelInternal(RemoteRecoveredInputChannel.java:77)
  at 
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel.toInputChannel(RecoveredInputChannel.java:106)
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.convertRecoveredInputChannels(SingleInputGate.java:307)
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.requestPartitions(SingleInputGate.java:290)
  at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.requestPartitions(InputGateWithMetrics.java:94)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
  at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
  at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:359)
  at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:323)
  at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
  at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
  at java.lang.Thread.run(Thread.java:834) {code}
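As the exception message itself suggests, one mitigation is to give the network stack more memory or a longer request timeout. A sketch of the relevant settings in flink-conf.yaml (the values below are illustrative, not recommendations):

```yaml
# Illustrative values only; tune for your workload.
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.min: 128mb
taskmanager.memory.network.max: 2gb
# Or allow exclusive buffer requests to wait longer before timing out.
taskmanager.network.memory.exclusive-buffers-request-timeout-ms: 60000
```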



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24954) Reset read buffer request timeout on buffer recycling for sort-shuffle

2021-11-18 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-24954:
---

 Summary: Reset read buffer request timeout on buffer recycling for 
sort-shuffle
 Key: FLINK-24954
 URL: https://issues.apache.org/jira/browse/FLINK-24954
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


Currently, the read buffer request timeout implementation of sort-shuffle is a 
little aggressive, as reported in the mailing list: 
[https://lists.apache.org/thread/bd3s5bqfg9oxlb1g1gg3pxs3577lhf88]. The 
TimeoutException may be triggered if there is data skew and the downstream task 
is slow. We can improve this case by resetting the request timeout whenever a 
buffer is recycled.
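The proposed behavior can be sketched with a small, self-contained class. All names here (RequestDeadline, reset, isTimedOut) are illustrative, not Flink's actual API: the idea is simply that every buffer recycle pushes the deadline forward, so a slow but progressing consumer never hits the timeout.

```java
// Illustrative sketch only; class and method names are hypothetical,
// not the actual Flink implementation.
public class RequestDeadline {
    private final long timeoutMillis;
    private volatile long deadline;

    public RequestDeadline(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
        reset();
    }

    /** Called when a request starts and whenever a buffer is recycled. */
    public void reset() {
        deadline = System.currentTimeMillis() + timeoutMillis;
    }

    /** A timeout fires only if no buffer was recycled for a full period. */
    public boolean isTimedOut() {
        return System.currentTimeMillis() > deadline;
    }

    public static void main(String[] args) {
        RequestDeadline deadline = new RequestDeadline(10_000);
        deadline.reset();  // in the real scheduler: invoked on every recycle
        System.out.println(deadline.isTimedOut());  // false: the clock restarted
    }
}
```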



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24901) Some further improvements of the pluggable shuffle framework

2021-11-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-24901:
---

 Summary: Some further improvements of the pluggable shuffle 
framework
 Key: FLINK-24901
 URL: https://issues.apache.org/jira/browse/FLINK-24901
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


This is an umbrella issue including several further improvements of the 
pluggable shuffle framework.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24900) Support to run multiple shuffle plugins in one session cluster

2021-11-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-24900:
---

 Summary: Support to run multiple shuffle plugins in one session 
cluster
 Key: FLINK-24900
 URL: https://issues.apache.org/jira/browse/FLINK-24900
 Project: Flink
  Issue Type: Improvement
Reporter: Yingjie Cao


Currently, one Flink cluster can only use one shuffle plugin. However, there 
are cases where different jobs need different shuffle implementations. By 
loading shuffle plugins with the plugin manager and letting jobs select their 
shuffle service freely, Flink can support running multiple shuffle plugins in 
one session cluster.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24899) Enable data compression for blocking shuffle by default

2021-11-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-24899:
---

 Summary: Enable data compression for blocking shuffle by default
 Key: FLINK-24899
 URL: https://issues.apache.org/jira/browse/FLINK-24899
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


Currently, shuffle data compression is not enabled by default. Compression is 
important for blocking data shuffle, and enabling it by default improves 
usability.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24898) Some further improvements of sort-shuffle

2021-11-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-24898:
---

 Summary: Some further improvements of sort-shuffle
 Key: FLINK-24898
 URL: https://issues.apache.org/jira/browse/FLINK-24898
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.15.0


This is an umbrella issue including several further improvements of 
sort-shuffle.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-24035) Notify the buffer listeners when the local buffer pool receives available notification from the global pool

2021-08-27 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-24035:
---

 Summary: Notify the buffer listeners when the local buffer pool 
receives available notification from the global pool
 Key: FLINK-24035
 URL: https://issues.apache.org/jira/browse/FLINK-24035
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Yingjie Cao
 Fix For: 1.14.0


The buffer listeners are not notified when the local buffer pool receives an 
available notification from the global pool. This may cause a deadlock:
 # A LocalBufferPool is created, but there are no available buffers in the 
global NetworkBufferPool.
 # The LocalBufferPool registers an available buffer listener with the global 
NetworkBufferPool.
 # The BufferManager requests buffers from the LocalBufferPool but no buffer is 
available. As a result, it registers an available buffer listener with the 
LocalBufferPool.
 # A buffer is recycled to the global pool and the local buffer pool is 
notified about the available buffer.
 # The local buffer pool requests the available buffer from the global pool, 
but the registered available buffer listener of the BufferManager is never 
notified, so deadlock occurs.
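The missing notification in step 5 can be illustrated with a toy pool (all names here are illustrative, not Flink's real classes): when the local pool is told the global pool has buffers again, it must both take the buffer and wake up its own waiting listeners.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of the fix; all names are illustrative, not Flink's real classes.
public class ToyLocalPool {
    private final Queue<Runnable> bufferListeners = new ArrayDeque<>();
    private int availableBuffers;

    /** A consumer that found no buffer registers itself here (step 3). */
    public void registerListener(Runnable listener) {
        bufferListeners.add(listener);
    }

    /**
     * Called when the global pool signals availability (step 4). The fix:
     * besides taking the buffer, also notify a waiting local listener,
     * otherwise it waits forever (step 5).
     */
    public void onGlobalBufferAvailable() {
        availableBuffers++;                 // take the buffer locally
        Runnable listener = bufferListeners.poll();
        if (listener != null) {
            listener.run();                 // the previously missing notification
        }
    }

    public int available() { return availableBuffers; }

    public static void main(String[] args) {
        ToyLocalPool pool = new ToyLocalPool();
        final boolean[] notified = {false};
        pool.registerListener(() -> notified[0] = true);
        pool.onGlobalBufferAvailable();
        System.out.println(notified[0]);  // true: listener is woken, no deadlock
    }
}
```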



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23724) Network buffer leak when ResultPartition is released (failover)

2021-08-11 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-23724:
---

 Summary: Network buffer leak when ResultPartition is released 
(failover)
 Key: FLINK-23724
 URL: https://issues.apache.org/jira/browse/FLINK-23724
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Yingjie Cao
 Fix For: 1.14.0


The BufferBuilders in BufferWritingResultPartition are not properly released 
when ResultPartition is released.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23275) Support to release cluster partitions stored externally

2021-07-06 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-23275:
---

 Summary: Support to release cluster partitions stored externally
 Key: FLINK-23275
 URL: https://issues.apache.org/jira/browse/FLINK-23275
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.14.0


Currently, if a cluster partition is stored externally, it cannot be released 
by the partition tracker. One reason is that the ShuffleMaster is not a 
cluster-level service; after FLINK-23214, we can release cluster partitions 
through the ShuffleMaster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23249) Introduce ShuffleMasterContext to ShuffleMaster

2021-07-05 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-23249:
---

 Summary: Introduce ShuffleMasterContext to ShuffleMaster
 Key: FLINK-23249
 URL: https://issues.apache.org/jira/browse/FLINK-23249
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination, Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.14.0


Introduce a ShuffleMasterContext to the ShuffleMaster. Just like the 
ShuffleEnvironmentContext at the TaskManager side, the ShuffleMasterContext can 
act as a proxy between the ShuffleMaster and other Flink components such as the 
ResourceManagerPartitionTracker.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23214) Make ShuffleMaster a cluster level shared service

2021-07-02 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-23214:
---

 Summary: Make ShuffleMaster a cluster level shared service
 Key: FLINK-23214
 URL: https://issues.apache.org/jira/browse/FLINK-23214
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.14.0
Reporter: Yingjie Cao


This ticket tries to make ShuffleMaster a cluster level shared service which 
makes it consistent with the ShuffleEnvironment at the TM side.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22910) ShuffleMaster enhancement for pluggable shuffle service framework

2021-06-07 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-22910:
---

 Summary: ShuffleMaster enhancement for pluggable shuffle service 
framework
 Key: FLINK-22910
 URL: https://issues.apache.org/jira/browse/FLINK-22910
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Yingjie Cao
 Fix For: 1.14.0


The current _ShuffleMaster_ has an unclear lifecycle which is inconsistent with 
the _ShuffleEnvironment_ at the _TM_ side. Besides, it is hard to implement 
some important capabilities for a remote shuffle service, for example: 1) 
releasing external resources when a job finishes; 2) stopping or starting the 
tracking of some partitions depending on the status of the external service or 
system.

We drafted a document[1] which proposes some simple changes to solve these 
issues. The document is not complete yet; we will start a discussion once it is 
finished.

 

[1] 
https://docs.google.com/document/d/1_cHoapNbx_fJ7ZNraSqw4ZK1hMRiWWJDITuSZrdMDDs/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22307) Increase the data writing cache size of sort-merge blocking shuffle

2021-04-16 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-22307:
---

 Summary: Increase the data writing cache size of sort-merge 
blocking shuffle
 Key: FLINK-22307
 URL: https://issues.apache.org/jira/browse/FLINK-22307
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.13.0


Currently, the data writing cache is 8M, which is not enough if data 
compression is enabled. By increasing the cache size to 16M, the performance of 
our benchmark job improves by about 20%. (We may make it configurable in the 
future.)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22305) Increase the default value of taskmanager.network.sort-shuffle.min-buffers

2021-04-16 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-22305:
---

 Summary: Increase the default value of 
taskmanager.network.sort-shuffle.min-buffers
 Key: FLINK-22305
 URL: https://issues.apache.org/jira/browse/FLINK-22305
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.13.0


Currently, the default value of taskmanager.network.sort-shuffle.min-buffers is 
64, which is pretty small. As suggested, we'd like to increase this default. A 
larger default avoids the corner case of very small in-memory sort and write 
buffers, which is better for performance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22156) HiveDialectQueryITCase fails on Azure because of no output for 900 seconds

2021-04-08 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-22156:
---

 Summary: HiveDialectQueryITCase fails on Azure because of no 
output for 900 seconds
 Key: FLINK-22156
 URL: https://issues.apache.org/jira/browse/FLINK-22156
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime, Tests
Reporter: Yingjie Cao
 Fix For: 1.13.0


[https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_apis/build/builds/16105/logs/139]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22153) Manually test the sort-merge blocking shuffle

2021-04-08 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-22153:
---

 Summary: Manually test the sort-merge blocking shuffle
 Key: FLINK-22153
 URL: https://issues.apache.org/jira/browse/FLINK-22153
 Project: Flink
  Issue Type: Task
  Components: Runtime / Network
Affects Versions: 1.13.0
Reporter: Yingjie Cao
 Fix For: 1.13.0


In 1.12, we introduced sort-merge blocking shuffle to Flink, and in 1.13 the 
feature was optimized to improve usability (fixing a direct memory OOM issue) 
and performance (introducing IO scheduling and broadcast optimization).

The sort-merge blocking shuffle can be tested following the process below:
 # Write a simple batch job using either the SQL/Table or the DataStream API 
(word count should be enough);
 # Enable sort-merge blocking shuffle by setting 
taskmanager.network.sort-shuffle.min-parallelism to 1 in the Flink 
configuration file;
 # Submit and run the batch job with different parallelism and data volume;
 # Tune the relevant config options 
(taskmanager.network.blocking-shuffle.compression.enabled, 
taskmanager.network.sort-shuffle.min-buffers, 
taskmanager.memory.framework.off-heap.batch-shuffle.size) and observe the 
influence.
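The settings named in the steps above can be collected into one flink-conf.yaml fragment (values are illustrative starting points, not recommendations):

```yaml
# Step 2: force sort-merge blocking shuffle for every parallelism.
taskmanager.network.sort-shuffle.min-parallelism: 1
# Step 4: options to tune and compare (illustrative values).
taskmanager.network.blocking-shuffle.compression.enabled: true
taskmanager.network.sort-shuffle.min-buffers: 512
taskmanager.memory.framework.off-heap.batch-shuffle.size: 64m
```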

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22127) Enrich error message of read buffer request timeout exception

2021-04-06 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-22127:
---

 Summary: Enrich error message of read buffer request timeout 
exception
 Key: FLINK-22127
 URL: https://issues.apache.org/jira/browse/FLINK-22127
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.13.0
Reporter: Yingjie Cao
 Fix For: 1.13.0


Enrich the error message of the read buffer request timeout exception to tell 
users how to resolve the timeout.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21951) Fix wrong if condition in BufferReaderWriterUtil#writeBuffers

2021-03-24 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21951:
---

 Summary: Fix wrong if condition in 
BufferReaderWriterUtil#writeBuffers
 Key: FLINK-21951
 URL: https://issues.apache.org/jira/browse/FLINK-21951
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.13.0


The wrong if condition in BufferReaderWriterUtil#writeBuffers may lead to data 
loss when bulk-writing a large amount of data into a file. The bug was 
introduced in 1.9, but previously only small amounts of data were written 
through this path, so it never surfaced. In 1.13, the sort-merge shuffle uses 
it to write more data, which triggers the bug.
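The hazard with bulk (gathering) writes is that a single write call may flush only part of the buffers, so the caller must loop until nothing remains rather than check once. A self-contained sketch of a correct loop (illustrative only; it does not reproduce the exact condition fixed in BufferReaderWriterUtil):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class BulkWriteDemo {
    /** Writes all buffers, looping until no bytes remain (handles short writes). */
    static void writeFully(FileChannel channel, ByteBuffer[] buffers) throws IOException {
        long remaining = 0;
        for (ByteBuffer b : buffers) {
            remaining += b.remaining();
        }
        // A single channel.write(buffers) may be a short write; keep writing
        // until every byte has been flushed instead of checking just once.
        while (remaining > 0) {
            remaining -= channel.write(buffers);
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("bulk-write", ".bin");
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE)) {
            ByteBuffer[] buffers = {
                ByteBuffer.wrap(new byte[64 * 1024]),
                ByteBuffer.wrap(new byte[64 * 1024])
            };
            writeFully(channel, buffers);
        }
        System.out.println(Files.size(file));  // 131072: no data lost
        Files.delete(file);
    }
}
```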



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21859) Batch job fails due to "Could not mark slot 61a637e3977c58a0e6b73533c419297d active"

2021-03-18 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21859:
---

 Summary: Batch job fails due to "Could not mark slot 
61a637e3977c58a0e6b73533c419297d active"
 Key: FLINK-21859
 URL: https://issues.apache.org/jira/browse/FLINK-21859
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Task
Affects Versions: 1.13.0
Reporter: Yingjie Cao


Here is the error stack:
{code:java}
2021-03-18 19:05:31
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
  at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:130)
  at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:81)
  at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:221)
  at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:212)
  at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:203)
  at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:701)
  at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
  at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1449)
  at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1105)
  at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1045)
  at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:754)
  at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
  at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
  at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:271)
  at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
  at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:669)
  at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:1997)
  at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:271)
  at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
  at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:385)
  at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.lambda$releaseSlot$1(DefaultDeclarativeSlotPool.java:376)
  at java.util.Optional.ifPresent(Optional.java:159)
  at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlot(DefaultDeclarativeSlotPool.java:374)
  at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.failAllocation(DeclarativeSlotPoolService.java:198)
  at org.apache.flink.runtime.jobmaster.JobMaster.internalFailAllocation(JobMaster.java:650)
  at org.apache.flink.runtime.jobmaster.JobMaster.failSlot(JobMaster.java:636)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:301)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
  at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
  at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
  at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
  at akka.actor.ActorCell.invoke(ActorCell.scala:561)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
  at akka.dispatch.Mailbox.run(Mailbox.scala:225)

[jira] [Created] (FLINK-21857) StackOverflow for large parallelism jobs when processing EndOfChannelStateEvent

2021-03-18 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21857:
---

 Summary: StackOverflow for large parallelism jobs when processing 
EndOfChannelStateEvent
 Key: FLINK-21857
 URL: https://issues.apache.org/jira/browse/FLINK-21857
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.13.0


CheckpointedInputGate#handleEvent calls pollNext recursively when processing 
EndOfChannelStateEvent. For large-parallelism jobs with many input channels, 
the stack can become very deep, causing a StackOverflowError. The following is 
the stack:
{code:java}
java.lang.StackOverflowError
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:650)
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:625)
  at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:611)
  at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:149)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
  at 
org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:202){code}
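The usual fix for this pattern is to drain events with a loop instead of recursion, keeping stack depth constant no matter how many channels deliver events. A minimal illustration (names are hypothetical, not the actual CheckpointedInputGate code):

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Illustrative only: contrasts recursive vs. iterative event draining.
public class EventDrainDemo {
    /** Recursive style: one stack frame per queued event (can overflow). */
    static int drainRecursively(Queue<Integer> events) {
        Integer event = events.poll();
        if (event == null) {
            return 0;
        }
        return 1 + drainRecursively(events);
    }

    /** Iterative style: constant stack depth regardless of event count. */
    static int drainIteratively(Queue<Integer> events) {
        int processed = 0;
        while (events.poll() != null) {
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        Queue<Integer> events = new ArrayDeque<>();
        for (int i = 0; i < 100_000; i++) {
            events.add(i);
        }
        // 100k events in constant stack space; the recursive version would
        // risk a StackOverflowError at this size.
        System.out.println(drainIteratively(events));  // 100000
    }
}
```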



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21850) Improve document and config description of sort-merge blocking shuffle

2021-03-17 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21850:
---

 Summary: Improve document and config description of sort-merge 
blocking shuffle
 Key: FLINK-21850
 URL: https://issues.apache.org/jira/browse/FLINK-21850
 Project: Flink
  Issue Type: Sub-task
  Components: Documentation
Reporter: Yingjie Cao
 Fix For: 1.13.0


After the improvement of FLINK-19614, some of the previous documentation of 
sort-merge blocking shuffle is no longer accurate; we need to improve the 
corresponding documents.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21790) Shuffle data directories to make data directory section of different TaskManagers fairer

2021-03-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21790:
---

 Summary: Shuffle data directories to make data directory section 
of different TaskManagers fairer
 Key: FLINK-21790
 URL: https://issues.apache.org/jira/browse/FLINK-21790
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.13.0


Currently, different TaskManagers select data directories in the same order, 
and if there are multiple disks, some disks may store more data than others, 
which is bad for performance. A simple improvement is that each TaskManager 
shuffles the given data directories randomly and selects data directories in a 
different order.
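The improvement amounts to a per-TaskManager random permutation of the configured directories. A sketch (the directory paths and seed parameter are placeholders):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class DataDirShuffleDemo {
    /** Each TaskManager shuffles the configured dirs with its own seed. */
    static List<String> shuffledDataDirs(List<String> configuredDirs, long taskManagerSeed) {
        List<String> dirs = new ArrayList<>(configuredDirs);
        Collections.shuffle(dirs, new Random(taskManagerSeed));
        return dirs;
    }

    public static void main(String[] args) {
        List<String> configured = Arrays.asList("/data1", "/data2", "/data3", "/data4");
        // Two TaskManagers with different seeds iterate the same directories
        // in different orders, spreading load across disks.
        List<String> tm1 = shuffledDataDirs(configured, 1L);
        List<String> tm2 = shuffledDataDirs(configured, 2L);
        System.out.println(tm1.containsAll(configured) && tm2.containsAll(configured));  // true
    }
}
```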



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21789) Make FileChannelManagerImpl#getNextPathNum select data directory fairly

2021-03-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21789:
---

 Summary: Make FileChannelManagerImpl#getNextPathNum select data 
directory fairly
 Key: FLINK-21789
 URL: https://issues.apache.org/jira/browse/FLINK-21789
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.13.0


The get-and-increment of the next path index in 
FileChannelManagerImpl#getNextPathNum is not atomic, which may cause unfair 
data directory selection (bad for performance if multiple disks are 
configured).
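The non-atomic read-then-increment can be replaced by AtomicInteger#getAndIncrement so that concurrent callers each receive a distinct index. A simplified sketch (the surrounding class is hypothetical, not the actual FileChannelManagerImpl):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class FairPathSelector {
    private final AtomicInteger nextPath = new AtomicInteger(0);
    private final int numPaths;

    public FairPathSelector(int numPaths) {
        this.numPaths = numPaths;
    }

    /** Atomic get-and-increment: no two concurrent callers see the same value. */
    public int getNextPathNum() {
        // A separate get() followed by a non-atomic increment could hand the
        // same index to two threads; getAndIncrement() closes that race.
        return Math.floorMod(nextPath.getAndIncrement(), numPaths);
    }

    public static void main(String[] args) {
        FairPathSelector selector = new FairPathSelector(3);
        // Sequential calls cycle fairly through 0, 1, 2, 0, ...
        System.out.print(selector.getNextPathNum());
        System.out.print(selector.getNextPathNum());
        System.out.print(selector.getNextPathNum());
        System.out.println(selector.getNextPathNum());  // prints 0120
    }
}
```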



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21788) Throw PartitionNotFoundException if the partition file has been lost for blocking shuffle

2021-03-15 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21788:
---

 Summary: Throw PartitionNotFoundException if the partition file 
has been lost for blocking shuffle
 Key: FLINK-21788
 URL: https://issues.apache.org/jira/browse/FLINK-21788
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.13.0


Currently, if the partition file of a blocking shuffle has been lost, a 
FileNotFoundException is thrown and the partition data is not regenerated, so 
failover cannot recover the job. A PartitionNotFoundException should be thrown 
instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21778) Use heap memory instead of direct memory as index entry cache for sort-merge shuffle

2021-03-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21778:
---

 Summary: Use heap memory instead of direct memory as index entry 
cache for sort-merge shuffle
 Key: FLINK-21778
 URL: https://issues.apache.org/jira/browse/FLINK-21778
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.13.0


Currently, the sort-merge shuffle implementation uses a piece of direct memory 
as an index entry cache for acceleration. We can use heap memory instead to 
reduce the usage of direct memory, which further reduces the possibility of 
OutOfMemoryError.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21777) Replace the 4M data writing cache of sort-merge shuffle with writev system call

2021-03-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-21777:
---

 Summary: Replace the 4M data writing cache of sort-merge shuffle 
with writev system call
 Key: FLINK-21777
 URL: https://issues.apache.org/jira/browse/FLINK-21777
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.13.0


Currently, the sort-merge shuffle implementation uses 4M of unmanaged direct 
memory as a cache for written data. It can be replaced by the writev system 
call, which reduces unmanaged direct memory usage without any performance loss.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20758) Use region file mechanism for shuffle data reading before we switch to managed memory

2020-12-23 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-20758:
---

 Summary: Use region file mechanism for shuffle data reading before 
we switch to managed memory
 Key: FLINK-20758
 URL: https://issues.apache.org/jira/browse/FLINK-20758
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.12.2


FLINK-15981 implemented a region-file-based data reader to solve the direct 
memory OOM issue introduced by the usage of unmanaged direct memory, but only 
for BoundedBlockingResultPartition. We can introduce it to the sort-merge based 
blocking shuffle to avoid a similar direct memory OOM problem, which would 
improve usability a lot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20757) Optimize data broadcast for sort-merge shuffle

2020-12-23 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-20757:
---

 Summary: Optimize data broadcast for sort-merge shuffle
 Key: FLINK-20757
 URL: https://issues.apache.org/jira/browse/FLINK-20757
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.13.0


For data broadcast, the record needs to be copied only once when writing data 
into the SortBuffer. Besides, only one copy of the data needs to be written 
when spilling to disk. These optimizations can improve the performance of data 
broadcast.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20740) Use managed memory to avoid direct memory OOM error

2020-12-22 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-20740:
---

 Summary: Use managed memory to avoid direct memory OOM error
 Key: FLINK-20740
 URL: https://issues.apache.org/jira/browse/FLINK-20740
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.13.0


Currently, sort-merge blocking shuffle uses some unmanaged memory for data 
writing and reading, which means users must increase the size of direct memory; 
otherwise, they may encounter a direct memory OOM error, which is really bad 
for usability.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20013) BoundedBlockingSubpartition may leak network buffer if task is failed or canceled

2020-11-05 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-20013:
---

 Summary: BoundedBlockingSubpartition may leak network buffer if 
task is failed or canceled
 Key: FLINK-20013
 URL: https://issues.apache.org/jira/browse/FLINK-20013
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.12.0


BoundedBlockingSubpartition may leak network buffers if the task fails or is 
canceled. We need to recycle the current BufferConsumer when the task fails or 
is canceled.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-20010) SinkITCase.writerAndCommitterAndGlobalCommitterExecuteInStreamingMode fails on Azure Pipeline

2020-11-05 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-20010:
---

 Summary: 
SinkITCase.writerAndCommitterAndGlobalCommitterExecuteInStreamingMode fails on 
Azure Pipeline
 Key: FLINK-20010
 URL: https://issues.apache.org/jira/browse/FLINK-20010
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Yingjie Cao
 Fix For: 1.12.0


SinkITCase.writerAndCommitterAndGlobalCommitterExecuteInStreamingMode fails on 
Azure Pipeline
{code:java}
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:273)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:238)
at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:159)
at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at 
org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)

{code}





[jira] [Created] (FLINK-19991) UnalignedCheckpointITCase#shouldPerformUnalignedCheckpointMassivelyParallel fails on Azure Pipeline

2020-11-05 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19991:
---

 Summary: 
UnalignedCheckpointITCase#shouldPerformUnalignedCheckpointMassivelyParallel 
fails on Azure Pipeline
 Key: FLINK-19991
 URL: https://issues.apache.org/jira/browse/FLINK-19991
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing, Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.12.0


UnalignedCheckpointITCase#shouldPerformUnalignedCheckpointMassivelyParallel 
fails on Azure Pipeline
{code:java}
java.lang.AssertionError: 

Expected: <0L>
 but: was <1809L>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.junit.Assert.assertThat(Assert.java:956)
at org.junit.rules.ErrorCollector$1.call(ErrorCollector.java:65)
at org.junit.rules.ErrorCollector.checkSucceeds(ErrorCollector.java:78)
at org.junit.rules.ErrorCollector.checkThat(ErrorCollector.java:63)
at org.junit.rules.ErrorCollector.checkThat(ErrorCollector.java:54)
at 
org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.execute(UnalignedCheckpointITCase.java:189)
at 
org.apache.flink.test.checkpointing.UnalignedCheckpointITCase.shouldPerformUnalignedCheckpointMassivelyParallel(UnalignedCheckpointITCase.java:179)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.Verifier$1.evaluate(Verifier.java:35)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
at 
org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)

{code}





[jira] [Created] (FLINK-19938) Implement shuffle data read scheduling for sort-merge blocking shuffle

2020-11-02 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19938:
---

 Summary: Implement shuffle data read scheduling for sort-merge 
blocking shuffle
 Key: FLINK-19938
 URL: https://issues.apache.org/jira/browse/FLINK-19938
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.12.0


As described in 
[https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Merge+Based+Blocking+Shuffle+to+Flink], 
shuffle IO scheduling is important for performance. We'd like to introduce it 
to the sort-merge shuffle first.





[jira] [Created] (FLINK-19614) Further optimization of sort-merge based blocking shuffle

2020-10-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19614:
---

 Summary: Further optimization of sort-merge based blocking shuffle
 Key: FLINK-19614
 URL: https://issues.apache.org/jira/browse/FLINK-19614
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao


FLINK-19582 introduces a basic sort-merge based blocking shuffle 
implementation. We can further optimize it based on the approaches proposed in 
[https://docs.google.com/document/d/1mpekX6aAHJhBsQ0pS9MxDiFQjHQIuaJH0GAQHh0GlJ0/edit?usp=sharing].

This is the umbrella ticket for the optimizations.





[jira] [Created] (FLINK-19603) Introduce shuffle data compression to sort-merge based blocking shuffle

2020-10-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19603:
---

 Summary: Introduce shuffle data compression to sort-merge based 
blocking shuffle
 Key: FLINK-19603
 URL: https://issues.apache.org/jira/browse/FLINK-19603
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.12.0


Shuffle data compression can reduce the storage overhead and improve the 
disk/network IO performance.





[jira] [Created] (FLINK-19602) Introduce new config options to enable sort-merge based blocking shuffle

2020-10-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19602:
---

 Summary: Introduce new config options to enable sort-merge based 
blocking shuffle
 Key: FLINK-19602
 URL: https://issues.apache.org/jira/browse/FLINK-19602
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.12.0


1. taskmanager.network.sort-merge-blocking-shuffle.enabled: Boolean flag to 
enable/disable sort-merge based blocking shuffle.

2. taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition: 
Number of network buffers required for each sort-merge blocking result 
partition.

3. taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: Minimum 
parallelism to enable the sort-merge based blocking shuffle.

With these new config options, the default behavior of blocking shuffle stays 
unchanged.
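A configuration fragment for flink-conf.yaml might look like the following. The option names are those proposed in this ticket (the names finally released may differ), and the values are illustrative assumptions, not recommended defaults:

```yaml
# Hypothetical flink-conf.yaml fragment based on the option names proposed above.
taskmanager.network.sort-merge-blocking-shuffle.enabled: true
# Network buffers reserved for each sort-merge blocking result partition (illustrative).
taskmanager.network.sort-merge-blocking-shuffle.buffers-per-partition: 64
# Only use sort-merge shuffle for jobs at or above this parallelism (illustrative).
taskmanager.network.sort-merge-blocking-shuffle.min-parallelism: 128
```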





[jira] [Created] (FLINK-19601) Introduce sort-merge based blocking result partition SortMergeResultPartition and the corresponding subpartition reader

2020-10-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19601:
---

 Summary: Introduce sort-merge based blocking result partition 
SortMergeResultPartition and the corresponding subpartition reader
 Key: FLINK-19601
 URL: https://issues.apache.org/jira/browse/FLINK-19601
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.12.0


SortMergeResultPartition appends all added records and events to a SortBuffer; 
after the SortBuffer is full, all of its data is copied and spilled to a 
PartitionedFile in subpartition index order. Unlike the hash-based blocking 
shuffle implementation, SortMergeResultPartition writes at most one file 
concurrently.





[jira] [Created] (FLINK-19600) Introduce PartitionedFile and the corresponding writer/reader for sort-merge based blocking shuffle

2020-10-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19600:
---

 Summary: Introduce PartitionedFile and the corresponding 
writer/reader for sort-merge based blocking shuffle
 Key: FLINK-19600
 URL: https://issues.apache.org/jira/browse/FLINK-19600
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.12.0


PartitionedFile is the persistent file type of sort-merge based blocking 
shuffle. It stores data of all subpartitions in subpartition index order. 
PartitionedFile can be produced by PartitionedFileWriter and consumed by 
PartitionedFileReader. 





[jira] [Created] (FLINK-19597) Introduce SortBuffer and its implementation PartitionSortedBuffer for sort-merge based blocking shuffle

2020-10-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19597:
---

 Summary: Introduce SortBuffer and its implementation 
PartitionSortedBuffer for sort-merge based blocking shuffle
 Key: FLINK-19597
 URL: https://issues.apache.org/jira/browse/FLINK-19597
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.12.0


Data of different channels can be appended to a SortBuffer, and after the 
SortBuffer is finished, the appended data can be copied from it in channel 
index order. PartitionSortedBuffer is an implementation of SortBuffer that 
sorts all appended records only by subpartition index; records of the same 
subpartition retain their append order.
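The contract described above can be sketched as a toy model. This is only an illustration of the append/copy ordering semantics, not Flink's actual SortBuffer or PartitionSortedBuffer code (class and method names here are made up):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of the SortBuffer contract: records are appended per subpartition
// and copied back clustered by ascending subpartition index, while records of
// the same subpartition retain their append order.
class SortBufferSketch {
    // Keyed by subpartition index; TreeMap gives ascending index order.
    private final Map<Integer, List<String>> records = new TreeMap<>();

    void append(int subpartition, String record) {
        records.computeIfAbsent(subpartition, k -> new ArrayList<>()).add(record);
    }

    // Copies out all appended records in subpartition index order.
    List<String> copyInSubpartitionOrder() {
        List<String> out = new ArrayList<>();
        records.values().forEach(out::addAll);
        return out;
    }

    public static void main(String[] args) {
        SortBufferSketch buf = new SortBufferSketch();
        buf.append(1, "b1");
        buf.append(0, "a1");
        buf.append(1, "b2");
        buf.append(0, "a2");
        System.out.println(buf.copyInSubpartitionOrder()); // [a1, a2, b1, b2]
    }
}
```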





[jira] [Created] (FLINK-19582) Introduce sort-merge based blocking shuffle to Flink

2020-10-12 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19582:
---

 Summary: Introduce sort-merge based blocking shuffle to Flink
 Key: FLINK-19582
 URL: https://issues.apache.org/jira/browse/FLINK-19582
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.12.0


 

*Motivation*

Hash-based blocking shuffle and sort-merge based blocking shuffle are the two 
main blocking shuffle implementations widely adopted by existing distributed 
data processing frameworks. The hash-based implementation writes data sent to 
different reducer tasks into separate files concurrently, while the sort-merge 
based approach writes the data together into a single file and merges small 
files into bigger ones. Compared to the sort-merge based approach, the 
hash-based approach has several weak points when it comes to running 
large-scale batch jobs:

*1. Stability*

For high-parallelism (tens of thousands) batch jobs, the current hash-based 
blocking shuffle implementation writes too many files concurrently, which puts 
high pressure on the file system: maintenance of too many file metas, high 
system CPU consumption, and exhaustion of inodes or file descriptors. All of 
these are potential stability issues, which we encountered in our production 
environment before switching to the sort-merge based blocking shuffle.

The sort-merge based blocking shuffle does not have this problem because, for 
one result partition, only one file is written at a time.

*2. Performance*

Large numbers of small shuffle files and random IO can hurt shuffle performance 
a lot, especially on HDDs (on SSDs, sequential read is also important because 
of read-ahead and caching).

For batch jobs processing massive data, a small amount of data per subpartition 
is common: to reduce the job completion time, we usually increase the job 
parallelism to reduce the amount of data processed per task, and the average 
data amount per subpartition is:

(the amount of data per task) / (parallelism) = (total amount of data) / 
(parallelism^2)

which means increasing parallelism decreases the amount of data per 
subpartition rapidly.
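As a worked example of this relation (the numbers below are purely illustrative, not from the ticket):

```java
// Worked example of the relation above: the average bytes per subpartition
// equals the total shuffled bytes divided by parallelism squared.
class SubpartitionSize {
    static long bytesPerSubpartition(long totalBytes, long parallelism) {
        return totalBytes / (parallelism * parallelism);
    }

    public static void main(String[] args) {
        long oneTiB = 1L << 40;
        // 1 TiB shuffled at parallelism 1000 -> about 1 MiB per subpartition.
        System.out.println(bytesPerSubpartition(oneTiB, 1_000)); // 1099511
        // Doubling parallelism to 2000 shrinks it 4x, to about 256 KiB.
        System.out.println(bytesPerSubpartition(oneTiB, 2_000)); // 274877
    }
}
```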

Besides, data skew is another cause of small subpartition files. By merging 
the data of all subpartitions together into one file, more sequential reads can 
be achieved.

*3. Resource*

For the current hash-based implementation, each subpartition needs at least one 
buffer. For large-scale batch shuffles, the memory consumption can be huge. For 
example, we need at least 320M of network memory per result partition if 
parallelism is set to 1, and because of the huge network memory consumption it 
is hard to configure the network memory for large-scale batch jobs; sometimes 
parallelism cannot be increased just because of insufficient network memory, 
which leads to a bad user experience.

By introducing the sort-merge based approach to Flink, we can improve Flink’s 
capability of running large scale batch jobs.





[jira] [Created] (FLINK-19344) DispatcherResourceCleanupTest#testJobSubmissionUnderSameJobId is unstable on Azure Pipeline

2020-09-22 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19344:
---

 Summary: 
DispatcherResourceCleanupTest#testJobSubmissionUnderSameJobId is unstable on 
Azure Pipeline
 Key: FLINK-19344
 URL: https://issues.apache.org/jira/browse/FLINK-19344
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Yingjie Cao


Here is the log and stack: 
[https://dev.azure.com/kevin-flink/flink/_build/results?buildId=88=logs=6e58d712-c5cc-52fb-0895-6ff7bd56c46b=f30a8e80-b2cf-535c-9952-7f521a4ae374]
{code:java}
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


{code}





[jira] [Created] (FLINK-19323) Small optimization of network layer record serialization

2020-09-21 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19323:
---

 Summary: Small optimization of network layer record serialization
 Key: FLINK-19323
 URL: https://issues.apache.org/jira/browse/FLINK-19323
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.12.0


Currently, when serializing a record, the SpanningRecordSerializer first skips 
4 bytes for the length field and serializes the record. Then it gets the 
serialized record length, skips back to position 0, and writes the length 
field. After that, it skips again to the tail of the serialized data. In fact, 
the last two skips can be avoided by writing the length field to position 0 
directly.
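The idea can be sketched with java.nio.ByteBuffer standing in for Flink's internal serializer (an illustration, not the actual SpanningRecordSerializer code): reserve 4 bytes, serialize the record, then patch the length field with one absolute write instead of repositioning back and forth.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Length-prefix serialization without skip-back/skip-forward: the absolute
// putInt(0, ...) writes the length field while the position stays at the tail.
class LengthPrefixSketch {
    static ByteBuffer serialize(String record) {
        byte[] payload = record.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.position(4);               // skip 4 bytes reserved for the length field
        buf.put(payload);              // serialize the record after the reserved bytes
        buf.putInt(0, payload.length); // absolute write: no repositioning needed
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer b = serialize("hello");
        System.out.println(b.getInt(0));   // 5
        System.out.println(b.remaining()); // 9 (4-byte length + 5-byte payload)
    }
}
```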





[jira] [Created] (FLINK-19320) Remove clearBuffers from the public interfaces of RecordWriter

2020-09-21 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19320:
---

 Summary: Remove clearBuffers from the public interfaces of 
RecordWriter
 Key: FLINK-19320
 URL: https://issues.apache.org/jira/browse/FLINK-19320
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Task
Reporter: Yingjie Cao
 Fix For: 1.12.0


Currently, RecordWriter#clearBuffers is only used to finish the current 
BufferBuilder and does not clear buffers any more. Previously, it was used to 
recycle the partially filled buffer in the serializer, but currently the 
serializer does not contain any network buffer.

For now, only some tests and DataSet classes use it, and all of these usages 
should be replaced by RecordWriter#close, which does the same thing. Besides, 
for FLINK-19297, we do not want to keep this method in the new 
ResultPartitionWriter interface. So in this ticket, we propose to remove 
clearBuffers from the public interfaces of RecordWriter.





[jira] [Created] (FLINK-19312) Introduce BufferWritingResultPartition which wraps the logic of writing buffers to ResultSubpartition

2020-09-21 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19312:
---

 Summary: Introduce BufferWritingResultPartition which wraps the 
logic of writing buffers to ResultSubpartition
 Key: FLINK-19312
 URL: https://issues.apache.org/jira/browse/FLINK-19312
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.12.0


In the current abstraction, buffers are written to and read from 
ResultSubpartition, which is a hash-style data writing and reading 
implementation. In the future, a sort-merge based ResultPartitionWriter will be 
implemented which cannot share the current hash-style ResultSubpartition 
related logic. This ticket introduces BufferWritingResultPartition, which wraps 
the logic related to ResultSubpartition; afterwards, ResultPartition is free of 
ResultSubpartition and can be reused by the future sort-merge based 
ResultPartitionWriter implementation.





[jira] [Created] (FLINK-19302) Flushing of BoundedBlockingResultPartition should finish current BufferBuilder

2020-09-21 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19302:
---

 Summary: Flushing of BoundedBlockingResultPartition should finish 
current BufferBuilder
 Key: FLINK-19302
 URL: https://issues.apache.org/jira/browse/FLINK-19302
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.12.0


Currently, flushing of BoundedBlockingResultPartition flushes and closes the 
current BufferConsumer but does not finish the corresponding BufferBuilder. As 
a result, records coming later can be appended to an already recycled buffer.





[jira] [Created] (FLINK-19299) NettyShuffleEnvironmentBuilder#setBufferSize does not take effect

2020-09-20 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19299:
---

 Summary: NettyShuffleEnvironmentBuilder#setBufferSize does not 
take effect
 Key: FLINK-19299
 URL: https://issues.apache.org/jira/browse/FLINK-19299
 Project: Flink
  Issue Type: Bug
  Components: Tests
Reporter: Yingjie Cao
 Fix For: 1.12.0


Currently, NettyShuffleEnvironmentBuilder#setBufferSize does not take effect 
because the set value is never used when building the NettyShuffleEnvironment.





[jira] [Created] (FLINK-19297) Make ResultPartitionWriter record-oriented

2020-09-20 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-19297:
---

 Summary: Make ResultPartitionWriter record-oriented
 Key: FLINK-19297
 URL: https://issues.apache.org/jira/browse/FLINK-19297
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.12.0
Reporter: Yingjie Cao
 Fix For: 1.12.0


Currently, ResultPartitionWriter is buffer-oriented: RecordWriter adds buffers 
of different channels to ResultPartitionWriter, and the buffer boundary serves 
as a natural boundary between data belonging to different channels. However, 
this abstraction is not flexible enough to handle cases where records are 
appended to a joint structure shared by all channels and sorting is used to 
cluster data belonging to different channels.

In this ticket, we propose to make ResultPartitionWriter record-oriented, which 
offers more flexibility to implementations of ResultPartitionWriter. Based on 
the new record-oriented interface, we will introduce the sort-merge based 
blocking shuffle to Flink in the future.





[jira] [Created] (FLINK-18762) Make network buffers per incoming/outgoing channel can be configured separately

2020-07-30 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-18762:
---

 Summary: Make network buffers per incoming/outgoing channel can be 
configured separately
 Key: FLINK-18762
 URL: https://issues.apache.org/jira/browse/FLINK-18762
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.12.0


In FLINK-16012, we want to decrease the default number of exclusive buffers at 
the receiver side from 2 to 1 to accelerate checkpointing in cases of 
backpressure. However, the numbers of buffers per outgoing and incoming channel 
are configured by a single configuration key. It is better to make the network 
buffers per incoming/outgoing channel configurable separately, which is more 
flexible. At the same time, we can keep the default behavior compatible.





[jira] [Created] (FLINK-18728) Make initialCredit of RemoteInputChannel final

2020-07-27 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-18728:
---

 Summary: Make initialCredit of RemoteInputChannel final
 Key: FLINK-18728
 URL: https://issues.apache.org/jira/browse/FLINK-18728
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.12.0


The field initialCredit of RemoteInputChannel is set only once and can be 
accessed by multiple threads. We can make the field final and move the 
initialization to the constructor of RemoteInputChannel to avoid potential 
thread-safety issues.





[jira] [Created] (FLINK-18727) Remove the previous finished empty Buffer in PipelinedSubpartition when adding a new Buffer

2020-07-27 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-18727:
---

 Summary: Remove the previous finished empty Buffer in 
PipelinedSubpartition when adding a new Buffer
 Key: FLINK-18727
 URL: https://issues.apache.org/jira/browse/FLINK-18727
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao
 Fix For: 1.12.0


In the current implementation of PipelinedSubpartition, an empty Buffer 
consumes credit, which means we need at least one credit to handle a finished 
empty Buffer without any data. We can remove and recycle the finished empty 
Buffer in the queue when adding a new Buffer, after which the reader does not 
need any available credit to handle it. For example, if the new buffer is an 
event and the previous buffer is finished but holds no data, then after the 
empty buffer is removed, the reader is able to process the newly added event 
without any available credit.
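A toy model of this pruning (an illustration only, not Flink's actual PipelinedSubpartition; the Buf class and field names here are made up):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// When a new buffer is added, a previously finished buffer that holds no data
// is removed (and would be recycled), so the reader never spends a credit on it.
class EmptyBufferPruning {
    static final class Buf {
        final int bytes;        // payload size in bytes
        final boolean finished; // whether the producer finished this buffer
        Buf(int bytes, boolean finished) { this.bytes = bytes; this.finished = finished; }
    }

    final Deque<Buf> queue = new ArrayDeque<>();

    void add(Buf newBuf) {
        Buf last = queue.peekLast();
        if (last != null && last.finished && last.bytes == 0) {
            queue.pollLast(); // drop the finished empty buffer before enqueuing
        }
        queue.addLast(newBuf);
    }

    public static void main(String[] args) {
        EmptyBufferPruning sub = new EmptyBufferPruning();
        sub.add(new Buf(0, true));  // a finished buffer without any data
        sub.add(new Buf(16, true)); // e.g. an event buffer
        System.out.println(sub.queue.size()); // 1: only the event remains
    }
}
```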





[jira] [Created] (FLINK-18453) Stack overflow of AggregateITCase#testAggregationCodeSplit on Azure

2020-06-29 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-18453:
---

 Summary: Stack overflow of 
AggregateITCase#testAggregationCodeSplit  on Azure
 Key: FLINK-18453
 URL: https://issues.apache.org/jira/browse/FLINK-18453
 Project: Flink
  Issue Type: Bug
Reporter: Yingjie Cao


Here is some log:

{code:java}
[ERROR] testAggregationCodeSplit[LocalGlobal=OFF, MiniBatch=OFF, 
StateBackend=HEAP](org.apache.flink.table.planner.runtime.stream.sql.AggregateITCase)
 Time elapsed: 8.167 s <<< ERROR!
Caused by: java.lang.StackOverflowError
 at org.codehaus.janino.CodeContext.extract16BitValue(CodeContext.java:720)
 at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:561)
 at org.codehaus.janino.CodeContext.flowAnalysis(CodeContext.java:557)
{code}
The whole log: 
[https://dev.azure.com/kevin-flink/flink/_build/results?buildId=30=logs=a1590513-d0ea-59c3-3c7b-aad756c48f25=5129dea2-618b-5c74-1b8f-9ec63a37a8a6]





[jira] [Created] (FLINK-18408) BinaryRowData should use de-/serialization method of MemorySegment with explicit endianness

2020-06-22 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-18408:
---

 Summary: BinaryRowData should use de-/serialization method of 
MemorySegment with explicit endianness
 Key: FLINK-18408
 URL: https://issues.apache.org/jira/browse/FLINK-18408
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Reporter: Yingjie Cao


I noticed that classes like BinaryRowData use de-/serialization methods of 
MemorySegment without explicit endianness. I think this is not safe if we 
serialize some data on one machine and deserialize it on another machine with 
different endianness. To be on the safe side, I think we should use the 
de-/serialization methods of MemorySegment with explicit endianness. What do 
you think? (BinaryRowWriter may have the same issue, and there may be other 
classes that also suffer from this, but I did not notice them.)
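A small demonstration of why explicit endianness matters, using plain java.nio in place of Flink's MemorySegment: bytes written with one byte order and read back with another yield a different value, so a cross-machine format should pin its endianness explicitly.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Writing with an explicit byte order and reading with a mismatched one
// reverses the bytes of the value.
class EndiannessDemo {
    public static void main(String[] args) {
        int value = 0x01020304;
        byte[] bytes = ByteBuffer.allocate(4)
                .order(ByteOrder.LITTLE_ENDIAN) // writer pins little-endian
                .putInt(value)
                .array();                       // bytes: 04 03 02 01

        int sameOrder  = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).getInt();
        int wrongOrder = ByteBuffer.wrap(bytes).order(ByteOrder.BIG_ENDIAN).getInt();

        System.out.println(sameOrder == value);              // true
        System.out.println(Integer.toHexString(wrongOrder)); // 4030201: bytes reversed
    }
}
```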





[jira] [Created] (FLINK-17665) Serialize buffer data type of Buffer into BufferResponse

2020-05-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-17665:
---

 Summary: Serialize buffer data type of Buffer into BufferResponse
 Key: FLINK-17665
 URL: https://issues.apache.org/jira/browse/FLINK-17665
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.11.0
Reporter: Yingjie Cao
 Fix For: 1.11.0


After serializing the DataType of a Buffer into BufferResponse, we can get the 
DataType at the downstream side without deserializing the Buffer, which is more 
performance friendly.





[jira] [Created] (FLINK-17572) Remove checkpoint alignment buffered metric from webui

2020-05-08 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-17572:
---

 Summary: Remove checkpoint alignment buffered metric from webui
 Key: FLINK-17572
 URL: https://issues.apache.org/jira/browse/FLINK-17572
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Web Frontend
Affects Versions: 1.11.0
Reporter: Yingjie Cao


After FLINK-16404, we never cache buffers during checkpoint barrier alignment, 
so the checkpoint alignment buffered metric will always be 0; we should remove 
it directly.





[jira] [Created] (FLINK-17568) Task may consume data after checkpoint barrier before performing checkpoint for unaligned checkpoint

2020-05-08 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-17568:
---

 Summary: Task may consume data after checkpoint barrier before 
performing checkpoint for unaligned checkpoint
 Key: FLINK-17568
 URL: https://issues.apache.org/jira/browse/FLINK-17568
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.11.0
Reporter: Yingjie Cao
 Fix For: 1.11.0


For unaligned checkpoints, a task may consume data after the checkpoint barrier 
before performing the checkpoint, which leads to consumption of duplicated data 
and corruption of the data stream.

More specifically, when the Netty thread notifies the checkpoint barrier for 
the first time and enqueues a checkpointing task in the mailbox, the task 
thread may still be in the data consumption loop, and if it reads a new 
checkpoint barrier from another channel it will not return to the mailbox; 
instead it continues to read data until all data is consumed or a full record 
is available. Meanwhile, the data after the checkpoint barrier may be read and 
consumed, which leads to inconsistency.





[jira] [Created] (FLINK-17564) Inflight data of incoming channel may be disordered for unaligned checkpoint

2020-05-07 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-17564:
---

 Summary: Inflight data of incoming channel may be disordered for 
unaligned checkpoint
 Key: FLINK-17564
 URL: https://issues.apache.org/jira/browse/FLINK-17564
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Affects Versions: 1.11.0
Reporter: Yingjie Cao
 Fix For: 1.11.0


For unaligned checkpoints, when checkpointing the inflight data of an incoming 
channel, both the task thread and the Netty thread may add data to the channel 
state writer. More specifically, the task thread first requests the inflight 
buffers from the input channel and adds them to the channel state writer, and 
then the Netty thread adds the follow-up buffers (if any). The buffer adding of 
the task thread and the Netty thread is not synchronized, so the Netty thread 
may add buffers before the task thread, which leads to disorder of the data and 
corruption of the data stream.





[jira] [Created] (FLINK-17299) Add remote channel throughput benchmark

2020-04-21 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-17299:
---

 Summary: Add remote channel throughput benchmark
 Key: FLINK-17299
 URL: https://issues.apache.org/jira/browse/FLINK-17299
 Project: Flink
  Issue Type: Sub-task
  Components: Benchmarks
Affects Versions: 1.11.0
Reporter: Yingjie Cao
 Fix For: 1.11.0


To guarantee that the implementation of FLINK-16404 does not introduce any 
regression for the fast-checkpoint scenario (1s checkpoint interval with 
checkpoints completing within 1s), we introduce this benchmark.





[jira] [Created] (FLINK-17208) Reduce redundant data available notification of PipelinedSubpartition

2020-04-17 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-17208:
---

 Summary: Reduce redundant data available notification of 
PipelinedSubpartition
 Key: FLINK-17208
 URL: https://issues.apache.org/jira/browse/FLINK-17208
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.11.0
Reporter: Yingjie Cao
 Fix For: 1.11.0


There are three scenarios that may lead to redundant data available 
notifications for PipelinedSubpartition:
 # Add a new BufferConsumer after an event;
 # Finish the current subpartition;
 # Flush an event buffer.

We can avoid these redundant data available notifications.





[jira] [Created] (FLINK-17107) CheckpointCoordinatorConfiguration#isExactlyOnce() is inconsistent with StreamConfig#getCheckpointMode()

2020-04-13 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-17107:
---

 Summary: CheckpointCoordinatorConfiguration#isExactlyOnce() is 
inconsistent with StreamConfig#getCheckpointMode()
 Key: FLINK-17107
 URL: https://issues.apache.org/jira/browse/FLINK-17107
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Checkpointing
Reporter: Yingjie Cao


CheckpointCoordinatorConfiguration#isExactlyOnce() is inconsistent with 
StreamConfig#getCheckpointMode() when checkpointing is disabled: 
CheckpointCoordinatorConfiguration#isExactlyOnce() returns true if the 
checkpoint mode is EXACTLY_ONCE and false if it is AT_LEAST_ONCE, while 
StreamConfig#getCheckpointMode() always returns AT_LEAST_ONCE, which means 
never exactly once.





[jira] [Created] (FLINK-16298) GroupWindowTableAggregateITCase.testEventTimeTumblingWindow fails on Travis

2020-02-26 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-16298:
---

 Summary: 
GroupWindowTableAggregateITCase.testEventTimeTumblingWindow fails on Travis
 Key: FLINK-16298
 URL: https://issues.apache.org/jira/browse/FLINK-16298
 Project: Flink
  Issue Type: Test
  Components: Tests
Reporter: Yingjie Cao


GroupWindowTableAggregateITCase.testEventTimeTumblingWindow fails on Travis. 
link: [https://api.travis-ci.com/v3/job/291610383/log.txt]

stack:
{code:java}
05:38:01.976 [ERROR] Tests run: 18, Failures: 0, Errors: 1, Skipped: 0, Time 
elapsed: 7.537 s <<< FAILURE! - in 
org.apache.flink.table.planner.runtime.stream.table.GroupWindowTableAggregateITCase
05:38:01.976 [ERROR] 
testEventTimeTumblingWindow[StateBackend=HEAP](org.apache.flink.table.planner.runtime.stream.table.GroupWindowTableAggregateITCase)
  Time elapsed: 0.459 s  <<< ERROR!
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at 
org.apache.flink.table.planner.runtime.stream.table.GroupWindowTableAggregateITCase.testEventTimeTumblingWindow(GroupWindowTableAggregateITCase.scala:151)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, 
backoffTimeMS=0)
Caused by: java.lang.Exception: Artificial Failure
{code}





[jira] [Created] (FLINK-15455) Enable TCP connection reuse across multiple jobs.

2020-01-02 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-15455:
---

 Summary: Enable TCP connection reuse across multiple jobs.
 Key: FLINK-15455
 URL: https://issues.apache.org/jira/browse/FLINK-15455
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Yingjie Cao


Currently, TCP connections can only be reused by tasks residing in the same 
TaskManager that consume the same IntermediateResult. After a job finishes or 
fails over, the TCP connections are closed and new connections must be set up 
later.

As an improvement, we can make TCP connections a cluster-level resource which 
can be reused by multiple jobs. The advantages are as follows:
 # Reduce the number of TCP connections, saving some resources.
 # Reduce the overhead of connection setup and teardown, so jobs restarted 
after failover and later jobs submitted to the same session cluster can reuse 
the previous connections.

We use a Flink session cluster as a service for ad-hoc queries, and users can 
produce statistics or create statements and reports at any time. Most of the 
queries finish within 2s, and we found that TCP connection reuse helps a lot to 
reduce the average execution time, which means more queries can be processed 
with the same resources and time, with an even better user experience.





[jira] [Created] (FLINK-15166) Shuffle data compression wrongly decrease the buffer reference count.

2019-12-09 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-15166:
---

 Summary: Shuffle data compression wrongly decrease the buffer 
reference count.
 Key: FLINK-15166
 URL: https://issues.apache.org/jira/browse/FLINK-15166
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.10.0
Reporter: Yingjie Cao
 Fix For: 1.10.0


FLINK-15140 reports two related problems, both triggered by the broadcast 
partitioner. To make things clearer, I created this Jira to address the 
problems separately.

 

For blocking shuffle compression, we recycle the compressed intermediate buffer 
each time after writing the data out. However, when the data is not compressed, 
the returned buffer is the original buffer and should not be recycled, yet we 
wrongly recycle it.
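A minimal sketch of the fix, with illustrative names rather than Flink's actual buffer classes: recycle the write-side buffer only when compression actually produced a new buffer, since recycling the pass-through original corrupts its reference count.

```java
// Hypothetical sketch of correct recycling after a write.
class Buffer {
    int refCount = 1;
    void recycle() { refCount--; }
}

class CompressorSketch {
    // Returns the original buffer unchanged when compression does not help.
    static Buffer compress(Buffer original, Buffer intermediate, boolean compressible) {
        return compressible ? intermediate : original;
    }

    static void writeOut(Buffer original, Buffer intermediate, boolean compressible) {
        Buffer toWrite = compress(original, intermediate, compressible);
        // ... write toWrite to disk ...
        if (toWrite != original) {
            toWrite.recycle(); // recycle only the intermediate buffer we own
        }
    }

    public static void main(String[] args) {
        Buffer original = new Buffer();
        Buffer intermediate = new Buffer();
        writeOut(original, intermediate, false); // incompressible path
        System.out.println(original.refCount);   // prints 1: not wrongly recycled
    }
}
```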





[jira] [Created] (FLINK-15140) Shuffle data compression does not work with BroadcastRecordWriter.

2019-12-08 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-15140:
---

 Summary: Shuffle data compression does not work with 
BroadcastRecordWriter.
 Key: FLINK-15140
 URL: https://issues.apache.org/jira/browse/FLINK-15140
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.10.0
Reporter: Yingjie Cao
 Fix For: 1.10.0


I tested the newest code of the master branch last weekend with more test 
cases. Unfortunately, several problems were encountered, including a 
compression bug.

For pipelined mode, the compressor copies the compressed data back into the 
input buffer, but when BroadcastRecordWriter is used the underlying buffer is 
shared across channels, so we must not copy the compressed data back into it. 
For blocking mode, we wrongly recycle the buffer when it is not compressed, and 
this problem is also triggered when BroadcastRecordWriter is used.

To fix the problems: for blocking shuffle, the reference counter should be 
maintained correctly; for pipelined shuffle, the simplest way may be to disable 
compression when the underlying buffer is shared. I will open a PR to fix the 
problem.
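The pipelined-side workaround can be sketched as follows; the names are illustrative, not Flink's actual compressor API:

```java
// Hypothetical sketch: compress in place only when this writer owns the
// buffer exclusively; a broadcast-shared buffer is passed through untouched,
// because copying compressed bytes back would clobber other readers' view.
class SharedBufferSketch {
    static String maybeCompress(String data, boolean bufferShared) {
        if (bufferShared) {
            return data; // skip compression entirely for shared buffers
        }
        return "compressed(" + data + ")"; // stand-in for real in-place compression
    }

    public static void main(String[] args) {
        System.out.println(maybeCompress("payload", true));  // prints payload
        System.out.println(maybeCompress("payload", false)); // prints compressed(payload)
    }
}
```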





[jira] [Created] (FLINK-15030) Potential deadlock for bounded blocking ResultPartition.

2019-12-02 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-15030:
---

 Summary: Potential deadlock for bounded blocking ResultPartition.
 Key: FLINK-15030
 URL: https://issues.apache.org/jira/browse/FLINK-15030
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.9.1, 1.9.0
Reporter: Yingjie Cao


Currently, the BoundedBlockingSubpartition relies on the addition of the next 
BufferConsumer to flush and recycle the previous one, which means we need at 
least (numSubpartitions + 1) buffers to make the bounded blocking 
ResultPartition work. However, the ResultPartitionFactory gives only 
numSubpartitions required buffers to the BoundedBlockingSubpartition, which may 
lead to deadlock.

This problem exists only in version 1.9. In version 1.10 (master), this problem 
has been fixed by commit 2c8b4ef572f05bf4740b7e204af1e5e709cd945c.
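The sizing rule can be stated as a one-line sketch (illustrative, not the actual ResultPartitionFactory code):

```java
// Hypothetical sketch: a bounded blocking partition needs one buffer per
// subpartition plus one extra, because flushing a subpartition's current
// buffer is only triggered by adding the next BufferConsumer.
class BufferBudgetSketch {
    static int requiredBuffers(int numSubpartitions) {
        return numSubpartitions + 1; // the "+ 1" is what 1.9 was missing
    }

    public static void main(String[] args) {
        System.out.println(requiredBuffers(8)); // prints 9
    }
}
```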





[jira] [Created] (FLINK-14872) Potential deadlock for task reading from blocking ResultPartition.

2019-11-20 Thread Yingjie Cao (Jira)
Yingjie Cao created FLINK-14872:
---

 Summary: Potential deadlock for task reading from blocking 
ResultPartition.
 Key: FLINK-14872
 URL: https://issues.apache.org/jira/browse/FLINK-14872
 Project: Flink
  Issue Type: Bug
Reporter: Yingjie Cao


Currently, the buffer pool size of an InputGate reading from a blocking 
ResultPartition is unbounded, which can use too many buffers, prevent the 
ResultPartition of the same task from acquiring enough core buffers, and 
finally lead to deadlock.

Consider the following case:

Core buffers are reserved for InputGate and ResultPartition -> the InputGate 
consumes lots of buffers (not including those reserved for the ResultPartition) 
-> other tasks acquire exclusive buffers for their InputGates and trigger a 
redistribution of buffers (buffers taken by the previous InputGate cannot be 
released) -> the first task, whose InputGate uses lots of buffers, begins to 
emit records but cannot acquire enough core buffers (some operators may not 
emit records immediately, or there is simply nothing to emit) -> deadlock.

 

I think we can fix this problem by limiting the number of buffers that can be 
allocated by an InputGate which reads from a blocking ResultPartition.
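The proposed limit can be sketched as a capped pool; the class below is illustrative, not Flink's actual LocalBufferPool:

```java
// Hypothetical sketch: cap the number of floating buffers an InputGate
// reading a blocking ResultPartition may take from the shared pool, so
// downstream gates cannot starve the producer side of its core buffers.
class BoundedPoolSketch {
    private final int maxBuffers;
    private int taken = 0;

    BoundedPoolSketch(int maxBuffers) { this.maxBuffers = maxBuffers; }

    // Returns true if a buffer was granted, false once the cap is reached.
    boolean tryTake() {
        if (taken >= maxBuffers) {
            return false; // the gate waits instead of draining the global pool
        }
        taken++;
        return true;
    }

    public static void main(String[] args) {
        BoundedPoolSketch gatePool = new BoundedPoolSketch(2);
        System.out.println(gatePool.tryTake()); // prints true
        System.out.println(gatePool.tryTake()); // prints true
        System.out.println(gatePool.tryTake()); // prints false: cap reached
    }
}
```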




