Peter, I wasn't suggesting removing FlinkSink earlier, only switching the default sink implementation earlier. Is that what you meant?
-Max

On Thu, Jun 26, 2025 at 5:19 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> +1 for the more conservative removal time
>
> Maximilian Michels <m...@apache.org> wrote (time: Thu, Jun 26, 2025, 15:54):
>> Hey Rod,
>>
>> IcebergSink does not currently support speculative batch execution. Modern sinks need to implement SupportsConcurrentExecutionAttempts [1] in order to work with speculative (batch) execution. This is merely a marker interface. I've verified that after adding it to IcebergSink, the test passes.
>>
>> Hey Steven,
>>
>> > I will reiterate one point that I mentioned before. For the Flink connector, sink is the dominant use case (compared to source). Regardless of the unit test issue, it is probably better to go a little slower on this switch of sink implementation.
>>
>> Agreed, but I'm still a bit skeptical that leaving IcebergSink disabled by default will help it reach greater maturity. We will face a similar situation for the Iceberg 1.11.0 release, where we again need to decide whether to make the switch. The truth is, only after the sink is out and about can we find any remaining issues. Since users have the option to switch back to the old implementation, we at least won't be blocking them. That said, I hear your concerns and I'm perfectly fine with waiting until Iceberg 1.11.0 to make IcebergSink the default Flink sink.
>>
>> -Max
>>
>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution
>>
>> On Thu, Jun 26, 2025 at 5:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>>
>>> I will reiterate one point that I mentioned before. For the Flink connector, sink is the dominant use case (compared to source). Regardless of the unit test issue, it is probably better to go a little slower on this switch of sink implementation.
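Max's diagnosis can be illustrated with a small, self-contained Java model. It does not use Flink: `SupportsConcurrentExecutionAttempts` below is a local stand-in for Flink's real marker interface, and `Operator`, `StallingMap`, `UnmarkedSink`, `MarkedSink`, and `canSpeculate` are hypothetical names. The rule it encodes is a simplification of the one in the Flink documentation linked as [1] (a sink only gets speculative attempts if it implements the marker interface), under the assumption that the test's stalling map runs in the same chained vertex as the sink.

```java
import java.util.List;

public class SpeculativeEligibilityModel {

    // Local stand-in for Flink's marker interface
    // org.apache.flink.api.common.SupportsConcurrentExecutionAttempts,
    // declared here so the sketch compiles without Flink on the classpath.
    public interface SupportsConcurrentExecutionAttempts {}

    // Hypothetical operator abstraction used only by this model.
    public interface Operator {
        // Ordinary transformations are treated as eligible by default.
        default boolean isSink() {
            return false;
        }
    }

    // Models the test's map function that stalls on its first attempt.
    public static final class StallingMap implements Operator {}

    // A sink that does not opt in (the situation Max describes for IcebergSink).
    public static final class UnmarkedSink implements Operator {
        @Override
        public boolean isSink() {
            return true;
        }
    }

    // The same sink after adding the marker interface.
    public static final class MarkedSink implements Operator, SupportsConcurrentExecutionAttempts {
        @Override
        public boolean isSink() {
            return true;
        }
    }

    // Simplified rule: a chain of operators may get speculative attempts only
    // if every sink in the chain implements the marker interface.
    public static boolean canSpeculate(List<Operator> chain) {
        return chain.stream()
                .allMatch(op -> !op.isSink() || op instanceof SupportsConcurrentExecutionAttempts);
    }

    public static void main(String[] args) {
        System.out.println(canSpeculate(List.of(new StallingMap(), new UnmarkedSink()))); // false
        System.out.println(canSpeculate(List.of(new StallingMap(), new MarkedSink())));   // true
    }
}
```

Because the marker interface has no methods, "opting in" is purely a type-level declaration, which is why adding it to IcebergSink is enough to change the scheduler's decision in this model.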
>>>
>>> On Wed, Jun 25, 2025 at 4:22 PM Rodrigo Meneses <rmene...@gmail.com> wrote:
>>>
>>>> Hello!
>>>>
>>>> I wanted to give you folks an update on this, especially in light of Max's suggestion to release this change in 1.10. I know the release branch will be cut soon, and without this unit test passing, I don't feel comfortable releasing it.
>>>>
>>>> While implementing this feature (which makes IcebergSink the default sink for SQL), a unit test covering Speculative Execution Mode is failing:
>>>>
>>>> `TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()`
>>>>
>>>> The test is timing out. I debugged it using `FlinkSink` as the sink engine, and there it works.
>>>>
>>>> With FlinkSink, in the debugger, I observed that this code:
>>>>
>>>> ```java
>>>> if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) {
>>>>     Thread.sleep(Integer.MAX_VALUE);
>>>> }
>>>> ```
>>>>
>>>> is hit 3 times:
>>>>
>>>> 1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
>>>> 2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we skip the infinite sleep and return the row.
>>>> 3. The third time (the speculative attempt), `taskInfo.getAttemptNumber()` is 1, so we skip the infinite sleep and return the row.
>>>>
>>>> And then the unit test passes.
>>>>
>>>> However, with `IcebergSink`, I only see this code hit 2 times:
>>>>
>>>> 1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
>>>> 2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we return the row.
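The attempt pattern Rodrigo observed can be reproduced with a plain Java model of the test's stall predicate. It assumes a parallelism of 2 (implied by subtask indexes 0 and 1 above) and that a speculative copy of subtask 0 runs with attempt number 1; `StallConditionModel`, `invocations`, and `finishes` are hypothetical names, not part of the Iceberg test.

```java
import java.util.ArrayList;
import java.util.List;

public class StallConditionModel {

    // Same predicate as the test's map function: only the first attempt
    // (attemptNumber <= 0) of subtask 0 sleeps forever.
    public static boolean stalls(int subtaskIndex, int attemptNumber) {
        return subtaskIndex == 0 && attemptNumber <= 0;
    }

    // Lists every (subtaskIndex, attemptNumber) pair the map function sees.
    // Each subtask runs attempt 0; if speculation kicks in, the slow
    // subtask 0 also gets a second attempt with attemptNumber 1.
    public static List<int[]> invocations(int parallelism, boolean speculativeAttempt) {
        List<int[]> calls = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            calls.add(new int[] {subtask, 0});
        }
        if (speculativeAttempt) {
            calls.add(new int[] {0, 1});
        }
        return calls;
    }

    // The job finishes only if every subtask index has at least one
    // attempt that returns its row instead of sleeping.
    public static boolean finishes(List<int[]> calls, int parallelism) {
        boolean[] done = new boolean[parallelism];
        for (int[] call : calls) {
            if (!stalls(call[0], call[1])) {
                done[call[0]] = true;
            }
        }
        for (boolean d : done) {
            if (!d) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<int[]> withSpeculation = invocations(2, true);     // FlinkSink run
        List<int[]> withoutSpeculation = invocations(2, false); // IcebergSink run
        System.out.println(withSpeculation.size() + " calls, finishes=" + finishes(withSpeculation, 2));       // 3 calls, finishes=true
        System.out.println(withoutSpeculation.size() + " calls, finishes=" + finishes(withoutSpeculation, 2)); // 2 calls, finishes=false
    }
}
```

In this model, the missing third invocation is exactly the difference between a job that completes and one that hangs until the test times out.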
>>>>
>>>> There's no third time, which makes me believe that speculative mode is not kicking in, as the slow (infinitely waiting) task is not being retried.
>>>>
>>>> This could mean 2 things:
>>>>
>>>> 1. IcebergSink is not working as expected with speculative mode.
>>>> 2. The unit test needs to be adjusted for IcebergSink.
>>>>
>>>> While reviewing the code, I found a commit by @stevenzu from August 2024 [1], related to this same unit test hanging, which showed that adding `taskInfo.getIndexOfThisSubtask() == 0` was necessary for the test to pass.
>>>>
>>>> I am still digging deeper, but so far I haven't been able to pinpoint the problem. I will continue on this, but there's a possibility that it won't be ready in time to be considered for the 1.10 release.
>>>>
>>>> Thanks,
>>>>
>>>> Rodrigo
>>>>
>>>> [1] https://github.com/apache/iceberg/pull/10832#discussion_r1701015316
>>>>
>>>> On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org> wrote:
>>>>
>>>>> +1 Great proposal.
>>>>>
>>>>> What about moving the timeline one release ahead? There isn't any more planned work for IcebergSink between the 1.10.0 and 1.11.0 releases. IcebergSink has already gotten enough exposure, and we won't gain anything from waiting longer to make it the default. Thus, I would suggest already deprecating FlinkSink in Iceberg 1.10.0 and promoting IcebergSink to be the default implementation. Users will have the option to switch back to the old sink at any time via the configuration option Rodrigo mentioned. If there are no issues, we can proceed to remove FlinkSink in Iceberg 1.11.0.
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>> On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>> >
>>>>> > +1
>>>>> >
>>>>> > On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <matyas.orh...@gmail.com> wrote:
>>>>> >>
>>>>> >> +1, sounds reasonable to me
>>>>> >>
>>>>> >> Thanks,
>>>>> >> Matyas
>>>>> >>
>>>>> >> On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <rmene...@gmail.com> wrote:
>>>>> >>>
>>>>> >>> Hi devs,
>>>>> >>>
>>>>> >>> I'd like to start a discussion about the current and future state of our Flink Sink Connectors.
>>>>> >>>
>>>>> >>> As it stands today, we have 3 sink implementations:
>>>>> >>>
>>>>> >>> FlinkSink [1]
>>>>> >>> IcebergSink [2]
>>>>> >>> DynamicSink [3]
>>>>> >>>
>>>>> >>> FlinkSink [1] is the current and default implementation of the Flink Sink Connector.
>>>>> >>>
>>>>> >>> IcebergSink [2] is another implementation of the Flink Sink Connector, introduced in https://github.com/apache/iceberg/pull/10179. It leverages the latest SinkV2 interfaces in Flink, and it makes it possible to add cleanup tasks by implementing the `PostCommitTopology` interface. There is already some work in progress to enable this functionality: https://github.com/apache/iceberg/pull/12979
>>>>> >>>
>>>>> >>> DynamicSink [3] was recently contributed in https://github.com/apache/iceberg/pull/13304. It can write to any number of tables, dynamically creating and updating tables and dynamically updating their schema and partition spec.
>>>>> >>>
>>>>> >>> Currently, `IcebergSink` is marked as `@Experimental`, and it already offers feature parity with `FlinkSink` (the missing RANGE distribution was recently merged in https://github.com/apache/iceberg/pull/12071).
>>>>> >>>
>>>>> >>> With https://github.com/apache/iceberg/pull/11244, users can choose which sink implementation ([1] or [2]) to use for Flink SQL.
>>>>> >>>
>>>>> >>> With all this said, we're proposing the following for Iceberg 1.11:
>>>>> >>>
>>>>> >>> Mark FlinkSink as @Deprecated
>>>>> >>> Promote IcebergSink from @Experimental to @PublicEvolving
>>>>> >>> Make IcebergSink the default implementation in Flink SQL
>>>>> >>>
>>>>> >>> Then in Iceberg 1.12 we will:
>>>>> >>>
>>>>> >>> Remove the `FlinkSink` implementation
>>>>> >>> Remove @PublicEvolving from IcebergSink
>>>>> >>>
>>>>> >>> What do you think about this plan?
>>>>> >>>
>>>>> >>> Thanks,
>>>>> >>>
>>>>> >>> Rodrigo
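The opt-in/opt-out switch discussed in this thread can be sketched as a small Java model of how a table option could select the sink, and how "making IcebergSink the default" amounts to flipping the fallback value. This is not Iceberg code: the key `table.exec.iceberg.use-v2-sink` is an assumption about the option added in PR 11244 (check Iceberg's `FlinkConfigOptions` for the real name and default), and `SinkSelectionModel`, `SinkImpl`, and `resolve` are hypothetical. It also assumes the switch is implemented by changing that option's default rather than adding a new one.

```java
import java.util.Map;

public class SinkSelectionModel {

    // ASSUMED option key; verify against Iceberg's FlinkConfigOptions.
    public static final String USE_V2_SINK_KEY = "table.exec.iceberg.use-v2-sink";

    public enum SinkImpl { FLINK_SINK, ICEBERG_SINK }

    // Resolves which implementation a Flink SQL write would use. An explicit
    // user setting always wins; otherwise the release's default applies.
    public static SinkImpl resolve(Map<String, String> tableConfig, boolean defaultUseV2) {
        String value = tableConfig.get(USE_V2_SINK_KEY);
        boolean useV2 = (value == null) ? defaultUseV2 : Boolean.parseBoolean(value);
        return useV2 ? SinkImpl.ICEBERG_SINK : SinkImpl.FLINK_SINK;
    }

    public static void main(String[] args) {
        // Before the switch: FlinkSink unless the user opts in.
        System.out.println(resolve(Map.of(), false));                      // FLINK_SINK
        // After the switch: IcebergSink unless the user opts out.
        System.out.println(resolve(Map.of(), true));                       // ICEBERG_SINK
        System.out.println(resolve(Map.of(USE_V2_SINK_KEY, "false"), true)); // FLINK_SINK
    }
}
```

Keeping the explicit setting ahead of the default is what lets users "switch back to the old sink at any time", as Max notes, until FlinkSink is actually removed.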