[ 
https://issues.apache.org/jira/browse/FLINK-4973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15748686#comment-15748686
 ] 

ASF GitHub Bot commented on FLINK-4973:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/3008

    [FLINK-4973] Let LatencyMarksEmitter use StreamTask's ProcessingTimeService

    The LatencyMarksEmitter class uses now the StreamTask's 
ProcessingTimeService to schedule
    latency mark emission. For that the ProcessingTimeService was extended to 
have the method
    scheduleAtFixedRate to schedule repeated tasks. The latency mark emission 
is such a repeated
    task.
    
    cc @rmetzger.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixLatencyMarksEmitter

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3008.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3008
    
----
commit c06b3457f2b98969f444da07569656ac07727166
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-12-14T13:53:11Z

    [FLINK-4973] Let LatencyMarksEmitter use StreamTask's ProcessingTimeService
    
    The LatencyMarksEmitter class uses now the StreamTask's 
ProcessingTimeService to schedule
    latency mark emission. For that the ProcessingTimeService was extended to 
have the method
    scheduleAtFixedRate to schedule repeated tasks. The latency mark emission 
is such a repeated
    task.

----


> Flakey Yarn tests due to recently added latency marker
> ------------------------------------------------------
>
>                 Key: FLINK-4973
>                 URL: https://issues.apache.org/jira/browse/FLINK-4973
>             Project: Flink
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 1.2.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Critical
>              Labels: test-stability
>             Fix For: 1.2.0
>
>
> The newly introduced {{LatencyMarksEmitter}} emits latency marker on the 
> {{Output}}. This can still happen after the underlying {{BufferPool}} has 
> been destroyed. The occurring exception is then logged:
> {code}
> 2016-10-29 15:00:48,088 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Source: Custom File Source (1/1) switched to FINISHED
> 2016-10-29 15:00:48,089 INFO  org.apache.flink.runtime.taskmanager.Task       
>               - Freeing task resources for Source: Custom File Source (1/1)
> 2016-10-29 15:00:48,089 INFO  org.apache.flink.yarn.YarnTaskManager           
>               - Un-registering task and sending final execution state 
> FINISHED to JobManager for task Source: Custom File Source 
> (8fe0f817fa6d960ea33f6e57e0c3891c)
> 2016-10-29 15:00:48,101 WARN  
> org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Error 
> while emitting latency marker
> java.lang.RuntimeException: Buffer pool is destroyed.
>       at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:99)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:734)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.run(StreamSource.java:134)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>       at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>       at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
>       at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:144)
>       at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
>       at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:118)
>       at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:103)
>       at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104)
>       at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:96)
>       ... 9 more
> {code}
> This exception is clearly related to the shutdown of a stream operator and 
> does not indicate a wrong behaviour. Since the yarn tests simply scan the log 
> for some keywords (including exception) such a case can make them fail.
> Best if we could make sure that the {{LatencyMarksEmitter}} would only emit 
> latency marker if the {{Output}} would still be active. But we could also 
> simply not log exceptions which occurred after the stream operator has been 
> stopped.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/171578846/log.txt



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to