[ https://issues.apache.org/jira/browse/FLINK-4973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748686#comment-15748686 ]
ASF GitHub Bot commented on FLINK-4973: --------------------------------------- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/3008 [FLINK-4973] Let LatencyMarksEmitter use StreamTask's ProcessingTimeService The LatencyMarksEmitter class uses now the StreamTask's ProcessingTimeService to schedule latency mark emission. For that the ProcessingTimeService was extended to have the method scheduleAtFixedRate to schedule repeated tasks. The latency mark emission is such a repeated task. cc @rmetzger. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixLatencyMarksEmitter Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3008.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3008 ---- commit c06b3457f2b98969f444da07569656ac07727166 Author: Till Rohrmann <trohrm...@apache.org> Date: 2016-12-14T13:53:11Z [FLINK-4973] Let LatencyMarksEmitter use StreamTask's ProcessingTimeService The LatencyMarksEmitter class uses now the StreamTask's ProcessingTimeService to schedule latency mark emission. For that the ProcessingTimeService was extended to have the method scheduleAtFixedRate to schedule repeated tasks. The latency mark emission is such a repeated task. ---- > Flakey Yarn tests due to recently added latency marker > ------------------------------------------------------ > > Key: FLINK-4973 > URL: https://issues.apache.org/jira/browse/FLINK-4973 > Project: Flink > Issue Type: Bug > Components: Tests > Affects Versions: 1.2.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Priority: Critical > Labels: test-stability > Fix For: 1.2.0 > > > The newly introduced {{LatencyMarksEmitter}} emits latency marker on the > {{Output}}. This can still happen after the underlying {{BufferPool}} has > been destroyed. The occurring exception is then logged: > {code} > 2016-10-29 15:00:48,088 INFO org.apache.flink.runtime.taskmanager.Task > - Source: Custom File Source (1/1) switched to FINISHED > 2016-10-29 15:00:48,089 INFO org.apache.flink.runtime.taskmanager.Task > - Freeing task resources for Source: Custom File Source (1/1) > 2016-10-29 15:00:48,089 INFO org.apache.flink.yarn.YarnTaskManager > - Un-registering task and sending final execution state > FINISHED to JobManager for task Source: Custom File Source > (8fe0f817fa6d960ea33f6e57e0c3891c) > 2016-10-29 15:00:48,101 WARN > org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error > while emitting latency marker > java.lang.RuntimeException: Buffer pool is destroyed. > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:99) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:734) > at > org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.run(StreamSource.java:134) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.IllegalStateException: Buffer pool is destroyed. > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:144) > at > org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:118) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:103) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:96) > ... 9 more > {code} > This exception is clearly related to the shutdown of a stream operator and > does not indicate a wrong behaviour. Since the yarn tests simply scan the log > for some keywords (including exception) such a case can make them fail. > Best if we could make sure that the {{LatencyMarksEmitter}} would only emit > latency marker if the {{Output}} would still be active. But we could also > simply not log exceptions which occurred after the stream operator has been > stopped. > https://s3.amazonaws.com/archive.travis-ci.org/jobs/171578846/log.txt -- This message was sent by Atlassian JIRA (v6.3.4#6332)