[ 
https://issues.apache.org/jira/browse/FLINK-31689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708030#comment-17708030
 ] 

jirawech.s commented on FLINK-31689:
------------------------------------

[~luoyuxia] I see. So we could say that this is the expected behaviour of 
CompactOperator in the FileSystem sink for now. If we wanted to improve it, we 
could do so by implementing a state-compatible CompactOperator, right? Could 
you point me to the code/class I should check out? I am not very familiar with 
Flink development.
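
To check my understanding of the failure: the top frames of the trace quoted 
below show ArrayList$Itr.next() throwing from CompactOperator.initializeState, 
which suggests the operator calls iterator().next() on restored list state 
that is empty for a newly added subtask. The following is a minimal plain-Java 
sketch of that pattern and of a defensive variant. It is not Flink code; the 
class name, method names, and the Map<Long, List<String>> state shape are 
assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;

// Hypothetical stand-in for the restore logic; not the actual CompactOperator.
class RestoreSketch {

    // Assumes the restored list state always holds at least one element.
    // With an empty list (e.g. a subtask that did not exist at savepoint
    // time), iterator().next() throws NoSuchElementException.
    static Map<Long, List<String>> restoreUnsafe(List<Map<Long, List<String>>> restored) {
        Map<Long, List<String>> expiredFiles = new TreeMap<>();
        expiredFiles.putAll(restored.iterator().next());
        return expiredFiles;
    }

    // Defensive variant: tolerates zero, one, or several restored elements
    // and merges the file lists per checkpoint id.
    static Map<Long, List<String>> restoreSafe(List<Map<Long, List<String>>> restored) {
        Map<Long, List<String>> expiredFiles = new TreeMap<>();
        for (Map<Long, List<String>> element : restored) {
            for (Map.Entry<Long, List<String>> e : element.entrySet()) {
                expiredFiles
                        .computeIfAbsent(e.getKey(), k -> new ArrayList<>())
                        .addAll(e.getValue());
            }
        }
        return expiredFiles;
    }

    public static void main(String[] args) {
        List<Map<Long, List<String>>> emptyState = new ArrayList<>();
        try {
            restoreUnsafe(emptyState);
        } catch (NoSuchElementException ex) {
            System.out.println("unsafe restore failed on empty state");
        }
        System.out.println("safe restore size: " + restoreSafe(emptyState).size());
    }
}
```

If this reading is right, a state-compatible operator would need something 
like the second variant, plus a decision on how restored entries are 
distributed across subtasks, which this sketch does not address.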

> Filesystem sink fails when parallelism of compactor operator changed
> --------------------------------------------------------------------
>
>                 Key: FLINK-31689
>                 URL: https://issues.apache.org/jira/browse/FLINK-31689
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.16.1
>            Reporter: jirawech.s
>            Priority: Major
>         Attachments: HelloFlinkHadoopSink.java
>
>
> I encountered this error when I tried to use the FileSystem sink with Table 
> SQL. I have not tested it with the DataStream API. The error is shown below:
> {code:java}
> java.util.NoSuchElementException
>       at java.util.ArrayList$Itr.next(ArrayList.java:864)
>       at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
>       at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
>       at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
>       at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
>       at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
>       at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>       at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>       at java.lang.Thread.run(Thread.java:750) {code}
> I cannot attach the full reproducible code here, but you can follow the 
> pseudocode in the attachment and the reproduction steps below:
> 1. Create Kafka source
> 2. Set state.savepoints.dir
> 3. Set Job parallelism to 1
> 4. Create FileSystem Sink
> 5. Run the job and trigger savepoint with API
> {noformat}
> curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": false}'
> {noformat}
> 6. Cancel the job, change the parallelism to 2, and resume the job from the 
> savepoint



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
