Xie Yi created FLINK-29816:
------------------------------
Summary: User function in ProcessWindowFunction is called before invoke() while restoring state (subtask in INITIALIZING state), but StreamTask skips handling its exception
Key: FLINK-29816
URL: https://issues.apache.org/jira/browse/FLINK-29816
Project: Flink
Issue Type: Bug
Components: Runtime / Task
Affects Versions: 1.15.2, 1.16.0
Reporter: Xie Yi
Attachments: image-2022-10-31-19-49-52-432.png, image-2022-10-31-19-54-12-546.png
h4. 1. How to reproduce

Use a ProcessWindowFunction whose process() throws an exception for some input. Test code:

{code:java}
package com.youdao.analysis;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class KafkaCheckpointWindowProcessTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpoint every 60 s so that a later attempt can restore from a checkpoint
        env.enableCheckpointing(60 * 1000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        KafkaSource<String> kafkaConsumer = KafkaSource.<String>builder()
                .setBootstrapServers("****")
                .setTopics("****")
                .setGroupId("****")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();

        DataStreamSource<String> kafkaSource =
                env.fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "Kafka Source");

        SingleOutputStreamOperator<String> mapSource = kafkaSource
                .keyBy(s -> s)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
                .process(new ProcessWindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void process(String s, Context context, Iterable<String> iterable,
                                        Collector<String> collector) throws Exception {
                        // For the event "abc" this throws java.lang.NumberFormatException
                        Integer intS = Integer.valueOf(s);
                        collector.collect(s);
                    }
                })
                .name("name-process")
                .uid("uid-process");

        mapSource.print();
        env.execute();
    }
}
{code}

Kafka input events:
{code}
>1
>1
>2
>2
>3
>3
>abc
>abc
>
{code}

h4. 2. Fault phenomena

When the job processes the event "abc", it throws java.lang.NumberFormatException and the job fails over. It is expected to keep restarting and failing over continuously. However, it fails over only twice (attempt 0 and attempt 1); on the third attempt (attempt 2) it runs normally and no exception is reported.

!image-2022-10-31-19-54-12-546.png!

h4. 3. Possible reasons

Attempt 2 was restored from a checkpoint:
{code:java}
2022-10-31 17:00:30,033 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring job 7bca78a75b089d447bb4c99efcfd6527 from Checkpoint 1 @ 1667206793605 for 7bca78a75b089d447bb4c99efcfd6527 located at hdfs://eadhadoop/user/sloth/sloth-fs-checkpoints/meta/1_7/7bca78a75b089d447bb4c99efcfd6527/chk-1.
{code}

*Stack trace in the third attempt (attempt 2):* the user function was called from StreamTask.restore(), while the subtask state was INITIALIZING.
{code:java}
java.lang.Thread.getStackTrace(Thread.java:1552)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:353)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:690)
org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
java.lang.Thread.run(Thread.java:745)
{code}

*Stack trace in attempt 0 and attempt 1 (the one that caused the failover):* the user function was called from StreamTask.invoke().
{code:java}
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:45)
com.youdao.analysis.KafkaCheckpointWindowProcessTest$1.process(KafkaCheckpointWindowProcessTest.java:40)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:57)
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:568)
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:524)
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:284)
org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1693)
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$22(StreamTask.java:1684)
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
java.lang.Thread.run(Thread.java:745)
{code}

In org.apache.flink.streaming.runtime.tasks.StreamTask, handleAsyncException only forwards the exception when isRunning == true:
[https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L1540]
{code:java}
@Override
public void handleAsyncException(String message, Throwable exception) {
    if (isRunning) {
        // only fail if the task is still running
        asyncExceptionHandler.handleAsyncException(message, exception);
    }
}
{code}

But during restore, isRunning == false:
[https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L673]

So during StreamTask.restore(), StreamTask skips (silently drops) the exception thrown by the user function of the ProcessWindowFunction.
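To make the suspected mechanism concrete, here is a minimal, self-contained Java model of the isRunning guard quoted above. It is not Flink code: MiniStreamTask, fireTimer(), restore() and invoke() are hypothetical stand-ins, and the sketch assumes (following the analysis above, not a verified reading of Flink's exception routing) that the exception from a timer-fired window function ends up in handleAsyncException. It shows the same NumberFormatException being dropped while the task is restoring but reported once the task is running.

{code:java}
import java.util.ArrayList;
import java.util.List;

public class IsRunningGuardDemo {

    // Hypothetical, simplified stand-in for StreamTask; it is NOT Flink's class.
    static class MiniStreamTask {
        // Mirrors StreamTask.isRunning: false during restore, set to true when invoke() starts.
        private volatile boolean isRunning = false;

        // Exceptions that were forwarded to the (hypothetical) failure handler.
        final List<Throwable> reportedFailures = new ArrayList<>();

        // Same guard shape as StreamTask#handleAsyncException quoted above.
        void handleAsyncException(String message, Throwable exception) {
            if (isRunning) {
                // only fail if the task is still running
                reportedFailures.add(exception);
            }
            // otherwise the exception is silently dropped
        }

        // Simulates a due processing-time timer calling the user's window function.
        // Assumption of this sketch: the user exception is routed into handleAsyncException.
        void fireTimer(Runnable windowFunction) {
            try {
                windowFunction.run();
            } catch (RuntimeException e) {
                handleAsyncException("timer callback failed", e);
            }
        }

        // Restore phase: a due timer may already fire, but isRunning is still false.
        void restore(Runnable dueTimer) {
            fireTimer(dueTimer);
        }

        // Invoke phase: isRunning is set to true before the timer fires.
        void invoke(Runnable dueTimer) {
            isRunning = true;
            fireTimer(dueTimer);
        }
    }

    public static void main(String[] args) {
        // Same failure as the reproduction job: "abc" is not an integer.
        Runnable failingWindow = () -> Integer.valueOf("abc");

        MiniStreamTask restoring = new MiniStreamTask();
        restoring.restore(failingWindow);
        System.out.println("failures reported during restore: " + restoring.reportedFailures.size());

        MiniStreamTask running = new MiniStreamTask();
        running.invoke(failingWindow);
        System.out.println("failures reported during invoke: " + running.reportedFailures.size());
    }
}
{code}

Running main() prints "failures reported during restore: 0" followed by "failures reported during invoke: 1", which mirrors the symptom in section 2: attempt 2 (restored from a checkpoint) runs without failing, while attempts 0 and 1 fail over.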