Re: OperatorState partitioning when recovering from failure

2017-05-04 Thread Stefan Richter
Hi,

the repartitioning happens indeed via a round-robin algorithm (see
RoundRobinOperatorStateRepartitioner). This repartitioning happens at the level
of the checkpoint coordinator in the master on restore, by redistribution of
state handles. The state that those handles point to is a black box at this
point, so the only assumption we can make is that all partitions can be
redistributed freely. If we want additional constraints on the repartitioning,
the user has to apply those when handing over the state partitions, i.e. the
partitioning into the list state must already group together state partitions
that should not end up on separate machines after a restore.
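
As a rough sketch of the idea (all class, field, and method names below are
made up for illustration and are not taken from Flink or from your job), a
function that owns the splits could put every group that must stay together
into a single element of its operator list state. The round-robin
repartitioner hands out list elements as indivisible units, so each group
lands on exactly one subtask after a restore:

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class SplitGroupingFunction implements CheckpointedFunction {

        // Placeholder for whatever is actually kept in operator state.
        public static class Split implements Serializable {
            public String groupKey;       // e.g. the file this split belongs to
            public long modificationTime;
        }

        // Splits owned by this subtask, grouped by the key that must stay together.
        private final Map<String, List<Split>> pendingByGroup = new HashMap<>();

        // One list element per group; a single element is never broken apart
        // by the round-robin repartitioner.
        private transient ListState<List<Split>> checkpointedGroups;

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            checkpointedGroups = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>(
                    "split-groups",
                    TypeInformation.of(new TypeHint<List<Split>>() {})));

            if (context.isRestored()) {
                for (List<Split> group : checkpointedGroups.get()) {
                    // The whole group arrives together on this subtask.
                    pendingByGroup.put(group.get(0).groupKey, new ArrayList<>(group));
                }
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedGroups.clear();
            for (List<Split> group : pendingByGroup.values()) {
                if (!group.isEmpty()) {
                    // One element == one unit of redistribution.
                    checkpointedGroups.add(group);
                }
            }
        }
    }

The trade-off is that large groups limit how evenly the state can be spread
across subtasks, so the grouping should be as fine-grained as the constraint
allows.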

Best,
Stefan



Re: OperatorState partitioning when recovering from failure

2017-05-04 Thread Kostas Kloudas
Hi Seth,

Upon restoring, splits will be re-shuffled among the new tasks, and I believe
that state is repartitioned in a round-robin way (although I am not 100% sure,
so I am also including Stefan and Aljoscha in this). The priority queues will
be reconstructed based on the restored elements. So task managers may get a
relatively equal number of splits, but “recent” ones may be concentrated on a
few nodes. This may also have to do with how your monitor sends them to the
reader (e.g. all splits of a recent file go to the same node).

As far as I know, we do not have an option for a custom state re-partitioner.

To see what is restored, you can enable DEBUG logging; upon restoring, this
will print something like:

"ContinuousFileReaderOperator (taskIdx={subtaskIdx}) restored {restoredReaderState}"

with the restoredReaderState containing the restored splits.

Something similar is printed upon checkpointing. This should give you a better
look into what may be happening.
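
For example, with the default log4j setup that ships in Flink's conf/
directory, adding something like the following to conf/log4j.properties on the
task managers should be enough to see those lines. I believe the reader lives
in org.apache.flink.streaming.api.functions.source, but double-check the
package name for your version:

    # Turn on DEBUG output for the continuous file monitoring/reader classes
    # (log4j 1.x syntax, as shipped with Flink; logger name is an assumption).
    log4j.logger.org.apache.flink.streaming.api.functions.source=DEBUG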

Thanks,
Kostas




OperatorState partitioning when recovering from failure

2017-05-04 Thread Seth Wiesman
I am curious about how operator state is repartitioned to subtasks when a job 
is resumed from a checkpoint or savepoint. The reason is that I am having 
issues with the ContinuousFileReaderOperator when recovering from a failure.

I consume most of my data from files off S3. I have a custom file monitor that
understands how to walk my directory structure and outputs
TimestampedFileSplits downstream in chronological order to the stock
ContinuousFileReaderOperator. The reader consumes those splits and stores them
in a priority queue based on their last modified time, ensuring that files are
read in chronological order, which is exactly what I want. The problem is that
when recovering, the unread splits partitioned out to each of the subtasks
seem to be heavily skewed in terms of last modified time.
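
Roughly, the pending work in the reader looks like the following sketch (the
type and field names are simplified stand-ins for my actual classes, not the
real Flink ones):

    import java.util.Comparator;
    import java.util.PriorityQueue;

    public class PendingSplits {

        // Simplified stand-in for my TimestampedFileSplit: a path plus the
        // file's last-modified timestamp.
        public static class TimestampedFileSplit {
            final String path;
            final long modificationTime;

            TimestampedFileSplit(String path, long modificationTime) {
                this.path = path;
                this.modificationTime = modificationTime;
            }
        }

        // Oldest modification time first, so splits are read in chronological order.
        private final PriorityQueue<TimestampedFileSplit> queue = new PriorityQueue<>(
            Comparator.comparingLong((TimestampedFileSplit s) -> s.modificationTime));

        public void add(TimestampedFileSplit split) {
            queue.add(split);
        }

        public TimestampedFileSplit nextOldest() {
            return queue.poll();
        }
    }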

While each task may have a similar number of files, I find that one or two
will have a disproportionate number of old files. This in turn holds back my
watermark (sometimes for several hours, depending on the number of unread
splits), which keeps timers from firing, windows from purging, etc.

I was hoping there was some way I could add a custom partitioner to ensure
that splits are uniformly distributed in a temporal manner, or that someone
had other ideas about how I could mitigate the problem.

Thank you,

Seth Wiesman