tomma-a opened a new issue, #10193: URL: https://github.com/apache/seatunnel/issues/10193
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

In my tests, SeaTunnel 2.3.12 runs on the Flink engine (Flink 1.17 or 1.18), deployed with the Flink Kubernetes Operator and with the Flink `upgradeMode` set to `savepoint` or `last-state`. When I upgrade the `FlinkDeployment`, the job cannot restore the `SplitEnumerator` of its Kafka source and logs the error shown under *Error Exception* below.

My guess is that `FlinkSourceSplitEnumeratorContext` uses reflection (in `getJobIdForV15`) to obtain the Flink job ID while the enumerator is being restored, and that this reflective access is not yet compatible with newer Flink versions.

### SeaTunnel Version

2.3.12

### SeaTunnel Config

These are the settings from my test. The SeaTunnel job is a streaming job:

```conf
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 60000
  flink.execution.checkpointing.mode = "EXACTLY_ONCE"
  flink.execution.checkpointing.timeout = 600000
}

source {
  Kafka {
    plugin_output = "fake2"
    topic = info
    consumer.group = "testr"
    bootstrap.servers = "tom-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092"
    format = json
  }
}

sink {
  Kafka {
    plugin_input = "fake2"
    topic = topc
    bootstrap.servers = "tom-cluster1-kafka-bootstrap.kafka.svc.cluster.local:9092"
    format = json
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
  }
}
```

I run the SeaTunnel job on Flink through the Flink Kubernetes Operator, using this `FlinkDeployment` custom resource:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: seatunnel-flink-streaming-example-2
  namespace: kafka
spec:
  ......
  volumes:
    - name: seatunnel-config
      configMap:
        name: seatunnel-config
  job:
    jarURI: local:///opt/seatunnel/starter/seatunnel-flink-15-starter.jar
    entryClass: org.apache.seatunnel.core.starter.flink.SeaTunnelFlink
    args: ["--config", "/data/seatunnel.streaming.conf"]
    parallelism: 2
    upgradeMode: savepoint
```

The first time I `kubectl apply` this YAML to a Kubernetes cluster, the SeaTunnel job runs normally and Flink checkpoints are saved successfully at regular intervals. I then change the YAML file and apply it again, which makes the operator perform an upgrade. Because `upgradeMode` is `savepoint`, the job should be restored from a savepoint (`last-state`, which restores from the latest checkpoint, does not work either). Instead, the error shown under *Error Exception* occurs and the job cannot be restored from the last checkpoint or savepoint.

Please correct me if I'm wrong about any of this. Thanks!

### Running Command

```shell
# The job is submitted by applying the FlinkDeployment custom resource above with kubectl apply.
```

### Error Exception

```log
2025-12-10 10:44:00,103 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - No master state to restore
2025-12-10 10:44:00,104 INFO org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator [] - Resetting coordinator to checkpoint.
2025-12-10 10:44:00,107 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Closing SourceCoordinator for source Source: Kafka-Source.
2025-12-10 10:44:00,108 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Source coordinator for source Source: Kafka-Source closed.
2025-12-10 10:44:00,111 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Restoring SplitEnumerator of source Source: Kafka-Source from checkpoint.
2025-12-10 10:44:00,221 WARN org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext [] - Get flink job id failed
java.lang.IllegalStateException: Initialize flink job-id failed
    at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.getJobIdForV15(FlinkSourceSplitEnumeratorContext.java:152) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.getFlinkJobId(FlinkSourceSplitEnumeratorContext.java:100) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.<init>(FlinkSourceSplitEnumeratorContext.java:57) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSource.restoreEnumerator(FlinkSource.java:116) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.seatunnel.translation.flink.source.FlinkSource.restoreEnumerator(FlinkSource.java:48) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-242b99ed4b860a32f883f68f21d7ff2b:2.3.12]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinator.resetToCheckpoint(SourceCoordinator.java:444) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$DeferrableCoordinator.resetAndStart(RecreateOnResetOperatorCoordinator.java:406) ~[flink-dist-1.17.2.jar:1.17.2]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator.lambda$resetToCheckpoint$7(RecreateOnResetOperatorCoordinator.java:155) ~[flink-dist-1.17.2.jar:1.17.2]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniWhenCompleteStage(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.whenComplete(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:156) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100) ~[flink-dist-1.18.0.jar:1.18.0]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.18.0.jar:1.18.0]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.NullPointerException: Cannot invoke "Object.getClass()" because "obj" is null
    at org.apache.seatunnel.translation.flink.source.FlinkSourceSplitEnumeratorContext.getJobIdForV15(FlinkSourceSplitEnumeratorContext.java:142) ~[blob_p-ce35d9ba37fc821b91a3c1462ad9474638da52bc-fd7fca630f268e360191f42ba11014b3:2.3.12]
```

### Zeta or Flink or Spark Version

Flink 1.16, 1.17, and 1.18. I have not tested other Flink versions.

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!
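On the `Caused by` line: the message `Cannot invoke "Object.getClass()" because "obj" is null` means `getJobIdForV15` called `getClass()` on a reference that was `null`. The stack trace also shows the restore running inside `DefaultSchedulerFactory.createInstance`, that is, while the scheduler itself is still being built, which may be why an object reached through reflection is not available yet. The Java sketch below is hypothetical and is not SeaTunnel's code: `SafeReflection`, `findField`, `readField`, and the `Context`/`Scheduler` stand-ins (including the `"scheduler"` and `"jobId"` field names) are invented for illustration. It shows a null-safe reflective lookup that returns `Optional.empty()` instead of throwing, so a caller could fall back to another way of obtaining the job ID.

```java
import java.lang.reflect.Field;
import java.util.Optional;

/**
 * Hypothetical helper, NOT SeaTunnel code: reads a field reflectively and
 * returns Optional.empty() instead of throwing when the owner object, the
 * field, or the field's value is missing.
 */
public final class SafeReflection {

    private SafeReflection() {}

    /** Walks up the class hierarchy looking for a declared field with the given name. */
    public static Optional<Field> findField(Class<?> type, String name) {
        for (Class<?> c = type; c != null; c = c.getSuperclass()) {
            try {
                Field f = c.getDeclaredField(name);
                // May throw InaccessibleObjectException (a RuntimeException) across module boundaries on JDK 9+.
                f.setAccessible(true);
                return Optional.of(f);
            } catch (NoSuchFieldException e) {
                // Not declared on this class; keep walking up the hierarchy.
            } catch (RuntimeException e) {
                // The field exists but cannot be made accessible; treat it as unavailable.
                return Optional.empty();
            }
        }
        return Optional.empty();
    }

    /** Returns the field's value, or empty if the owner is null, the field is missing, or the value is null. */
    public static Optional<Object> readField(Object owner, String name) {
        if (owner == null) {
            // Calling getClass() on a null reference is what produces an NPE like
            // 'Cannot invoke "Object.getClass()" because "obj" is null'.
            return Optional.empty();
        }
        Optional<Field> field = findField(owner.getClass(), name);
        if (!field.isPresent()) {
            return Optional.empty();
        }
        try {
            return Optional.ofNullable(field.get().get(owner));
        } catch (IllegalAccessException e) {
            return Optional.empty();
        }
    }

    // Hypothetical stand-ins for framework internals whose layout may differ between versions.
    static class Scheduler {
        private final String jobId = "job-42";
    }

    static class Context {
        Scheduler scheduler; // stays null until the scheduler has been created
    }

    public static void main(String[] args) {
        Context restoring = new Context(); // scheduler not set yet
        Context running = new Context();
        running.scheduler = new Scheduler();

        // Prints Optional.empty instead of failing with a NullPointerException.
        System.out.println(readField(restoring, "scheduler").flatMap(s -> readField(s, "jobId")));
        // Prints Optional[job-42].
        System.out.println(readField(running, "scheduler").flatMap(s -> readField(s, "jobId")));
    }
}
```

Returning `Optional` keeps the "not available yet" case visible to the caller without an unchecked exception; it does not by itself recover the real job ID, so a fallback would still be needed.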
### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
