Hi, I'm a co-author for opensource Delta-Flink connector hosted on [1]. The connector was originated for Flink 1.12 and currently we migrated to 1.14. Both sink and source are using new Unified API from Flink 1.12.
I'm evaluating migration to Flink 1.15 where Sink v1 was marked as deprecated. After the migration, one of our integration test for Sink started to fail for cluster failover scenario [2] The test is heavily based on Flink's StreamingExecutionFileSinkITCase [3] but since we use Junit5, we do not extend this Flink's class. For our 1.15 test setup I'm using `SinkV1Adapter.wrap(...)` to wrap our V1 Sink instance. The test fails in one of the two ways: Caused by: java.lang.NullPointerException: Unknown subtask for 1 at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104) at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.getSubtaskCommittableManager(CheckpointCommittableManagerImpl.java:96) at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.addCommittable(CheckpointCommittableManagerImpl.java:90) at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addCommittable(CommittableCollector.java:234) at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:126) at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.lang.Thread.run(Thread.java:748) OR Caused by: java.lang.UnsupportedOperationException: Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920 at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.upsertSummary(CheckpointCommittableManagerImpl.java:84) at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addSummary(CommittableCollector.java:230) at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:124) at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.lang.Thread.run(Thread.java:748) Test is passing for Flink 1.12, 1.13 and 1.14. I would like to ask for any suggestions, what might be causing this. Thanks, Krzysztof Chmielewski [1] https://github.com/delta-io/connectors/tree/master/flink [2] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java