[ https://issues.apache.org/jira/browse/FLINK-3688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15230059#comment-15230059 ]

ASF GitHub Bot commented on FLINK-3688:
---------------------------------------

GitHub user knaufk opened a pull request:

    https://github.com/apache/flink/pull/1861

    [FLINK-3688] WindowOperator.trigger() does not emit Watermark anymore

    Thanks for contributing to Apache Flink. Before you open your pull request,
    please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your
    pull request. For more information and/or questions please refer to the
    [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful
    description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis
        build has passed
    
    Conflicts:
        flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/knaufk/flink FLINK-3688

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1861.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1861
    
----
commit 0afb53afa19d6ce791367fe15729bec27d114a8b
Author: Konstantin Knauf <konstantin.kn...@tngtech.com>
Date:   2016-04-03T11:57:35Z

    [FLINK-3688] WindowOperator.trigger() does not emit Watermark anymore
    
    Conflicts:
        flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java

----


> ClassCastException in StreamRecordSerializer when WindowOperator.trigger() is 
> called and TimeCharacteristic = ProcessingTime
> ----------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-3688
>                 URL: https://issues.apache.org/jira/browse/FLINK-3688
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.0.0
>            Reporter: Konstantin Knauf
>            Assignee: Konstantin Knauf
>            Priority: Critical
>
> Hi,
> when using {{TimeCharacteristic.ProcessingTime}}, a ClassCastException is
> thrown in {{StreamRecordSerializer}} when
> {{WindowOperator.processWatermark()}} is called from
> {{WindowOperator.trigger()}}, i.e. whenever a ProcessingTimeTimer is
> triggered.
> The problem seems to be that {{processWatermark()}} is also called in
> {{trigger()}} when the time characteristic is ProcessingTime, but in
> {{RecordWriterOutput}} {{enableWatermarkMultiplexing}} is {{false}} and the
> {{TypeSerializer}} is a {{StreamRecordSerializer}}, which ultimately leads to
> the ClassCastException. Do you agree?
> If this is indeed a bug, there are several possible solutions.
> # Only call {{processWatermark()}} in {{trigger()}} when the
> TimeCharacteristic is EventTime.
> # Do not call {{processWatermark()}} in {{trigger()}} at all; instead wait
> for the next watermark to trigger the EventTimeTimers with a timestamp behind
> the current watermark. This is, of course, a trade-off.
> # Use {{MultiplexingStreamRecordSerializer}} all the time, but I have no
> idea what the side effects of this change would be. I assume there is a reason
> for the existence of the StreamRecordSerializer ;)
> StackTrace: 
> {quote}
> TimerException\{java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord\}
>       at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:716)
>       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
>       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:744)
> Caused by: java.lang.RuntimeException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93)
>       at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:370)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:293)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:323)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:710)
>       ... 7 more
> Caused by: java.lang.ClassCastException: org.apache.flink.streaming.api.watermark.Watermark cannot be cast to org.apache.flink.streaming.runtime.streamrecord.StreamRecord
>       at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42)
>       at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
>       at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79)
>       at org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109)
>       at org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95)
>       at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90)
>       ... 11 more
> {quote}
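
For readers who want the failure mode in isolation, here is a minimal sketch of what the report describes. It is not Flink code. Every class below is a hypothetical, simplified stand-in that only borrows names from the report, and the real Flink signatures are assumed to differ. It shows a record-only serializer failing on a watermark, plus a guard in the spirit of the first proposed solution (forward a watermark from trigger() only under EventTime).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-ins; none of these are the real Flink classes. */
public class WatermarkCastSketch {

    enum TimeCharacteristic { ProcessingTime, IngestionTime, EventTime }

    static class StreamRecord {
        final String value;
        StreamRecord(String value) { this.value = value; }
    }

    static class Watermark {
        final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
    }

    interface ElementSerializer {
        String serialize(Object element);
    }

    /** Expects records only, so it casts without checking the element type. */
    static class RecordOnlySerializer implements ElementSerializer {
        @Override
        public String serialize(Object element) {
            StreamRecord record = (StreamRecord) element; // ClassCastException for a Watermark
            return "record:" + record.value;
        }
    }

    /** Checks the element type first, so it can carry both records and watermarks. */
    static class MultiplexingSerializer implements ElementSerializer {
        @Override
        public String serialize(Object element) {
            if (element instanceof Watermark) {
                return "watermark:" + ((Watermark) element).timestamp;
            }
            return "record:" + ((StreamRecord) element).value;
        }
    }

    /** Picks its serializer from the flag, mirroring the output described in the report. */
    static class Output {
        final List<String> written = new ArrayList<>();
        private final ElementSerializer serializer;

        Output(boolean enableWatermarkMultiplexing) {
            this.serializer = enableWatermarkMultiplexing
                    ? new MultiplexingSerializer()
                    : new RecordOnlySerializer();
        }

        void emitWatermark(Watermark mark) {
            written.add(serializer.serialize(mark));
        }
    }

    /** Sketch of option 1: trigger() forwards a watermark only under EventTime. */
    static class GuardedOperator {
        private final TimeCharacteristic characteristic;
        private final Output output;

        GuardedOperator(TimeCharacteristic characteristic, Output output) {
            this.characteristic = characteristic;
            this.output = output;
        }

        void trigger(long time) {
            if (characteristic == TimeCharacteristic.EventTime) {
                output.emitWatermark(new Watermark(time));
            }
        }
    }

    public static void main(String[] args) {
        Output recordOnly = new Output(false);
        try {
            recordOnly.emitWatermark(new Watermark(42L));
        } catch (ClassCastException e) {
            System.out.println("unguarded emit failed with ClassCastException");
        }
        // With the guard in place, a processing-time trigger never reaches the serializer.
        new GuardedOperator(TimeCharacteristic.ProcessingTime, recordOnly).trigger(42L);
        System.out.println("guarded trigger wrote " + recordOnly.written.size() + " elements");
    }
}
```

The sketch makes no claim about which option the pull request implements; the commit message only states that WindowOperator.trigger() does not emit a Watermark anymore.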



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
