[ https://issues.apache.org/jira/browse/FLINK-30032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635081#comment-17635081 ]

luoyuxia commented on FLINK-30032:
----------------------------------

Just curious: why does this fix solve the problem? Could you please 
explain a little bit more?

Another question: when you catch the IOException at line 67 and do the 
logging, will you rethrow the IOException?

> IOException during MAX_WATERMARK emission causes message missing
> ----------------------------------------------------------------
>
>                 Key: FLINK-30032
>                 URL: https://issues.apache.org/jira/browse/FLINK-30032
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.0
>            Reporter: Haoze Wu
>            Priority: Major
>
> We are testing Flink (version 1.14.0). We launch 1 
> StandaloneSessionClusterEntrypoint and 2 TaskManagerRunner processes. Then we 
> run a Flink client which submits a WordCount workload. The code is similar to 
> [https://github.com/apache/flink/blob/release-1.14.0/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java],
> and we only add a Kafka topic output:
> {code:java}
>     private static DataStreamSink<String> addKafkaSink(
>             final DataStream<String> events, final String brokers, final String topic) {
>         return events.sinkTo(KafkaSink.<String>builder()
>                 .setBootstrapServers(brokers)
>                 .setRecordSerializer(
>                         KafkaRecordSerializationSchema.<String>builder()
>                                 .setValueSerializationSchema(new SimpleStringSchema())
>                                 .setTopic(topic)
>                                 .build())
>                 .build());
>     }
>
>     public static void run(final String[] args) throws Exception {
>         final String brokers = args[0];
>         final String textFilePath = args[1];
>         final String kafkaTopic = args[2];
>         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>         final DataStream<String> text = env.readTextFile(textFilePath);
>         final DataStream<Tuple2<String, Integer>> counts =
>                 text.flatMap(new Tokenizer()).keyBy(value -> value.f0).sum(1);
>         addKafkaSink(counts.map(String::valueOf), brokers, kafkaTopic);
>         final long nano = System.nanoTime();
>         env.execute("WordCount");
>         FlinkGrayClientMain.reply("success", nano);
>     }
>  {code}
> We found that the Kafka topic sometimes fails to receive a few messages. We 
> reproduced the symptom multiple times: the Kafka topic always receives 
> 160~169 messages while the expected number of messages is 170, and the 
> missing messages are always the last few of the 170 expected messages.
> We then inspected the logs and code.
> First, we see an IOException in one of the TaskManagerRunner processes:
> {code:java}
> 2021-11-02T17:43:41,070 WARN  source.ContinuousFileReaderOperator (ContinuousFileReaderOperator.java:finish(461)) - unable to emit watermark while closing
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>         at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:114) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:428) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:544) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.emitWatermark(StreamSourceContexts.java:113) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.finish(ContinuousFileReaderOperator.java:459) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:211) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:185) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:162) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:130) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:117) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:549) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:508) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) [flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist_2.11-1.14.0.jar:1.14.0]
>         at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
> Caused by: java.lang.RuntimeException: McGray injected exception
>         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:605) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:112) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>         ... 24 more
> Caused by: java.io.IOException {code}
> The IOException is from line 104 in RecordWriter#emit:
> {code:java}
>     protected void emit(T record, int targetSubpartition) throws IOException {
>         checkErroneous();
>         targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition); // line 104
>         if (flushAlways) {
>             targetPartition.flush(targetSubpartition);
>         }
>     } {code}
> Here, `targetPartition.emitRecord` eventually performs file I/O or 
> memory-mapped I/O, which triggers the IOException for some reason.
> This exception is caught and wrapped in a RuntimeException at 
> `RecordWriterOutput#emitWatermark`:
> {code:java}
>     @Override
>     public void emitWatermark(Watermark mark) {
>         if (announcedStatus.isIdle()) {
>             return;
>         }
>         watermarkGauge.setCurrentWatermark(mark.getTimestamp());
>         serializationDelegate.setInstance(mark);
>         try {
>             recordWriter.broadcastEmit(serializationDelegate);
>         } catch (Exception e) {
>             throw new RuntimeException(e.getMessage(), e);
>         }
>     } {code}
> And then caught at `ChainingOutput#emitWatermark`:
> {code:java}
>     @Override
>     public void emitWatermark(Watermark mark) {
>         if (announcedStatus.isIdle()) {
>             return;
>         }
>         try {
>             watermarkGauge.setCurrentWatermark(mark.getTimestamp());
>             input.processWatermark(mark);
>         } catch (Exception e) {
>             throw new ExceptionInChainedOperatorException(e);
>         }
>     } {code}
> And finally caught at `ContinuousFileReaderOperator#finish`:
> {code:java}
>     @Override
>     public void finish() throws Exception {
>         LOG.debug("finishing");
>         super.finish();
>         switch (state) {
>             case IDLE:
>                 switchState(ReaderState.FINISHED);
>                 break;
>             case FINISHED:
>                 LOG.warn("operator is already closed, doing nothing");
>                 return;
>             default:
>                 switchState(ReaderState.FINISHING);
>                 while (!state.isTerminal()) {
>                     executor.yield();
>                 }
>         }
>         try {
>             sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
>         } catch (Exception e) {
>             LOG.warn("unable to emit watermark while closing", e);
>         }
>     } {code}
> Here `Watermark.MAX_WATERMARK` is emitted to properly finish the computation.
> In Flink (version 1.14.0), the full call stack of the workflow described 
> above is:
> {code:java}
> org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit:104
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter#broadcastEmit:67
> org.apache.flink.streaming.runtime.io.RecordWriterOutput#emitWatermark:119
> org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
> org.apache.flink.streaming.api.operators.AbstractStreamOperator#processWatermark:605
> org.apache.flink.streaming.runtime.tasks.ChainingOutput#emitWatermark:112
> org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext#processAndEmitWatermark:428
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext#emitWatermark:544
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose#emitWatermark:113
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator#finish:459
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finishOperator:211
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#lambda$deferFinishOperatorToMailbox$3:185
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail#run:90
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl#tryYield:97
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#quiesceTimeServiceAndFinishOperator:162
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finish:130
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain#finishOperators:117
> org.apache.flink.streaming.runtime.tasks.StreamTask#endData:549
> org.apache.flink.streaming.runtime.tasks.StreamTask#processInput:508
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop:203
> org.apache.flink.streaming.runtime.tasks.StreamTask#runMailboxLoop:809
> org.apache.flink.streaming.runtime.tasks.StreamTask#invoke:761
> org.apache.flink.runtime.taskmanager.Task#runWithSystemExitMonitoring:958
> org.apache.flink.runtime.taskmanager.Task#restoreAndInvoke:937
> org.apache.flink.runtime.taskmanager.Task#doRun:766
> org.apache.flink.runtime.taskmanager.Task#run:575
> java.lang.Thread#run:748 {code}
> We think the reason the last few messages are missing from the Kafka topic 
> lies in `ChannelSelectorRecordWriter#broadcastEmit` (also shown in the call 
> stack above):
> {code:java}
>     @Override
>     public void broadcastEmit(T record) throws IOException {
>         checkErroneous();
>         // Emitting to all channels in a for loop can be better than calling
>         // ResultPartitionWriter#broadcastRecord because the broadcastRecord
>         // method incurs extra overhead.
>         ByteBuffer serializedRecord = serializeRecord(serializer, record);
>         for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
>             serializedRecord.rewind();
>             emit(record, channelIndex);   // line 67
>         }
>         if (flushAlways) {
>             flushAll();
>         }
>     } {code}
> Line 67 tries to emit `Watermark.MAX_WATERMARK` (from 
> `ContinuousFileReaderOperator#finish`) to all channels. When the IOException 
> is thrown here, `ContinuousFileReaderOperator#finish` swallows the exception, 
> so the loop never runs line 67 for the remaining channels. We reproduced the 
> symptom multiple times and found that the number of missing messages is 
> exactly equal to the number of affected channels.
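The failure mode described above can be illustrated with a minimal, self-contained simulation (hypothetical code, not Flink's): when the emit for one channel throws and the caller swallows the exception, the loop never reaches the remaining channels.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class BroadcastLoopDemo {
    // Simulates the broadcast loop: channel `failingChannel` throws an
    // IOException, the caller (like finish()) swallows it, and the loop
    // aborts, so later channels never receive the record.
    static List<Integer> broadcast(int numberOfChannels, int failingChannel) {
        List<Integer> delivered = new ArrayList<>();
        try {
            for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
                if (channelIndex == failingChannel) {
                    throw new IOException("injected I/O failure");
                }
                delivered.add(channelIndex); // this channel got the record
            }
        } catch (IOException e) {
            // swallowed, mirroring ContinuousFileReaderOperator#finish
        }
        return delivered;
    }

    public static void main(String[] args) {
        // 5 channels with a failure at channel 2: only channels 0 and 1
        // are served; the 3 unserved channels correspond to the missing
        // trailing messages in the report.
        System.out.println(broadcast(5, 2)); // prints [0, 1]
    }
}
```

This matches the observed symptom: the count of unserved channels equals the count of missing messages.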
> That said, we suspect the potential IOException at line 67 is not properly 
> handled, because the current symptom and logging make it hard for the user 
> to notice the issue or debug it. The user may suddenly find a few of the 
> last messages missing, and can only discover that some IOException occurred 
> somewhere while emitting `Watermark.MAX_WATERMARK`; the user still does not 
> know why or how those messages went missing.
> We would like to propose a fix for this issue. A simple solution is to catch 
> the IOException at line 67, log it, and possibly retry the emit. We 
> implemented this solution and the symptom disappeared. However, this 
> `broadcastEmit` method is called in many places, so the fix would also 
> affect the other callers, and we are not sure whether this behavior is 
> appropriate for them as well.
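To make the proposal concrete, here is a hedged, standalone sketch of the per-channel handling described above (hypothetical names, not the actual Flink patch): catching the IOException inside the loop lets the remaining channels still receive the watermark.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class ResilientBroadcastDemo {
    // Same simulation as the failing case, but the IOException is caught
    // per channel, so the loop continues with the remaining channels.
    static List<Integer> broadcast(int numberOfChannels, int failingChannel) {
        List<Integer> delivered = new ArrayList<>();
        for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
            try {
                if (channelIndex == failingChannel) {
                    throw new IOException("injected I/O failure");
                }
                delivered.add(channelIndex); // this channel got the record
            } catch (IOException e) {
                // log (and maybe retry) instead of aborting the whole loop
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // A failure at channel 2 no longer starves channels 3 and 4.
        System.out.println(broadcast(5, 2)); // prints [0, 1, 3, 4]
    }
}
```

Whether swallowing or retrying per channel is acceptable for all callers of `broadcastEmit` is exactly the open question above.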
> We are looking for suggestions and feedback. Thanks!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
