[ https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707928#comment-17707928 ]

luoyuxia commented on FLINK-31689:
----------------------------------

[~jirawech.s] Yes, it's normal behavior. Maybe you can disable auto compaction 
and try it again. By "the state", I mean the state of {{CompactOperator}} in the 
File Sink. Other sinks have their own implementations, which can be state 
compatible.

But after diving into {{CompactOperator}}, I think it can be implemented in a 
state-compatible way, so that it won't throw an exception when the parallelism 
changes.
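
For reference, a minimal sketch of a filesystem sink with auto compaction 
disabled (the table name, schema, and path are illustrative, not taken from the 
attached job):
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FileSinkWithoutCompaction {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Same filesystem sink, but with 'auto-compaction' = 'false' (the default),
        // so the CompactOperator and its state are never created.
        tEnv.executeSql(
                "CREATE TABLE fs_sink ("
                        + "  id BIGINT,"
                        + "  msg STRING"
                        + ") WITH ("
                        + "  'connector' = 'filesystem',"
                        + "  'path' = 'file:///tmp/fs_sink',"
                        + "  'format' = 'parquet',"
                        + "  'auto-compaction' = 'false'"
                        + ")");
    }
}
{code}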

> Filesystem sink fails when parallelism of compactor operator changed
> --------------------------------------------------------------------
>
>                 Key: FLINK-31689
>                 URL: https://issues.apache.org/jira/browse/FLINK-31689
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.16.1
>            Reporter: jirawech.s
>            Priority: Major
>         Attachments: HelloFlinkHadoopSink.java
>
>
> I encountered this error when I tried to use the Filesystem sink with Table SQL. I 
> have not tested it with the DataStream API, though. You may refer to the error below
> {code:java}
> java.util.NoSuchElementException
>       at java.util.ArrayList$Itr.next(ArrayList.java:864)
>       at 
> org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>       at 
> org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>       at 
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>       at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>       at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>       at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you may follow my pseudo 
> code in the attachment and the reproduction steps below (a minimal sketch of the 
> steps is shown after the list)
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": 
> false}'{noformat}
> 6. Cancel job, change parallelism to 2, and resume job from savepoint
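> A minimal sketch of the steps above (topic, schema, path, and class names are 
> illustrative; the actual job is in the attached HelloFlinkHadoopSink.java):
> {code:java}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> public class FileSinkRescaleRepro {
>     public static void main(String[] args) {
>         // Steps 2-3: state.savepoints.dir is set in flink-conf.yaml;
>         // the job runs with parallelism 1 first, then is resumed with 2.
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>         env.enableCheckpointing(60_000); // compaction is triggered on checkpoints
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
>         // Step 1: Kafka source
>         tEnv.executeSql(
>                 "CREATE TABLE kafka_source (id BIGINT, msg STRING) WITH ("
>                         + " 'connector' = 'kafka',"
>                         + " 'topic' = 'input-topic',"
>                         + " 'properties.bootstrap.servers' = 'localhost:9092',"
>                         + " 'properties.group.id' = 'repro',"
>                         + " 'scan.startup.mode' = 'latest-offset',"
>                         + " 'format' = 'json')");
>
>         // Step 4: filesystem sink with auto compaction enabled, which adds the
>         // CompactOperator whose state fails to restore after rescaling
>         tEnv.executeSql(
>                 "CREATE TABLE fs_sink (id BIGINT, msg STRING) WITH ("
>                         + " 'connector' = 'filesystem',"
>                         + " 'path' = 'file:///tmp/fs_sink',"
>                         + " 'format' = 'parquet',"
>                         + " 'auto-compaction' = 'true')");
>
>         // Steps 5-6: submit, take a savepoint via the REST API, cancel,
>         // then resume from the savepoint with parallelism 2.
>         tEnv.executeSql("INSERT INTO fs_sink SELECT id, msg FROM kafka_source");
>     }
> }
> {code}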



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
