Sergey Anokhovskiy created FLINK-35899:
------------------------------------------

             Summary: Accumulated TumblingProcessingTimeWindows in the state 
for a few days
                 Key: FLINK-35899
                 URL: https://issues.apache.org/jira/browse/FLINK-35899
             Project: Flink
          Issue Type: Bug
          Components: API / Core, Runtime / Checkpointing
    Affects Versions: 1.13.5
            Reporter: Sergey Anokhovskiy


One of the 40 sub-tasks of the TumblingProcessingTimeWindows operator 
accumulated windows for over a day. The next restart of the job caused it to 
process the accumulated windows, which in turn caused checkpointing to time out. 
Once the sub-task had processed the old windows (which can take several hours), 
it worked normally again. *Could you please suggest ideas about what might cause 
a window operator sub-task to accumulate old windows for days?*

 

Here is more context:



At Yelp we built a Flink-based connector to our database. We aimed to reduce 
the load on the database, which is why a time window with a reduce function was 
introduced: only the latest version of a document matters to us. Here is the 
configuration of the window:

 

private def windowedStream(input: DataStream[FieldsToIndex]) = {
    input.keyBy(f => f.id)
      .window(TumblingProcessingTimeWindows.of(seconds(elasticPipeJobConfig.deduplicationWindowTimeInSec)))
      .reduce(
        (e1, e2) => {
          if (e1.messageTimestamp > e2.messageTimestamp) {
            e1
          } else {
            e2
          }
        })
  }
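
For reference, here is a minimal, self-contained sketch of this kind of deduplication window, assuming the Flink 1.13 Scala DataStream API. The FieldsToIndex case class, the bounded test source and the 60-second window size are assumptions for illustration only, not the actual pipeline:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical stand-in for the real FieldsToIndex type.
case class FieldsToIndex(id: String, messageTimestamp: Long, payload: String)

object DedupWindowSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Stand-in bounded source; the real job consumes an unbounded message stream.
    val input: DataStream[FieldsToIndex] = env.fromElements(
      FieldsToIndex("doc-1", messageTimestamp = 1L, payload = "v1"),
      FieldsToIndex("doc-1", messageTimestamp = 2L, payload = "v2"),
      FieldsToIndex("doc-2", messageTimestamp = 1L, payload = "v1")
    )

    // Keep only the newest version of each document seen within each processing-time window.
    val deduped = input
      .keyBy(_.id)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(60))) // assumed window size
      .reduce((e1, e2) => if (e1.messageTimestamp > e2.messageTimestamp) e1 else e2)

    deduped.print()
    env.execute("dedup-window-sketch")
  }
}

Under normal operation each per-key window fires shortly after its end and its contents are purged, so window-contents and the processing-time timers are expected to stay small.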

 

It works as expected most of the time, but a few times per year one sub-task of 
the dedup_window operator got stuck and caused checkpointing to fail. We took a 
look at the state data and added extra logging to the custom trigger, and here 
is what we found:


 # It turned out that the state of the 17th sub-task (a different number in 
every incident) was more than 100 times bigger than that of the others (see the 
txt file). This caused the job, and that sub-task in particular, to initialize 
slowly (see the screenshot).


 # Statistics of the RocksDB tables:
_timer_state/processing_window-timers was ~8MB,
_timer_state/event_window-timers was 0,
window-contents was ~20GB with ~960k entries and ~14k unique message ids.
The distribution of counts by id is in ids_destribution.txt.


 # Each window-contents value has an associated timer entry in 
_timer_state/processing_window-timers. The timers accumulated gradually; counts 
bucketed by time (Pacific) are in timers.txt.


 # The earliest entries are from 9:23am Pacific on June 26th, over a day before 
the incident. The Flink log showed that a taskmanager went away at 9:28am, 
forcing a job restart (see taskmanager_exception.txt). The job came back up at 
~9:41am.


 # Debug logs added to the custom trigger's 
onClear/onProcessingTime/onElement/onEventTime functions confirmed that the job 
was busy processing the old windows (a minimal sketch of such a logging trigger 
is included after this list).


 # It seems that the sub-task was in a bad state after the restart. It is 
unclear whether it was stuck not processing any window events, or whether it was 
just not removing them from the state. The next time the job restarted it had to 
churn through a day's worth of writes, causing the delay.
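
A sketch of the kind of per-callback debug logging referenced above is below. It is an illustration built on Flink's public Trigger API (a hypothetical LoggingProcessingTimeTrigger that mirrors the default ProcessingTimeTrigger), not the job's actual custom trigger:

import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.slf4j.LoggerFactory

// Illustrative only: fires on processing time like the default ProcessingTimeTrigger,
// but logs every callback so we can see whether a sub-task is still receiving elements
// and firing/clearing its windows.
class LoggingProcessingTimeTrigger[T] extends Trigger[T, TimeWindow] {
  @transient private lazy val log = LoggerFactory.getLogger(getClass)

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    log.debug(s"onElement: window end ${window.maxTimestamp()}")
    // One processing-time timer per window; these are the entries that show up in
    // _timer_state/processing_window-timers.
    ctx.registerProcessingTimeTimer(window.maxTimestamp())
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    log.debug(s"onProcessingTime: timer $time fired for window end ${window.maxTimestamp()}")
    TriggerResult.FIRE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult = {
    log.debug(s"onEventTime: $time")
    TriggerResult.CONTINUE
  }

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    log.debug(s"clear: window end ${window.maxTimestamp()}")
    ctx.deleteProcessingTimeTimer(window.maxTimestamp())
  }
}

With this kind of logging, a sub-task that is busy replaying old windows shows up as a steady stream of onProcessingTime/clear lines whose window end timestamps are far in the past, which matches what we observed.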



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
