elon_X created FLINK-35076:
------------------------------

             Summary: Watermark alignment causes serious jitter in the data flow
                 Key: FLINK-35076
                 URL: https://issues.apache.org/jira/browse/FLINK-35076
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Coordination
    Affects Versions: 1.16.1
            Reporter: elon_X
         Attachments: image-2024-04-10-20-13-14-752.png, 
image-2024-04-10-20-15-05-731.png, image-2024-04-10-20-23-13-872.png

In our company we have a scenario that requires multi-stream join operations. We 
are building on Flink's watermark alignment, and I found that the final join 
output experiences serious jitter.

I analyzed the cause: one upstream topic has more than 300 partitions. With that 
many partitions, some of them frequently see intermittent writes with QPS=0, and 
this is most pronounced between 2 am and 5 am. The overall write rate of the 
topic, however, is very smooth.

 

The final join output shows serious jitter, as in the following diagram:

!image-2024-04-10-20-15-05-731.png!

Root cause:
 # {{SourceOperator#emitLatestWatermark}} reports the {{lastEmittedWatermark}} 
to the SourceCoordinator.
 # If a partition receives no writes for a period, the {{lastEmittedWatermark}} 
reported by the subtask reading that partition stays unchanged.
 # The SourceCoordinator aggregates the watermarks of all subtasks in the 
watermark group and takes the minimum. As a result, the {{maxAllowedWatermark}} 
can stay frozen for some time, even though the overall upstream data flow keeps 
moving forward, until that minimum finally advances; only then does everything 
move again, which manifests as serious jitter in the output stream (see the 
sketch after this list).
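
A minimal, self-contained sketch of the aggregation behavior described above 
(illustrative only, not Flink's actual SourceCoordinator code; the class and 
method names below are made up for the example): each subtask reports its 
{{lastEmittedWatermark}}, the aggregate is the global minimum, and 
{{maxAllowedWatermark}} is the aggregate plus the allowed drift. As long as one 
idle subtask keeps reporting the same watermark, {{maxAllowedWatermark}} cannot 
advance.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only; names are hypothetical, not Flink internals.
public class MinWatermarkAggregationSketch {

    // lastEmittedWatermark reported by each subtask, keyed by subtask index.
    private final Map<Integer, Long> reportedWatermarks = new ConcurrentHashMap<>();
    private final long maxAllowedDrift;

    public MinWatermarkAggregationSketch(long maxAllowedDrift) {
        this.maxAllowedDrift = maxAllowedDrift;
    }

    // Step 1: a subtask reports its latest emitted watermark.
    public void reportWatermark(int subtaskIndex, long lastEmittedWatermark) {
        reportedWatermarks.merge(subtaskIndex, lastEmittedWatermark, Math::max);
    }

    // Step 3: the aggregate is the global minimum across all subtasks.
    public long aggregatedWatermark() {
        return reportedWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    // maxAllowedWatermark stays frozen until the slowest (possibly idle)
    // subtask finally reports a larger watermark.
    public long maxAllowedWatermark() {
        return aggregatedWatermark() + maxAllowedDrift;
    }
}
{code}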

I think taking the global minimum may not be the best option; min/max 
aggregations are more likely to hit edge cases like this. Perhaps a median would 
be more appropriate (a rough sketch follows), or a more elaborate selection 
strategy?
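
As a rough sketch of what median-based selection could look like (purely 
illustrative; the helper below is hypothetical, not a patch against the 
SourceCoordinator), the coordinator would sort the reported watermarks and take 
the middle element instead of the minimum, so a handful of idle partitions 
cannot freeze {{maxAllowedWatermark}}:

{code:java}
import java.util.Arrays;

// Hypothetical helper, for illustration only.
public final class MedianWatermarkSelector {

    private MedianWatermarkSelector() {
    }

    // Lower median of the reported per-subtask watermarks.
    public static long median(long[] reportedWatermarks) {
        if (reportedWatermarks.length == 0) {
            return Long.MIN_VALUE;
        }
        long[] sorted = reportedWatermarks.clone();
        Arrays.sort(sorted);
        return sorted[(sorted.length - 1) / 2];
    }
}
{code}

For example, if four subtasks report watermarks 10:00, 10:00, 10:00 and 02:00, 
the minimum stays at 02:00 until the idle partition catches up, while the median 
already advances with the healthy partitions.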

If the minimum is replaced with a median, the overall data flow stays very 
smooth:

!image-2024-04-10-20-23-13-872.png!

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
