Hi Seth,

Upon restoring, the splits will be re-shuffled among the new tasks, and I believe that operator state is repartitioned in a round-robin way (although I am not 100% sure, so I am also including Stefan and Aljoscha in this). The priority queues will be reconstructed from the restored elements. So task managers may get a relatively equal number of splits, but “recent” ones may be concentrated on a few nodes. This may also have to do with how your monitor sends splits to the reader (e.g. all splits of a recent file go to the same node).
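To make the restore behaviour more concrete, here is a rough sketch of how list-style operator state comes back after a restore. This is not the ContinuousFileReaderOperator code itself, just an illustration using the public CheckpointedFunction / ListState API; the TimedSplit and SplitQueueFunction names are made up for the example, and the actual reading logic is omitted:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

import java.io.Serializable;
import java.util.Comparator;
import java.util.PriorityQueue;

// Illustration only: each subtask checkpoints the splits it has not read yet
// as a flat list; on restore the runtime hands each subtask *some* subset of
// the global list (the assignment is decided by the runtime, not the user),
// and the subtask rebuilds its local priority queue from that subset.
public class SplitQueueFunction implements CheckpointedFunction {

    // Minimal stand-in for a timestamped input split.
    public static class TimedSplit implements Serializable {
        public final String path;
        public final long modificationTime;

        public TimedSplit(String path, long modificationTime) {
            this.path = path;
            this.modificationTime = modificationTime;
        }
    }

    private transient ListState<TimedSplit> checkpointedSplits;
    private transient PriorityQueue<TimedSplit> pendingSplits;

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Persist the still-unread splits as a flat list.
        checkpointedSplits.clear();
        for (TimedSplit split : pendingSplits) {
            checkpointedSplits.add(split);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedSplits = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("pending-splits", TimedSplit.class));

        // Local ordering by last-modified time is re-established here, but
        // which splits this subtask received is outside the user's control.
        pendingSplits = new PriorityQueue<>(
                Comparator.comparingLong((TimedSplit s) -> s.modificationTime));

        if (context.isRestored()) {
            for (TimedSplit split : checkpointedSplits.get()) {
                pendingSplits.add(split);
            }
        }
    }
}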
As far as I know, we do not have an option for a custom state re-partitioner. To see what is restored, you can enable DEBUG logging, and upon restoring this will print something like: "ContinuousFileReaderOperator (taskIdx={subtaskIdx}) restored {restoredReaderState}", with the restoredReaderState containing the restored splits. Something similar is printed upon checkpointing. This will give you a better look at what may be happening. I have put a minimal logging snippet at the end of this mail.

Thanks,
Kostas

> On May 4, 2017, at 3:45 PM, Seth Wiesman <swies...@mediamath.com> wrote:
> 
> I am curious about how operator state is repartitioned to subtasks when a job
> is resumed from a checkpoint or savepoint. The reason is that I am having
> issues with the ContinuousFileReaderOperator when recovering from a failure.
> 
> I consume most of my data from files off S3. I have a custom file monitor
> that understands how to walk my directory structure and outputs
> TimestampedFileSplits downstream in chronological order to the stock
> ContinuousFileReaderOperator. The reader consumes those splits and stores
> them in a priority queue based on their last modified time, ensuring that
> files are read in chronological order, which is exactly what I want. The
> problem is that when recovering, the unread splits being partitioned out to
> each of the subtasks seem to be heavily skewed in terms of last modified time.
> 
> While each task may have a similar number of files, I find that one or two
> will have a disproportionate number of old files. This in turn holds back my
> watermark (sometimes for several hours depending on the number of unread
> splits), which keeps timers from firing, windows from purging, etc.
> 
> I was hoping there was some way I could add a custom partitioner to ensure
> that splits are uniformly distributed in a temporal manner, or if someone had
> other ideas of how I could mitigate the problem.
> 
> Thank you,
> 
> Seth Wiesman
>
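P.S. For the DEBUG logging mentioned above: assuming the stock log4j setup that ships with Flink, a line like the following in conf/log4j.properties should be enough (the package is where ContinuousFileReaderOperator lives, as far as I know; adjust it if your version differs):

log4j.logger.org.apache.flink.streaming.api.functions.source=DEBUG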