[ https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707928#comment-17707928 ]
luoyuxia commented on FLINK-31689:
----------------------------------

[~jirawech.s] Yes, it's normal behavior. Maybe you can disable auto compaction and try it again.
By "the state", I mean the state of {{CompactOperator}} in the File Sink. Other sinks have their own implementations, which can be state compatible.
But after diving into {{CompactOperator}}, I think it can be implemented in a state-compatible style, so that it won't throw an exception when we change the parallelism.

> Filesystem sink fails when parallelism of compactor operator changed
> --------------------------------------------------------------------
>
>                 Key: FLINK-31689
>                 URL: https://issues.apache.org/jira/browse/FLINK-31689
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.16.1
>            Reporter: jirawech.s
>            Priority: Major
>         Attachments: HelloFlinkHadoopSink.java
>
>
> I encountered this error when I tried to use the Filesystem sink with Table SQL. I
> have not tested it with the DataStream API, though.
> You can refer to the error below:
> {code:java}
> java.util.NoSuchElementException
>     at java.util.ArrayList$Itr.next(ArrayList.java:864)
>     at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>     at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.lang.Thread.run(Thread.java:750)
> {code}
> I cannot attach the full reproducible code here, but you can follow the pseudocode
> in the attachment and the reproduction steps below:
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": false}'{noformat}
> 6. Cancel job, change parallelism to 2, and resume job from savepoint



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
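
One possible reading of the stack trace above, sketched in plain Java with no Flink dependency. This is an assumption about the failure mode, not the actual {{CompactOperator}} source. If an operator snapshots a single list-state entry at parallelism 1 and that list is split across subtasks on restore, the second subtask at parallelism 2 receives an empty list, so an unguarded {{iterator().next()}} throws {{NoSuchElementException}}. The class {{RescaleSketch}} and its methods {{redistribute}}, {{restoreUnguarded}} and {{restoreMerged}} are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;

public class RescaleSketch {

    // Hypothetical stand-in for round-robin redistribution of list-style
    // operator state: entry i goes to subtask (i % parallelism).
    public static <T> List<List<T>> redistribute(List<T> entries, int parallelism) {
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        for (int i = 0; i < entries.size(); i++) {
            perSubtask.get(i % parallelism).add(entries.get(i));
        }
        return perSubtask;
    }

    // Assumes at least one restored entry is present; throws
    // NoSuchElementException when the subtask received none.
    public static Map<Long, List<String>> restoreUnguarded(
            List<Map<Long, List<String>>> restored) {
        return new TreeMap<>(restored.iterator().next());
    }

    // Tolerates zero, one, or many restored entries by merging them all.
    public static Map<Long, List<String>> restoreMerged(
            List<Map<Long, List<String>>> restored) {
        Map<Long, List<String>> merged = new TreeMap<>();
        for (Map<Long, List<String>> entry : restored) {
            for (Map.Entry<Long, List<String>> e : entry.entrySet()) {
                merged.computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                        .addAll(e.getValue());
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // One snapshot entry, as written by a single subtask (parallelism 1).
        Map<Long, List<String>> snapshot = new TreeMap<>();
        snapshot.put(1L, Collections.singletonList("part-0"));
        List<Map<Long, List<String>>> saved = Collections.singletonList(snapshot);

        // Restore with parallelism 2: the second subtask gets an empty list.
        List<List<Map<Long, List<String>>>> perSubtask = redistribute(saved, 2);
        for (int subtask = 0; subtask < perSubtask.size(); subtask++) {
            System.out.println("subtask " + subtask + " merged: "
                    + restoreMerged(perSubtask.get(subtask)));
            try {
                restoreUnguarded(perSubtask.get(subtask));
                System.out.println("subtask " + subtask + " unguarded: ok");
            } catch (NoSuchElementException ex) {
                System.out.println("subtask " + subtask
                        + " unguarded: NoSuchElementException");
            }
        }
    }
}
```

The merging variant is one way to read "state compatible" in the comment above: the restore path makes no assumption about how many entries a subtask receives. Whether the eventual fix takes this shape is not stated in this thread.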