GitHub user rmetzger commented on the pull request:
https://github.com/apache/flink/pull/938#issuecomment-125520977
I tried this pull request on a cluster because the current code (without this PR) fails
with the following exception when running with a checkpoint interval of 1
second and a buffer timeout of 0 ms:
```
01:08:41,472 ERROR org.apache.flink.streaming.runtime.tasks.OneInputStreamTask - Flat Map (114/120) failed
java.lang.RuntimeException: Error in barrier buffer logic
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.releaseBlocks(BarrierBuffer.java:192)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:252)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:163)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:70)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
    at java.lang.Thread.run(Thread.java:745)
01:08:41,472 INFO  org.apache.flink.runtime.taskmanager.Task - Flat Map (114/120) switched to FAILED with exception.
java.lang.RuntimeException: Error in barrier buffer logic
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.releaseBlocks(BarrierBuffer.java:192)
    at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:252)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:163)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.invoke(OneInputStreamTask.java:70)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:577)
    at java.lang.Thread.run(Thread.java:745)
```
The code from this PR works. However, I got the following WARN messages,
probably because the barriers are triggered too early in the job lifecycle:
```
08:59:24,775 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer - Received checkpoint barrier for checkpoint 4 before completing current checkpoint 3. Skipping current checkpoint.
08:59:24,785 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer - Received checkpoint barrier for checkpoint 4 before completing current checkpoint 3. Skipping current checkpoint.
08:59:24,786 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer - Received checkpoint barrier for checkpoint 4 before completing current checkpoint 3. Skipping current checkpoint.
08:59:24,787 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer - Received checkpoint barrier for checkpoint 4 before completing current checkpoint 3. Skipping current checkpoint.
08:59:25,223 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer - Received checkpoint barrier for checkpoint 5 before completing current checkpoint 4. Skipping current checkpoint.
08:59:25,225 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer - Received checkpoint barrier for checkpoint 5 before completing current checkpoint 4. Skipping current checkpoint.
08:59:25,226 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer - Received checkpoint barrier for checkpoint 5 before completing current checkpoint 4. Skipping current checkpoint.
08:59:25,229 WARN  org.apache.flink.streaming.runtime.io.BarrierBuffer - Received checkpoint barrier for checkpoint 5 before completing current checkpoint 4. Skipping current checkpoint.
```
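To illustrate the "barriers triggered too early" hypothesis, here is a hypothetical, heavily simplified model of barrier alignment in plain Java. It is not Flink's `BarrierBuffer`: the class `BarrierAlignmentSketch` and its methods are made up for illustration, and the blocking and buffering of input channels is omitted. It assumes that one upstream task never emitted the barrier for checkpoint 3 (for example, because it was not yet running when that checkpoint was triggered) and sent barrier 4 directly, which is enough to produce a "Skipping current checkpoint" message.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical, simplified model of checkpoint barrier alignment for a task
 * with several input channels. NOT Flink's BarrierBuffer: channel blocking
 * and buffering are omitted; only the checkpoint-id bookkeeping is modeled.
 */
public class BarrierAlignmentSketch {

    private final int numChannels;
    // Checkpoint currently being aligned (-1 means no barrier seen yet).
    private long currentCheckpointId = -1;
    // Channels that already delivered the barrier for currentCheckpointId.
    private final Set<Integer> channelsWithBarrier = new HashSet<>();
    private int skippedCheckpoints = 0;
    private int completedCheckpoints = 0;

    public BarrierAlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    /** Handles the barrier for checkpointId arriving on the given input channel. */
    public void onBarrier(int channel, long checkpointId) {
        if (checkpointId < currentCheckpointId) {
            // Barrier of an older checkpoint that was already skipped or completed: ignore it.
            return;
        }
        if (checkpointId > currentCheckpointId) {
            if (!channelsWithBarrier.isEmpty()) {
                // A newer barrier arrived while the current alignment was incomplete.
                System.out.println("WARN Received checkpoint barrier for checkpoint "
                        + checkpointId + " before completing current checkpoint "
                        + currentCheckpointId + ". Skipping current checkpoint.");
                skippedCheckpoints++;
            }
            // Start aligning the newer checkpoint.
            currentCheckpointId = checkpointId;
            channelsWithBarrier.clear();
        }
        channelsWithBarrier.add(channel);
        if (channelsWithBarrier.size() == numChannels) {
            // Barriers from all channels received: the checkpoint is aligned.
            completedCheckpoints++;
            channelsWithBarrier.clear();
        }
    }

    public int getSkippedCheckpoints() { return skippedCheckpoints; }

    public int getCompletedCheckpoints() { return completedCheckpoints; }

    public static void main(String[] args) {
        BarrierAlignmentSketch task = new BarrierAlignmentSketch(2);
        task.onBarrier(0, 3); // channel 0 delivers barrier 3
        task.onBarrier(1, 4); // assumption: channel 1's upstream never emitted barrier 3
        task.onBarrier(0, 4); // channel 0 catches up with barrier 4
        System.out.println("skipped=" + task.getSkippedCheckpoints()
                + ", completed=" + task.getCompletedCheckpoints());
    }
}
```

Running `main` prints one WARN line for checkpoint 3 followed by `skipped=1, completed=1`: checkpoint 3 is abandoned, but checkpoint 4 still completes, which matches the observation that the job keeps working despite the warnings.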