jirawech.s created FLINK-31689:
----------------------------------

             Summary: Filesystem sink fails when parallelism of compactor operator changed
                 Key: FLINK-31689
                 URL: https://issues.apache.org/jira/browse/FLINK-31689
             Project: Flink
          Issue Type: Bug
          Components: Connectors / FileSystem
    Affects Versions: 1.16.1
            Reporter: jirawech.s
         Attachments: HelloFlinkHadoopSink.java

I encountered this error when I tried to use the FileSystem sink with Table SQL. 
I have not tested it with the DataStream API, though. The error is shown below:
{code:java}
java.util.NoSuchElementException
        at java.util.ArrayList$Itr.next(ArrayList.java:864)
        at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:119)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
        at java.lang.Thread.run(Thread.java:750)
{code}
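The top two frames show `ArrayList$Itr.next` throwing from `CompactOperator.initializeState`. The minimal Java sketch below reproduces that exception shape outside Flink, assuming (this is not confirmed by the report) that the operator reads only the first element of a restored list and that this list can be empty on some subtask. The class and method names (`EmptyRestoreSketch`, `restoreFirstOnly`, `restoreAll`) are hypothetical, and `String` stands in for Flink's `Path` so the sketch has no dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;

public class EmptyRestoreSketch {

    // Fragile pattern (hypothetical): assumes the restored list always holds
    // at least one entry. On an empty ArrayList, next() throws
    // NoSuchElementException from ArrayList$Itr.next, as in the trace above.
    static Map<Long, List<String>> restoreFirstOnly(List<Map<Long, List<String>>> restored) {
        Map<Long, List<String>> expired = new TreeMap<>();
        expired.putAll(restored.iterator().next());
        return expired;
    }

    // Defensive pattern (hypothetical): merges every entry this subtask
    // received, so zero entries or several entries are both handled.
    static Map<Long, List<String>> restoreAll(List<Map<Long, List<String>>> restored) {
        Map<Long, List<String>> expired = new TreeMap<>();
        for (Map<Long, List<String>> entry : restored) {
            expired.putAll(entry);
        }
        return expired;
    }

    public static void main(String[] args) {
        // Simulates a subtask whose restored list state is empty.
        List<Map<Long, List<String>>> emptyRestore = new ArrayList<>();
        try {
            restoreFirstOnly(emptyRestore);
        } catch (NoSuchElementException e) {
            System.out.println("restoreFirstOnly failed: " + e.getClass().getName());
        }
        System.out.println("restoreAll entries: " + restoreAll(emptyRestore).size());
    }
}
```

Merging every restored entry, as `restoreAll` does, is one defensive option; it also avoids silently dropping data when a subtask receives more than one entry after scaling down.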
I cannot attach the full reproducible code here, but you can follow the pseudocode 
in the attachment and the reproduction steps below:
1. Create a Kafka source.

2. Set state.savepoints.dir.

3. Set the job parallelism to 1.

4. Create a FileSystem sink.

5. Run the job and trigger a savepoint through the REST API:
{noformat}
curl -X POST localhost:8081/jobs/:jobId/savepoints -d '{"cancel-job": false}'
{noformat}
6. Cancel the job, change the parallelism to 2, and resume the job from the savepoint.
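The failure only appears after step 6, when the parallelism goes from 1 to 2. A plausible explanation, not confirmed in this report, is that the compactor keeps a regular operator list state (`getListState`), which Flink redistributes on restore by splitting the combined entries evenly across the new subtasks, rather than giving every subtask the full list as union list state does. The hypothetical sketch below simplifies that split in plain Java to show how a single entry snapshotted at parallelism 1 leaves the second subtask with nothing to restore; `EvenSplitSketch` and `evenSplit` are illustrative names, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class EvenSplitSketch {

    // Simplified even-split redistribution (an assumption, not Flink's code):
    // the combined list is cut into contiguous chunks, one per new subtask,
    // and the first (size % newParallelism) subtasks get one extra element.
    static <T> List<List<T>> evenSplit(List<T> combined, int newParallelism) {
        List<List<T>> result = new ArrayList<>();
        int base = combined.size() / newParallelism;
        int remainder = combined.size() % newParallelism;
        int start = 0;
        for (int subtask = 0; subtask < newParallelism; subtask++) {
            int count = base + (subtask < remainder ? 1 : 0);
            result.add(new ArrayList<>(combined.subList(start, start + count)));
            start += count;
        }
        return result;
    }

    public static void main(String[] args) {
        // At parallelism 1, the single compactor subtask snapshots one entry.
        List<String> snapshot = Collections.singletonList("expired-files-entry");
        List<List<String>> assigned = evenSplit(snapshot, 2);
        for (int i = 0; i < assigned.size(); i++) {
            System.out.println("subtask " + i + " restored entries: " + assigned.get(i).size());
        }
    }
}
```

Under this assumption, subtask 1 still sees a restored context but an empty list, which matches the empty-iterator failure in the stack trace.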



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
