Hi Krzysztof,
I'll continue tracking the issue; hopefully it
can be fixed before the next minor releases for
1.15.x, 1.16.x and 1.17.
Best,
Yun Gao
------------------------------------------------------------------
From:Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com>
Send Time:2022 Sep. 28 (Wed.) 23:46
To:dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>
Subject:Re: Migrating Sink v1 to v2
Yun Gao,
Thank you very much for this.
Do you see this ticket being picked up any time soon?
For a moment I was thinking about taking my chance with it, but I am
guessing it would be rather hard without a deeper understanding of Flink's
internals, plus it's marked as "major".
Regards,
Krzysztof Chmielewski
On Wed, 28 Sep 2022 at 17:22, Yun Gao <yungao...@aliyun.com.invalid> wrote:
Hi Krzysztof,
 Very sorry for the long delay; a full investigation took some time.
 The issue appears to be caused by implementation bugs, and I have filed an issue
 for them [1].
 For follow-up actions, let's move to the thread in [2] so that the discussion
 stays in one place.
 Sorry for the inconvenience.
 Best,
 Yun Gao
 [1] https://issues.apache.org/jira/browse/FLINK-29459
 [2] https://lists.apache.org/thread/wzwkqd08qkcmmf5m2xroxpxnzzwfphc9
 ------------------------------------------------------------------
 From:Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com>
 Send Time:2022 Sep. 10 (Sat.) 04:04
 To:dev <dev@flink.apache.org>
 Subject:Re: Migrating Sink v1 to v2
 It seems that there might be an issue with state recovery, or some kind of
 concurrency issue, in the GlobalCommitterOperator created by SinkV1Adapter,
 which uses the StandardSinkTopologies.addGlobalCommitter method, in scenarios
 with cluster failover.
 I've tried to locate where the issue is. From what I can tell,
 for a setup with two writers, the SubtaskCommittableManager instances for both
 writer tasks are registered in CheckpointCommittableManagerImpl using the
 upsertSummary method.
 Then, CommittableWithLineage objects are processed in
 CheckpointCommittableManagerImpl.addCommittable by the
 SubtaskCommittableManager corresponding to the subtaskId taken from
 CommittableWithLineage.getSubtaskId().
 However, in a failover scenario (Task Manager crash), where for example
 an exception is thrown in the source, CheckpointCommittableManagerImpl
 is recreated without any SubtaskCommittableManager, although that could
 actually be OK. What is more important is that during recovery,
 CheckpointCommittableManagerImpl.addCommittable is called twice, but
 for a CommittableSummary with the same subtaskId, for example 0 and 0 instead
 of 0 and 1 as was the case before the deliberately induced TM crash.
 The latter CommittableSummary fails with an exception pointing to FLINK-25920.
 After that, or sometimes before the second CommittableSummary arrives,
 CheckpointCommittableManagerImpl.addCommittable is called for subtaskId 1,
 which is not registered in CheckpointCommittableManagerImpl. That causes
 the NPE.
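 To make the sequence above easier to follow, here is a minimal toy model in
 plain Java. It is not Flink code: ToyCheckpointManager and replay are
 hypothetical names, and the sketch only assumes what is described in this
 thread, namely that a summary registers its subtask, that a second summary for
 the same subtask is rejected, and that a committable for an unregistered
 subtask fails a null check.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (NOT Flink code) of the per-checkpoint bookkeeping described in this thread. */
public class CommittableManagerSketch {

    /** Hypothetical, heavily simplified stand-in for CheckpointCommittableManagerImpl. */
    public static final class ToyCheckpointManager {
        // subtaskId -> number of committables received so far for that subtask
        private final Map<Integer, Integer> subtasks = new HashMap<>();

        /** Registers a subtask when its summary arrives; a repeated summary is rejected. */
        public void upsertSummary(int subtaskId) {
            if (subtasks.putIfAbsent(subtaskId, 0) != null) {
                throw new UnsupportedOperationException(
                        "Duplicate summary for subtask " + subtaskId + " (compare FLINK-25920)");
            }
        }

        /** Counts a committable; fails if its subtask was never registered by a summary. */
        public void addCommittable(int subtaskId) {
            Integer count = subtasks.get(subtaskId);
            if (count == null) {
                throw new NullPointerException("Unknown subtask for " + subtaskId);
            }
            subtasks.put(subtaskId, count + 1);
        }
    }

    /** Replays summaries followed by one committable and reports "ok" or the exception type. */
    public static String replay(int[] summarySubtaskIds, int committableSubtaskId) {
        ToyCheckpointManager manager = new ToyCheckpointManager();
        try {
            for (int id : summarySubtaskIds) {
                manager.upsertSummary(id);
            }
            manager.addCommittable(committableSubtaskId);
            return "ok";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Before the crash: summaries from subtasks 0 and 1, then a committable from subtask 1.
        System.out.println(replay(new int[] {0, 1}, 1));
        // During recovery as observed: two summaries that both carry subtaskId 0.
        System.out.println(replay(new int[] {0, 0}, 1));
        // Committable from subtask 1 arrives while only subtask 0 is registered.
        System.out.println(replay(new int[] {0}, 1));
    }
}
```

 Running main prints ok, UnsupportedOperationException and
 NullPointerException, one per line. Which of the two failures shows up first in
 the real job would depend on message ordering, which this toy model does not
 try to reproduce.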
 My streaming environment has the following configuration:
 Configuration config = new Configuration();
 config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.STREAMING);
 env.configure(config, getClass().getClassLoader());
 env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
 For now, I don't have any other, more compact reproducer than testFileSink
 test from [1].
 [1] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
 On Wed, 7 Sep 2022 at 13:22, Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
 > A small update:
 > When I change the number of sinks from 3 to 1, the test passes.
 >
 > On Wed, 7 Sep 2022 at 12:18, Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote:
 >
 >> Hi,
 >> I'm a co-author of the open-source Delta-Flink connector hosted at [1].
 >> The connector was originally written for Flink 1.12 and we have since
 >> migrated to 1.14.
 >> Both the sink and the source use the new Unified API from Flink 1.12.
 >>
 >> I'm evaluating a migration to Flink 1.15, where Sink v1 was marked as
 >> deprecated.
 >> After the migration, one of our integration tests for the sink started to
 >> fail in the cluster failover scenario [2].
 >> The test is heavily based on Flink's StreamingExecutionFileSinkITCase [3],
 >> but since we use JUnit 5, we do not extend this Flink class.
 >>
 >> For our 1.15 test setup I'm using `SinkV1Adapter.wrap(...)` to wrap our
 >> V1 Sink instance.
 >>
 >> The test fails in one of the two ways:
 >>
 >> Caused by: java.lang.NullPointerException: Unknown subtask for 1
 >> at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
 >> at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.getSubtaskCommittableManager(CheckpointCommittableManagerImpl.java:96)
 >> at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.addCommittable(CheckpointCommittableManagerImpl.java:90)
 >> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addCommittable(CommittableCollector.java:234)
 >> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:126)
 >> at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190)
 >> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
 >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
 >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
 >> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 >> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
 >> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 >> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
 >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
 >> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 >> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
 >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
 >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
 >> at java.lang.Thread.run(Thread.java:748)
 >>
 >>
 >> OR
 >>
 >> Caused by: java.lang.UnsupportedOperationException: Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920
 >> at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.upsertSummary(CheckpointCommittableManagerImpl.java:84)
 >> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addSummary(CommittableCollector.java:230)
 >> at org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector.addMessage(CommittableCollector.java:124)
 >> at org.apache.flink.streaming.api.connector.sink2.GlobalCommitterOperator.processElement(GlobalCommitterOperator.java:190)
 >> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
 >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
 >> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
 >> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 >> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
 >> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
 >> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
 >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
 >> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
 >> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
 >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
 >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
 >> at java.lang.Thread.run(Thread.java:748)
 >>
 >> The test passes on Flink 1.12, 1.13 and 1.14.
 >>
 >> I would appreciate any suggestions as to what might be causing this.
 >>
 >> Thanks,
 >> Krzysztof Chmielewski
 >>
 >>
 >> [1] https://github.com/delta-io/connectors/tree/master/flink
 >> [2] https://github.com/kristoffSC/connectors/blob/Flink_1.15/flink/src/test/java/io/delta/flink/sink/DeltaSinkStreamingExecutionITCase.java
 >> [3] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/StreamingExecutionFileSinkITCase.java
 >>
 >
