jirawech.s created FLINK-31689:
----------------------------------

             Summary: Filesystem sink fails when parallelism of compactor operator changed
                 Key: FLINK-31689
                 URL: https://issues.apache.org/jira/browse/FLINK-31689
             Project: Flink
          Issue Type: Bug
          Components: Connectors / FileSystem
    Affects Versions: 1.16.1
            Reporter: jirawech.s
         Attachments: HelloFlinkHadoopSink.java

I encountered this error when using the Filesystem sink with Table SQL. I have not tested it with the DataStream API. The error is shown below:
{code:java}
java.util.NoSuchElementException
	at java.util.ArrayList$Itr.next(ArrayList.java:864)
	at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.lang.Thread.run(Thread.java:750)
{code}
I cannot attach the full reproducible code here, but you can follow the pseudocode in the attachment and the reproduction steps below:
1. Create a Kafka source
2. Set state.savepoints.dir
3. Set the job parallelism to 1
4. Create a FileSystem sink
5. Run the job and trigger a savepoint via the REST API
{noformat}
curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": false}'{noformat}
6. Cancel the job, change the parallelism to 2, and resume the job from the savepoint

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
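
The stack trace above ends in `ArrayList$Itr.next`, which means `iterator().next()` was called on an empty list inside `CompactOperator.initializeState`. One plausible reading, not confirmed in this report, is that after rescaling from 1 to 2 the single restored state entry goes to one subtask while the other subtask restores an empty list. The sketch below is plain Java with no Flink dependency. The names `RescaleRestoreSketch`, `redistribute`, `restoreUnchecked`, and `restoreGuarded` are hypothetical, and the round-robin split is only an assumed stand-in for how list-style operator state might be redistributed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class RescaleRestoreSketch {

    // Hypothetical stand-in for redistributing list-style state entries
    // across subtasks (assumed round-robin; not Flink's actual code).
    static <T> List<List<T>> redistribute(List<T> entries, int parallelism) {
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < entries.size(); i++) {
            perSubtask.get(i % parallelism).add(entries.get(i));
        }
        return perSubtask;
    }

    // Reads the first restored entry without checking for emptiness.
    // Throws NoSuchElementException when the subtask received no entries.
    static <T> T restoreUnchecked(List<T> restored) {
        return restored.iterator().next();
    }

    // Reads the first restored entry if present, otherwise uses a fallback.
    static <T> T restoreGuarded(List<T> restored, T fallback) {
        Iterator<T> it = restored.iterator();
        return it.hasNext() ? it.next() : fallback;
    }

    public static void main(String[] args) {
        // Savepoint taken at parallelism 1: exactly one state entry exists.
        List<String> savepointEntries = List.of("expired-files");

        // Resume at parallelism 2: the second subtask gets nothing.
        List<List<String>> restored = redistribute(savepointEntries, 2);

        for (int subtask = 0; subtask < restored.size(); subtask++) {
            try {
                System.out.println("subtask " + subtask + ": "
                        + restoreUnchecked(restored.get(subtask)));
            } catch (NoSuchElementException e) {
                System.out.println("subtask " + subtask
                        + ": NoSuchElementException (empty restored state)");
            }
        }
    }
}
```

Under these assumptions, a guarded read such as `restoreGuarded(restored.get(1), "none")` avoids the exception for the subtask that receives no state. Whether that is the right fix for `CompactOperator` depends on how its state is meant to be merged or split on rescale.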