Re: Streaming pipeline failing to complete checkpoints at scale

2016-12-23 Thread Cliff Resnick
Thanks Stephan, those are great suggestions and I look forward to working
through them. The Thread.yield() in the tight loop looks particularly
interesting. In a similar vein, I have noticed that under load the
ContinuousFileReader sometimes lags behind the ContinuousFileWatcher and
seems to stay that way as buffers fill, despite the much lower cardinality
of the Watcher's source data. As an experiment I added a forced timeout in
the FileWatcher to approximate the FileReader's consumption time, and with
that in place I have been able to get consistent checkpoints. Despite the
obvious kludgery (the success is more likely just an effect of slowing down
the pipeline!), I'm wondering if there's an unrecoverable barrier lag
somehow hiding here. I also stopped the pipeline and resumed with more
network buffers just to see how that affects things.
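
In case it helps, the forced-timeout experiment boils down to a throttled
source loop like the sketch below. This is only an illustration of the idea,
not our actual code: the class, the discoverNextSplitPath() helper, and the
readerCatchUpMillis value are all made up for the example.

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Sketch of a "watcher" source that sleeps between emissions so it cannot
// run far ahead of the downstream reader.
public class ThrottledWatcher extends RichSourceFunction<String> {

    private final long readerCatchUpMillis; // rough guess at reader consumption time
    private volatile boolean running = true;

    public ThrottledWatcher(long readerCatchUpMillis) {
        this.readerCatchUpMillis = readerCatchUpMillis;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                // stand-in for the real directory monitoring / split emission
                ctx.collect(discoverNextSplitPath());
            }
            // forced timeout: approximate the FileReader's consumption time so
            // downstream buffers don't fill up and stall the barriers
            Thread.sleep(readerCatchUpMillis);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    private String discoverNextSplitPath() {
        return "s3://some-bucket/some-split"; // placeholder
    }
}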

One minor note about RocksDB and multiple TaskManagers: the RocksDB native
library is extracted into flink/tmp so it can be shared among TaskManagers,
but this creates a lot of thrash on startup as each TaskManager tries to
overwrite the library. We sometimes use 32 or 36 core instances, and this
thrash would be intolerable with that many TaskManagers. As a workaround I
added random directory creation in the flink-statebackend-rocksdb code. That
then becomes a cleanup issue as directories proliferate, but it's a useful
tradeoff for us. Maybe add a config for this?
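
For what it's worth, the workaround is essentially the following: give each
TaskManager its own randomly named directory to extract the native library
into, instead of one shared path. This is only a sketch of the idea, not the
actual patch, and the class and method names are made up for the example.

import java.io.File;
import java.nio.file.Files;

// Sketch: a private, randomly named extraction directory per TaskManager
// process, so TaskManagers stop overwriting each other's native library.
final class RocksDbLibraryDir {

    static File createPrivateLibraryDir(File baseTmpDir) throws Exception {
        // e.g. <tmp>/rocksdb-lib-3f9c2a1b/ , unique per process
        File dir = Files.createTempDirectory(baseTmpDir.toPath(), "rocksdb-lib-").toFile();
        dir.deleteOnExit(); // best effort; directories still proliferate across restarts
        return dir;
    }
}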

Thanks again,
Cliff



On Fri, Dec 23, 2016 at 2:25 PM, Stephan Ewen  wrote:

> Hi Cliff!
>
> Sorry to hear that you are running into so much trouble with the
> checkpointing.
>
> Here are a few things we found helpful for another user who was running
> into a similar issue:
>
> (1) Make sure you use a rather new version of the 1.2 branch (there have
> been some important fixes added to the network in the past weeks)
>
> (2) Use TaskManagers with one slot (start more TaskManagers instead;
> multiple TaskManagers can run on the same machine). That way, network
> connections do not get in each other's way. Flink's connection
> multiplexing has a weakness that may slow down the network in some cases,
> which can affect checkpoints.
>
> (3) This is very important: Try to move all state that is large to keyed
> state (value state, list state, reducing state, ...) and use RocksDB as the
> state backend. That way snapshots happen asynchronously, which greatly
> helps the checkpoints.
>
> (4) Briefly looking over the File Reading Operator code, it could be that
> the operator does not release the checkpoint lock long enough for barriers
> to get injected in a timely fashion. If you are running a custom Flink
> build anyway, you could try to change the following code block in the
> "ContinuousFileReaderOperator" (as an experiment, not a final fix)
>
> while (!format.reachedEnd()) {
>     synchronized (checkpointLock) {
>         nextElement = format.nextRecord(nextElement);
>         if (nextElement != null) {
>             readerContext.collect(nextElement);
>         } else {
>             break;
>         }
>     }
> }
>
> to something like:
>
> while (!format.reachedEnd()) {
>     synchronized (checkpointLock) {
>         nextElement = format.nextRecord(nextElement);
>         if (nextElement != null) {
>             readerContext.collect(nextElement);
>         } else {
>             break;
>         }
>     }
>     // give the checkpoint thread a chance to work
>     Thread.yield();
> }
>
>
> If you are already doing all of that, or if this does not help, we have
> two courses of action (if you can bear with us for a bit):
>
>   - There is some much-enhanced checkpoint monitoring almost ready to be
> merged. That should help in finding out where the barriers get delayed.
>
>   - Finally, we are experimenting with some other checkpoint alignment
> variants (alternatives to the BarrierBuffer). We can ping you when we have
> an experimental branch with that.
>
> Hope we get this under control!
>
> Best,
> Stephan
>
>
> On Fri, Dec 23, 2016 at 3:47 PM, Cliff Resnick  wrote:
>
>> We are running a DataStream pipeline using Exactly Once/Event Time
>> semantics on 1.2-SNAPSHOT. The pipeline sources from S3 using the
>> ContinuousFileReaderOperator. We use a custom version of the
>> ContinuousFileMonitoringFunction since our source directory changes over
>> time. The pipeline transforms and aggregates tuples of data that is steady
>> over time (spike-less), windowing by hour with an allowed lateness of 6
>> hours. We are running on ec2 c4 instances in a simple YARN setup.
>>
>> What we are seeing is that as we scale the system to hundreds of cores
>> reading around 10 million events per second, the pipeline might checkpoint
>> up to a few times before it reaches a state where it becomes unable to
>> complete a checkpoint. The checkpoint interval does not seem to matter as
>> we've tested intervals from 5 minutes to one hour and timeouts up to one
>> hour as well.
>> What we'll usually see is something like:
>>
>> checkpoint 1 (2G) 1 minute
>> checkpoint 2 (3-4G)  1-5 minutes
>> checkpoint 3 (5-6G) 1-6 minutes
>> checkpoint 4-5 (mixed bag, 4-25 minutes, or never)
>> 

Re: Streaming pipeline failing to complete checkpoints at scale

2016-12-23 Thread Stephan Ewen
Hi Cliff!

Sorry to hear that you are running into so much trouble with the
checkpointing.

Here are a few things we found helpful for another user who was running
into a similar issue:

(1) Make sure you use a rather new version of the 1.2 branch (there have
been some important fixes added to the network in the past weeks)

(2) Use TaskManagers with one slot (start more TaskManagers instead;
multiple TaskManagers can run on the same machine). That way, network
connections do not get in each other's way. Flink's connection
multiplexing has a weakness that may slow down the network in some cases,
which can affect checkpoints. (A config sketch for this follows after
point 4.)

(3) This is very important: Try to move all state that is large to keyed
state (value state, list state, reducing state, ...) and use RocksDB as the
state backend. That way snapshots happen asynchronously, which greatly
helps the checkpoints. (See the keyed-state sketch after point 4.)

(4) Briefly looking over the File Reading Operator code, it could be that
the operator does not release the checkpoint lock long enough for barriers
to get injected in a timely fashion. If you are running a custom Flink
build anyway, you could try to change the following code block in the
"ContinuousFileReaderOperator" (as an experiment, not a final fix)

while (!format.reachedEnd()) {
    synchronized (checkpointLock) {
        nextElement = format.nextRecord(nextElement);
        if (nextElement != null) {
            readerContext.collect(nextElement);
        } else {
            break;
        }
    }
}

to something like:

while (!format.reachedEnd()) {
    synchronized (checkpointLock) {
        nextElement = format.nextRecord(nextElement);
        if (nextElement != null) {
            readerContext.collect(nextElement);
        } else {
            break;
        }
    }
    // give the checkpoint thread a chance to work
    Thread.yield();
}
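
To make point (2) concrete, the single-slot setup is just configuration.
Assuming a YARN session, it would look something like the lines below; the
container count and memory size are placeholders, not a recommendation.

# flink-conf.yaml: one slot per TaskManager
taskmanager.numberOfTaskSlots: 1

# YARN session: many small single-slot TaskManagers rather than a few big
# ones (-n = containers, -s = slots per TaskManager, -tm = MB per TaskManager)
./bin/yarn-session.sh -n 32 -s 1 -tm 4096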
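
And for point (3), a minimal sketch of what "move large state to keyed state
with RocksDB" can look like. The toy job, class names, and the S3 checkpoint
URI are illustrative; only the ValueState / RocksDBStateBackend usage is the
point.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class KeyedStateSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // RocksDB keyed-state backend; checkpoints go to a durable filesystem
        env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/flink-checkpoints"));

        env.fromElements("a", "b", "a")
           .keyBy(new KeySelector<String, String>() {
               @Override
               public String getKey(String value) {
                   return value; // key by the value itself (toy example)
               }
           })
           .flatMap(new CountPerKey())
           .print();

        env.execute("keyed-state-sketch");
    }

    // Large per-key state lives in managed keyed state, not in operator fields,
    // so RocksDB can snapshot it asynchronously.
    public static class CountPerKey extends RichFlatMapFunction<String, Long> {
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void flatMap(String value, Collector<Long> out) throws Exception {
            Long current = count.value();
            long updated = (current == null) ? 1L : current + 1L;
            count.update(updated);
            out.collect(updated);
        }
    }
}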


If you are already doing all of that, or if this does not help, we have two
courses of action (if you can bear with us for a bit):

  - There is some much-enhanced checkpoint monitoring almost ready to be
merged. That should help in finding out where the barriers get delayed.

  - Finally, we are experimenting with some other checkpoint alignment
variants (alternatives to the BarrierBuffer). We can ping you when we have
an experimental branch with that.

Hope we get this under control!

Best,
Stephan


On Fri, Dec 23, 2016 at 3:47 PM, Cliff Resnick  wrote:

> We are running a DataStream pipeline using Exactly Once/Event Time
> semantics on 1.2-SNAPSHOT. The pipeline sources from S3 using the
> ContinuousFileReaderOperator. We use a custom version of the
> ContinuousFileMonitoringFunction since our source directory changes over
> time. The pipeline transforms and aggregates tuples of data that is steady
> over time (spike-less), windowing by hour with an allowed lateness of 6
> hours. We are running on ec2 c4 instances in a simple YARN setup.
>
> What we are seeing is that as we scale the system to hundreds of cores
> reading around 10 million events per second, the pipeline might checkpoint
> up to a few times before it reaches a state where it becomes unable to
> complete a checkpoint. The checkpoint interval does not seem to matter as
> we've tested intervals from 5 minutes to one hour and timeouts up to one
> hour as well.
> What we'll usually see is something like:
>
> checkpoint 1 (2G) 1 minute
> checkpoint 2 (3-4G)  1-5 minutes
> checkpoint 3 (5-6G) 1-6 minutes
> checkpoint 4-5 (mixed bag, 4-25 minutes, or never)
> checkpoint 6-n never
>
> We've added debugging output to Flink internal code, e.g.
> BarrierBuffer.java. From the debugging output it's clear that the actual
> checkpoint per operator always completes in at most several seconds. What
> seems to be happening, however, is that CheckpointBarriers start to become
> slower to arrive, and after a few checkpoints it gets worse, with
> CheckpointBarriers going greatly askew and finally never arriving.
> Meanwhile we can see that downstream counts closely tail upstream counts
> (less than .01 behind), but once the barrier flow seemingly stops, the
> downstream slows to a stop as (I guess) network buffers fill, and then the
> pipeline is dead.
>
> Meanwhile, cluster resources are not being stressed.
>
> At this point we've stripped down the pipeline, tried various StateBackend
> configs, etc. but the result is invariably the same sad story. It would be
> great if somebody could provide more insight into where things might be
> going wrong. Hopefully this is a simple config issue, but we'd be open to
> any and all suggestions regarding testing, tweaking, etc.
>
> -Cliff
>
>
>
>
>
>


Streaming pipeline failing to complete checkpoints at scale

2016-12-23 Thread Cliff Resnick
We are running a DataStream pipeline using Exactly Once/Event Time
semantics on 1.2-SNAPSHOT. The pipeline sources from S3 using the
ContinuousFileReaderOperator. We use a custom version of the
ContinuousFileMonitoringFunction since our source directory changes over
time. The pipeline transforms and aggregates tuples of data that is steady
over time (spike-less), windowing by hour with an allowed lateness of 6
hours. We are running on ec2 c4 instances in a simple YARN setup.
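
For context, the event-time/windowing wiring is roughly what is sketched
below. This is a self-contained toy, not our job: the source, key, and
aggregation are stand-ins, and only the event-time, hourly-window, and
6-hour-lateness settings reflect the actual pipeline.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Event Time semantics, as described above
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Toy source standing in for the S3 / ContinuousFileReaderOperator input
        env.fromElements(Tuple2.of("key-a", 1000L), Tuple2.of("key-b", 2000L))
           .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
               @Override
               public long extractAscendingTimestamp(Tuple2<String, Long> t) {
                   return t.f1; // event-time timestamp carried in the record
               }
           })
           .keyBy(0)
           .window(TumblingEventTimeWindows.of(Time.hours(1))) // window by hour
           .allowedLateness(Time.hours(6))                     // allowed lateness of 6 hours
           .sum(1)
           .print();

        env.execute("windowing-sketch");
    }
}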

What we are seeing is that as we scale the system to hundreds of cores
reading around 10 million events per second, the pipeline might checkpoint
up to a few times before it reaches a state where it becomes unable to
complete a checkpoint. The checkpoint interval does not seem to matter as
we've tested intervals from 5 minutes to one hour and timeouts up to one
hour as well.
What we'll usually see is something like:

checkpoint 1 (2G) 1 minute
checkpoint 2 (3-4G)  1-5 minutes
checkpoint 3 (5-6G) 1-6 minutes
checkpoint 4-5 (mixed bag, 4-25 minutes, or never)
checkpoint 6-n never
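
For reference, the interval/timeout sweep is just the standard checkpoint
configuration; here is a minimal sketch with example values from the ranges
we tried (the no-op stream is only there to make the snippet runnable):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Exactly Once checkpoints every 5 minutes (we tried 5 minutes up to 1 hour)
        env.enableCheckpointing(5 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE);
        // allow up to 1 hour for a checkpoint to complete
        env.getCheckpointConfig().setCheckpointTimeout(60 * 60 * 1000);

        env.fromElements(1, 2, 3).print(); // placeholder for the real pipeline
        env.execute("checkpoint-config-sketch");
    }
}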

We've added debugging output to Flink internal code, e.g.
BarrierBuffer.java. From the debugging output it's clear that the actual
checkpoint per operator always completes in at most several seconds. What
seems to be happening, however, is that CheckpointBarriers start to become
slower to arrive, and after a few checkpoints it gets worse, with
CheckpointBarriers going greatly askew and finally never arriving.
Meanwhile we can see that downstream counts closely tail upstream counts
(less than .01 behind), but once the barrier flow seemingly stops, the
downstream slows to a stop as (I guess) network buffers fill, and then the
pipeline is dead.

Meanwhile, cluster resources are not being stressed.

At this point we've stripped down the pipeline, tried various StateBackend
configs, etc. but the result is invariably the same sad story. It would be
great if somebody could provide more insight into where things might be
going wrong. Hopefully this is a simple config issue, but we'd be open to
any and all suggestions regarding testing, tweaking, etc.

-Cliff