[ https://issues.apache.org/jira/browse/FLINK-30032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17635081#comment-17635081 ]
luoyuxia commented on FLINK-30032:
----------------------------------

Just curious why this fix solves the problem. Could you please explain a
little more? Another question: when you catch the IOException at line 67 and
log it, do you rethrow the IOException?

> IOException during MAX_WATERMARK emission causes message missing
> ----------------------------------------------------------------
>
>                 Key: FLINK-30032
>                 URL: https://issues.apache.org/jira/browse/FLINK-30032
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.0
>            Reporter: Haoze Wu
>            Priority: Major
>
> We are testing Flink (version 1.14.0). We launch 1
> StandaloneSessionClusterEntrypoint and 2 TaskManagerRunner processes. Then
> we run a Flink client that submits a WordCount workload. The code is similar to
> [https://github.com/apache/flink/blob/release-1.14.0/flink-examples/flink-examples-batch/src/main/java/org/apache/flink/examples/java/wordcount/WordCount.java],
> and we only add a Kafka topic output:
> {code:java}
> private static DataStreamSink<String> addKafkaSink(
>         final DataStream<String> events, final String brokers, final String topic) {
>     return events.sinkTo(KafkaSink.<String>builder()
>             .setBootstrapServers(brokers)
>             .setRecordSerializer(
>                     KafkaRecordSerializationSchema.<String>builder()
>                             .setValueSerializationSchema(new SimpleStringSchema())
>                             .setTopic(topic)
>                             .build())
>             .build());
> }
>
> public static void run(final String[] args) throws Exception {
>     final String brokers = args[0];
>     final String textFilePath = args[1];
>     final String kafkaTopic = args[2];
>     final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>     final DataStream<String> text = env.readTextFile(textFilePath);
>     final DataStream<Tuple2<String, Integer>> counts =
>             text.flatMap(new Tokenizer()).keyBy(value -> value.f0).sum(1);
>     addKafkaSink(counts.map(String::valueOf), brokers, kafkaTopic);
>     final long nano = System.nanoTime();
>     env.execute("WordCount");
>     FlinkGrayClientMain.reply("success", nano);
> }
> {code}
> We found that the Kafka topic sometimes fails to receive a few messages. We
> reproduced the symptom multiple times. The Kafka topic always gets 160~169
> messages while the expected number of messages is 170. The missing messages
> are always the last few of the 170 expected messages.
> Then we inspected the logs and code.
> First, we have an IOException in one of the TaskManagerRunner processes:
> {code:java}
> 2021-11-02T17:43:41,070 WARN  source.ContinuousFileReaderOperator (ContinuousFileReaderOperator.java:finish(461)) - unable to emit watermark while closing
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:114) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndEmitWatermark(StreamSourceContexts.java:428) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.emitWatermark(StreamSourceContexts.java:544) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.emitWatermark(StreamSourceContexts.java:113) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.finish(ContinuousFileReaderOperator.java:459) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finishOperator(StreamOperatorWrapper.java:211) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferFinishOperatorToMailbox$3(StreamOperatorWrapper.java:185) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:97) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndFinishOperator(StreamOperatorWrapper.java:162) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.finish(StreamOperatorWrapper.java:130) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.finishOperators(RegularOperatorChain.java:117) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.endData(StreamTask.java:549) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:508) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) [flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) [flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) [flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist_2.11-1.14.0.jar:1.14.0]
>     at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
> Caused by: java.lang.RuntimeException: McGray injected exception
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:40) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:605) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     at org.apache.flink.streaming.runtime.tasks.ChainingOutput.emitWatermark(ChainingOutput.java:112) ~[flink-dist_2.11-1.14.0.jar:1.14.0]
>     ... 24 more
> Caused by: java.io.IOException
> {code}
> The IOException is from line 104 in RecordWriter#emit:
> {code:java}
> protected void emit(T record, int targetSubpartition) throws IOException {
>     checkErroneous();
>     targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition); // line 104
>     if (flushAlways) {
>         targetPartition.flush(targetSubpartition);
>     }
> }
> {code}
> Here, `targetPartition.emitRecord` eventually reaches file I/O or
> memory-mapped I/O, where the IOException is triggered.
> This exception is caught and rethrown as a RuntimeException at
> `RecordWriterOutput#emitWatermark`:
> {code:java}
> @Override
> public void emitWatermark(Watermark mark) {
>     if (announcedStatus.isIdle()) {
>         return;
>     }
>     watermarkGauge.setCurrentWatermark(mark.getTimestamp());
>     serializationDelegate.setInstance(mark);
>     try {
>         recordWriter.broadcastEmit(serializationDelegate);
>     } catch (Exception e) {
>         throw new RuntimeException(e.getMessage(), e);
>     }
> }
> {code}
> It is then caught and wrapped again at `ChainingOutput#emitWatermark`:
> {code:java}
> @Override
> public void emitWatermark(Watermark mark) {
>     if (announcedStatus.isIdle()) {
>         return;
>     }
>     try {
>         watermarkGauge.setCurrentWatermark(mark.getTimestamp());
>         input.processWatermark(mark);
>     } catch (Exception e) {
>         throw new ExceptionInChainedOperatorException(e);
>     }
> }
> {code}
> Finally, it is caught, logged, and swallowed at
> `ContinuousFileReaderOperator#finish`:
> {code:java}
> @Override
> public void finish() throws Exception {
>     LOG.debug("finishing");
>     super.finish();
>
>     switch (state) {
>         case IDLE:
>             switchState(ReaderState.FINISHED);
>             break;
>         case FINISHED:
>             LOG.warn("operator is already closed, doing nothing");
>             return;
>         default:
>             switchState(ReaderState.FINISHING);
>             while (!state.isTerminal()) {
>                 executor.yield();
>             }
>     }
>     try {
>         sourceContext.emitWatermark(Watermark.MAX_WATERMARK);
>     } catch (Exception e) {
>         LOG.warn("unable to emit watermark while closing", e);
>     }
> }
> {code}
> Here `Watermark.MAX_WATERMARK` is emitted to properly finish the computation.
> In Flink (version 1.14.0), the full call stack for the path described above
> is:
> {code:java}
> org.apache.flink.runtime.io.network.api.writer.RecordWriter#emit:104
> org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter#broadcastEmit:67
> org.apache.flink.streaming.runtime.io.RecordWriterOutput#emitWatermark:119
> org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
> org.apache.flink.streaming.api.operators.AbstractStreamOperator#processWatermark:605
> org.apache.flink.streaming.runtime.tasks.ChainingOutput#emitWatermark:112
> org.apache.flink.streaming.api.operators.CountingOutput#emitWatermark:40
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext#processAndEmitWatermark:428
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext#emitWatermark:544
> org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose#emitWatermark:113
> org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator#finish:459
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finishOperator:211
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#lambda$deferFinishOperatorToMailbox$3:185
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1#runThrowing:50
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail#run:90
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl#tryYield:97
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#quiesceTimeServiceAndFinishOperator:162
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper#finish:130
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain#finishOperators:117
> org.apache.flink.streaming.runtime.tasks.StreamTask#endData:549
> org.apache.flink.streaming.runtime.tasks.StreamTask#processInput:508
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop:203
> org.apache.flink.streaming.runtime.tasks.StreamTask#runMailboxLoop:809
> org.apache.flink.streaming.runtime.tasks.StreamTask#invoke:761
> org.apache.flink.runtime.taskmanager.Task#runWithSystemExitMonitoring:958
> org.apache.flink.runtime.taskmanager.Task#restoreAndInvoke:937
> org.apache.flink.runtime.taskmanager.Task#doRun:766
> org.apache.flink.runtime.taskmanager.Task#run:575
> java.lang.Thread#run:748
> {code}
> We think the reason for the missing ending messages in the Kafka topic lies
> in `ChannelSelectorRecordWriter#broadcastEmit` (also in the call stack above):
> {code:java}
> @Override
> public void broadcastEmit(T record) throws IOException {
>     checkErroneous();
>
>     // Emitting to all channels in a for loop can be better than calling
>     // ResultPartitionWriter#broadcastRecord because the broadcastRecord
>     // method incurs extra overhead.
>     ByteBuffer serializedRecord = serializeRecord(serializer, record);
>     for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
>         serializedRecord.rewind();
>         emit(record, channelIndex); // line 67
>     }
>
>     if (flushAlways) {
>         flushAll();
>     }
> }
> {code}
> Line 67 tries to emit `Watermark.MAX_WATERMARK` (from
> `ContinuousFileReaderOperator#finish`) to all channels. When the IOException
> is thrown here, the loop stops before running line 67 for the remaining
> channels, and `ContinuousFileReaderOperator#finish` swallows the exception.
> We reproduced the symptom multiple times and found that the number of
> missing messages is exactly equal to the number of affected channels.
> That being said, we suspect the potential IOException at line 67 is not
> properly handled, because the current symptom and logging make it hard for
> the user to notice or debug the issue. The user may suddenly find a few
> ending messages missing, and can then only find that some IOException
> occurred while emitting `Watermark.MAX_WATERMARK`. The user still does not
> know why and how the ending messages went missing.
> We would like to propose a fix for this issue. A simple solution is to catch
> the IOException at line 67, log it, and possibly retry the emit. We
> implemented this solution and found that the symptom disappears. However, we
> also found that `broadcastEmit` is called in many places, so this fix would
> also affect the other callers, and we are not sure whether this behavior is
> appropriate for them.
> We are looking for suggestions and feedback. Thanks!
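
To make the question about rethrowing concrete, here is a minimal standalone sketch of the loop behaviour described in the quoted issue. It is not Flink code and not the proposed patch: `ChannelEmitter`, `broadcastFailFast`, `broadcastBestEffort`, and `reachedChannels` are hypothetical names introduced only for illustration. The fail-fast variant has the same shape as the quoted `broadcastEmit` loop. The best-effort variant is one assumed way to keep emitting to the remaining channels while still rethrowing the first IOException (with later ones attached as suppressed exceptions), so the caller is not left unaware of the failure.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class BroadcastEmitSketch {

    /** Hypothetical stand-in for a per-channel emit; not a Flink interface. */
    interface ChannelEmitter {
        void emit(String element, int channelIndex) throws IOException;
    }

    /** Same shape as the quoted loop: the first IOException aborts the loop. */
    static void broadcastFailFast(ChannelEmitter emitter, String element, int numberOfChannels)
            throws IOException {
        for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
            emitter.emit(element, channelIndex);
        }
    }

    /** Assumed alternative: try every channel, then rethrow the first failure. */
    static void broadcastBestEffort(ChannelEmitter emitter, String element, int numberOfChannels)
            throws IOException {
        IOException firstFailure = null;
        for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
            try {
                emitter.emit(element, channelIndex);
            } catch (IOException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure; // the caller still sees the failure
        }
    }

    /** Returns the channels that received the element when one channel fails. */
    static List<Integer> reachedChannels(boolean bestEffort, int numberOfChannels, int failingChannel) {
        List<Integer> reached = new ArrayList<>();
        ChannelEmitter emitter = (element, channelIndex) -> {
            if (channelIndex == failingChannel) {
                throw new IOException("injected failure on channel " + channelIndex);
            }
            reached.add(channelIndex);
        };
        try {
            if (bestEffort) {
                broadcastBestEffort(emitter, "MAX_WATERMARK", numberOfChannels);
            } else {
                broadcastFailFast(emitter, "MAX_WATERMARK", numberOfChannels);
            }
        } catch (IOException e) {
            // Both variants report the failure; only the set of reached channels differs.
        }
        return reached;
    }

    public static void main(String[] args) {
        System.out.println(reachedChannels(false, 4, 1)); // prints [0]
        System.out.println(reachedChannels(true, 4, 1));  // prints [0, 2, 3]
    }
}
```

With channel 1 failing out of 4, the fail-fast loop reaches only channel 0, while the best-effort loop reaches channels 0, 2, and 3. Whether continuing after a failed channel is acceptable for the other callers of `broadcastEmit` is exactly the open question raised in the issue.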