Hi Seth,

Upon restoring, the splits will be re-shuffled among the new tasks, and I believe
that state is repartitioned in a round-robin fashion (although I am not 100% sure,
so I am also including Stefan and Aljoscha in this). The priority queues will be
reconstructed from the restored elements. So each task manager may get a roughly
equal number of splits, but the “recent” ones may end up concentrated on a few
nodes. This may also have to do with how your monitor sends splits to the reader
(e.g. all splits of a recent file going to the same node).
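
To make the mechanics concrete, here is a minimal sketch of the pattern involved
(class and field names are illustrative, not the actual ContinuousFileReaderOperator
code): the pending splits live in operator list state, and on restore each subtask
simply rebuilds its priority queue from whatever subset of the list elements it was
handed after redistribution, which is why the "age" of splits per subtask is not
controlled:

import java.util.Comparator;
import java.util.PriorityQueue;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class SplitBuffer implements CheckpointedFunction {

    // Illustrative split type: anything carrying a last-modified timestamp.
    public static class PendingSplit implements java.io.Serializable {
        public final String path;
        public final long modificationTime;

        public PendingSplit(String path, long modificationTime) {
            this.path = path;
            this.modificationTime = modificationTime;
        }
    }

    // In-memory queue ordered by last-modified time, rebuilt on every restore.
    private transient PriorityQueue<PendingSplit> pendingSplits;

    // Operator (non-keyed) list state; its elements are redistributed among the
    // new subtasks on restore, with no guarantee about which subtask receives
    // which (or how "old") elements.
    private transient ListState<PendingSplit> checkpointedSplits;

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // Write the current queue contents into the operator list state.
        checkpointedSplits.clear();
        for (PendingSplit split : pendingSplits) {
            checkpointedSplits.add(split);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        checkpointedSplits = ctx.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("pending-splits", PendingSplit.class));

        pendingSplits = new PriorityQueue<>(
                Comparator.comparingLong((PendingSplit s) -> s.modificationTime));

        if (ctx.isRestored()) {
            // Re-insert whatever subset of splits this subtask was handed.
            for (PendingSplit split : checkpointedSplits.get()) {
                pendingSplits.add(split);
            }
        }
    }
}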

As far as I know, we do not have an option for a custom state re-partitioner.

To see what is restored, you can enable DEBUG logging, which will print something
like the following upon restoring:

"ContinuousFileReaderOperator (taskIdx={subtaskIdx}) restored 
{restoredReaderState}"

with the restoredReaderState containing the restored splits.

And something similar upon checkpointing. This should give you a better look into
what may be happening.
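
In case it helps, you could raise the log level just for the file source classes,
e.g. with something like this in conf/log4j.properties (assuming the default log4j
setup; adjust the logger name to the package of the operator in your Flink version):

# Hypothetical log4j.properties entry: enable DEBUG only for the
# continuous file monitoring/reader classes.
log4j.logger.org.apache.flink.streaming.api.functions.source=DEBUG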

Thanks,
Kostas

> On May 4, 2017, at 3:45 PM, Seth Wiesman <swies...@mediamath.com> wrote:
> 
> I am curious about how operator state is repartitioned to subtasks when a job 
> is resumed from a checkpoint or savepoint. The reason is that I am having 
> issues with the ContinuousFileReaderOperator when recovering from a failure. 
>  
> I consume most of my data from files off S3. I have a custom file monitor 
> that understands how to walk my directory structure and outputs 
> TimestampedFileSplits downstream in chronological order to the stock 
> ContinuousFileReaderOperator. The reader consumes those splits and stores 
> them in a priority queue based on their last-modified time, ensuring that 
> files are read in chronological order, which is exactly what I want. The 
> problem is that when recovering, the unread splits partitioned out to each 
> of the subtasks seem to be heavily skewed in terms of last-modified time.
>  
> While each task may have a similar number of files, I find that one or two 
> will have a disproportionate number of old files. This in turn holds back my 
> watermark (sometimes for several hours depending on the number of unread 
> splits), which keeps timers from firing, windows from purging, etc.
>  
> I was hoping there was some way I could add a custom partitioner to ensure 
> that splits are uniformly distributed in a temporal manner, or that someone 
> had other ideas for how I could mitigate the problem.
>  
> Thank you, 
>  
> Seth Wiesman 
>  
