[ https://issues.apache.org/jira/browse/FLINK-3688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15223353#comment-15223353 ]
Konstantin Knauf commented on FLINK-3688: ----------------------------------------- Regarding dropping Watermarks in {{StreamRecordSerializer.serialize()}}. Did you mean to change {{StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>}} to {{StreamRecordSerializer<T> extends TypeSerializer<StreamElement>}}? If so questions about that: # This induces some changes to {{WindowedStream}} and other classes, like the {{KeyedCEPPatternOperator}}, which affects type safety there. So I am not to sure about the change. # How to handle non-StreamRecord StreamElements in copy-methods. I throw RuntimeExceptions now. You can have a look at this change here: https://github.com/knaufk/flink/tree/FLINK-3688. If not, we have to drop Watermarks somewhere higher up the chain, I think, or am I missing something? We could also only merge 4. as this fixes the issue, and think about how to change StreamRecordSerializer in a separate issue. > ClassCastException in StreamRecordSerializer when WindowOperator.trigger() is > called and TimeCharacteristic = ProcessingTime > ---------------------------------------------------------------------------------------------------------------------------- > > Key: FLINK-3688 > URL: https://issues.apache.org/jira/browse/FLINK-3688 > Project: Flink > Issue Type: Bug > Affects Versions: 1.0.0 > Reporter: Konstantin Knauf > Assignee: Konstantin Knauf > Priority: Critical > > Hi, > when using {{TimeCharacteristics.ProcessingTime}} a ClassCastException is > thrown in {{StreamRecordSerializer}} when > {{WindowOperator.processWatermark()}} is called from > {{WindowOperator.trigger()}}, i.e. whenever a ProcessingTimeTimer is > triggered. > The problem seems to be that {{processWatermark()}} is also called in > {{trigger()}}, when time characteristic is ProcessingTime, but in > {{RecordWriterOutput}} {{enableWatermarkMultiplexing}} is {{false}} and the > {{TypeSerializer}} is a {{StreamRecordSerializer}}, which ultimately leads to > the ClassCastException. Do you agree? > If this is indeed a bug, there several possible solutions. > # Only calling {{processWatermark()}} in {{trigger()}}, when > TimeCharacteristic is EventTime > # Not calling {{processWatermark()}} in {{trigger()}} at all, instead wait > for the next watermark to trigger the EventTimeTimers with a timestamp behind > the current watermark. This is, of course, a trade off. > # Using {{MultiplexingStreamRecordSerializer}} all the time, but I have no > idea what the side effect of this change would be. I assume there is a reason > for existence of the StreamRecordSerializer ;) > StackTrace: > {quote} > TimerException\{java.lang.RuntimeException: > org.apache.flink.streaming.api.watermark.Watermark cannot be cast to > org.apache.flink.streaming.runtime.streamrecord.StreamRecord\} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:716) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.run(FutureTask.java:262) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:744) > Caused by: java.lang.RuntimeException: > org.apache.flink.streaming.api.watermark.Watermark cannot be cast to > org.apache.flink.streaming.runtime.streamrecord.StreamRecord > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:93) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:370) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processWatermark(WindowOperator.java:293) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.trigger(WindowOperator.java:323) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:710) > ... 7 more > Caused by: java.lang.ClassCastException: > org.apache.flink.streaming.api.watermark.Watermark cannot be cast to > org.apache.flink.streaming.runtime.streamrecord.StreamRecord > at > org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.serialize(StreamRecordSerializer.java:42) > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) > at > org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79) > at > org.apache.flink.runtime.io.network.api.writer.RecordWriter.broadcastEmit(RecordWriter.java:109) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter.broadcastEmit(StreamRecordWriter.java:95) > at > org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:90) > ... 11 more > {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)