OK! Looks like we have reached a decision together. Let's move forward with implementing this in 1.11.0, then. I will start raising PRs toward that objective. Thanks again for all your valuable input!
On Mon, Jun 30, 2025 at 2:59 AM Maximilian Michels <m...@apache.org> wrote:

We have used IcebergSink for about 6 months on a significant number of pipelines. That's all I can say.

But I understand the need for caution. Let's make an effort to collect more data on IcebergSink until the 1.11.0 release and make IcebergSink the default then, if we don't encounter blockers.

On Mon, Jun 30, 2025 at 11:42 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:

Are we talking about 5, 10, 100, or 1000 different use cases where we have seen the migration? Also, it would be good to see if someone in the community has some experience too.

On Mon, Jun 30, 2025, 11:17, Maximilian Michels <m...@apache.org> wrote:

I haven't seen any issues for users on the new sink. The switch was seamless.

On Fri, Jun 27, 2025 at 5:48 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

Oh, I missed that. That's a bit better, but I'm still not sure. Do we know of internal users who are already on the new sink? What was the experience there?

On Fri, Jun 27, 2025, 13:49, Maximilian Michels <m...@apache.org> wrote:

Peter, I was suggesting not to remove FlinkSink earlier, but to switch the default sink implementation earlier. Is that what you mean?

-Max

On Thu, Jun 26, 2025 at 5:19 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

+1 for the more conservative removal time

On Thu, Jun 26, 2025, 15:54, Maximilian Michels <m...@apache.org> wrote:

Hey Rod,

IcebergSink does not currently support speculative batch execution. Modern sinks need to implement SupportsConcurrentExecutionAttempts [1] in order to work with speculative (batch) execution. This is merely a marker interface. I've verified that after adding it to IcebergSink, the test passes.
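To make the failure mode concrete, here is a toy Java model (not Flink's scheduler code) that combines the test's gating condition with the point about the marker interface. It assumes parallelism 2, at most one speculative attempt per slow subtask, and, as a simplification of the Flink documentation linked as [1], that a speculative attempt is only launched when the sink carries `SupportsConcurrentExecutionAttempts`. The interface below is a local stand-in with the same name; `ToySink`, `PlainSink`, `SpeculativeSink`, `Result`, and `run` are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SpeculativeTestModel {

  // Hypothetical local stand-in for Flink's marker interface of the same name.
  interface SupportsConcurrentExecutionAttempts {}

  interface ToySink {}

  // A sink that does not opt in (IcebergSink before the marker interface was added).
  static final class PlainSink implements ToySink {}

  // A sink that opts in by carrying the marker interface.
  static final class SpeculativeSink implements ToySink, SupportsConcurrentExecutionAttempts {}

  // Same condition as the test: true means this attempt would block forever.
  static boolean hangs(int subtaskIndex, int attemptNumber) {
    return subtaskIndex == 0 && attemptNumber <= 0;
  }

  static final class Result {
    final List<String> invocations = new ArrayList<>(); // "subtask/attempt" pairs that reached the map function
    boolean completed;
  }

  // Simulates parallelism 2 with at most one speculative attempt per slow subtask.
  static Result run(ToySink sink) {
    Result result = new Result();
    boolean[] done = new boolean[2];
    for (int subtask = 0; subtask < 2; subtask++) {
      result.invocations.add(subtask + "/0");
      done[subtask] = !hangs(subtask, 0);
    }
    // Simplification: speculation is allowed only if the sink carries the marker interface.
    if (sink instanceof SupportsConcurrentExecutionAttempts) {
      for (int subtask = 0; subtask < 2; subtask++) {
        if (!done[subtask]) {
          result.invocations.add(subtask + "/1");
          done[subtask] = !hangs(subtask, 1);
        }
      }
    }
    result.completed = done[0] && done[1];
    return result;
  }

  public static void main(String[] args) {
    Result plain = run(new PlainSink());
    Result speculative = run(new SpeculativeSink());
    System.out.println("plain:       " + plain.invocations + " completed=" + plain.completed);
    System.out.println("speculative: " + speculative.invocations + " completed=" + speculative.completed);
  }
}
```

Running `main` prints `[0/0, 1/0]` with `completed=false` for the plain sink, which mirrors the two invocations and the timeout observed with IcebergSink, and `[0/0, 1/0, 0/1]` with `completed=true` for the opted-in sink, which mirrors the three invocations observed with FlinkSink in the debugger.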
Hey Steven,

> I will reiterate one point that I mentioned before. For the Flink connector, sink is the dominant use case (compared to source). Regardless of the unit test issue, it is probably better to go a little slower on this switch of sink implementation.

Agreed, but I'm still a bit skeptical that leaving IcebergSink disabled by default will help it reach greater maturity. We will face a similar situation for the Iceberg 1.11.0 release, where we again need to decide whether to make the switch. The truth is, only once the sink is out in the wild can we find any potential issues. Since users have the option to switch back to the old implementation, we at least won't be blocking them. That said, I hear your concerns and I'm perfectly fine with waiting until Iceberg 1.11.0 to make IcebergSink the default Flink sink.

-Max

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution

On Thu, Jun 26, 2025 at 5:38 AM Steven Wu <stevenz...@gmail.com> wrote:

I will reiterate one point that I mentioned before. For the Flink connector, sink is the dominant use case (compared to source). Regardless of the unit test issue, it is probably better to go a little slower on this switch of sink implementation.

On Wed, Jun 25, 2025 at 4:22 PM Rodrigo Meneses <rmene...@gmail.com> wrote:

Hello!
I wanted to give you folks an update about this, especially because of Max's suggestion of releasing this change in 1.10. I know the release branch will be cut soon, and without this unit test passing, I don't feel comfortable releasing this.

During the implementation of this feature (which makes IcebergSink the default sink for SQL), a unit test covering speculative execution mode is failing:

`TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()`

The test is timing out. I debugged it using `FlinkSink` as the sink engine, and there it works.

With FlinkSink, in the debugger, I observed that this code:

```java
// Only the first attempt of subtask 0 blocks, so the job can finish only via a speculative attempt.
if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) {
  Thread.sleep(Integer.MAX_VALUE);
}
```

is hit 3 times:

1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we skip the infinite sleep and return the row.
3. The third time (the speculative attempt), `taskInfo.getAttemptNumber()` is 1, so we skip the infinite sleep and return the row.

And then the unit test passes.

However, with `IcebergSink`, I only see this code hit 2 times:

1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we return the row.

There's no third time, which makes me believe that speculative mode is not kicking in, as the slow (infinitely waiting) task is not being retried.

This could mean one of two things:

1. IcebergSink is not working as expected with speculative mode.
2. The unit test needs to be adjusted for IcebergSink.

While reviewing the code, I found a commit by @stevenzu from August 2024 [1] related to this same unit test hanging, where adding `taskInfo.getIndexOfThisSubtask() == 0` was necessary for the unit test to pass.

I am still digging deeper, but so far I haven't been able to pinpoint the problem. I will continue on this, but there's a possibility that it won't be ready in time to be considered for the 1.10 release.

Thanks,

Rodrigo

[1] https://github.com/apache/iceberg/pull/10832#discussion_r1701015316

On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org> wrote:

+1 Great proposal.

What about moving the timeline one release ahead? There isn't any more planned work for IcebergSink between the 1.10.0 and 1.11.0 releases. IcebergSink has already had enough exposure, and we won't gain anything from waiting longer to make it the default. Thus, I would suggest already deprecating FlinkSink in Iceberg 1.10.0 and promoting IcebergSink to be the default implementation.
Users will have the option to switch back to the old sink at any time via the configuration option Rodrigo mentioned. If there are no issues, we can proceed to remove FlinkSink in Iceberg 1.11.0.

What do you think?

Cheers,
Max

On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

+1

On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <matyas.orh...@gmail.com> wrote:

+1, sounds reasonable to me

Thanks,
Matyas

On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <rmene...@gmail.com> wrote:

Hi devs,

I'd like to start a discussion about the current and future state of our Flink sink connectors.

As it stands today, we have 3 sink implementations:

1. FlinkSink [1]
2. IcebergSink [2]
3. DynamicSink [3]

FlinkSink [1] is the current and default implementation of the Flink sink connector.

IcebergSink [2] is another implementation of the Flink sink connector, introduced in https://github.com/apache/iceberg/pull/10179. It leverages the latest SinkV2 interfaces in Flink, and it offers the possibility of adding cleanup tasks by implementing the `PostCommitTopology` interface.
There is already some work in progress to enable this functionality: https://github.com/apache/iceberg/pull/12979

DynamicSink [3] was recently contributed in https://github.com/apache/iceberg/pull/13304. It can be used to write to any number of tables, dynamically creating and updating tables and dynamically updating their schema and partition spec.

Currently, `IcebergSink` is marked as `@Experimental`, and it already offers feature parity with `FlinkSink` (the missing RANGE distribution was recently merged in https://github.com/apache/iceberg/pull/12071).

With https://github.com/apache/iceberg/pull/11244, users can choose which sink implementation ([1] or [2]) they want to use for Flink SQL.

With all this said, we're proposing the following for Iceberg 1.11:

1. Mark FlinkSink as @Deprecated.
2. Promote IcebergSink from @Experimental to @PublicEvolving.
3. Make IcebergSink the default implementation in Flink SQL.

Then in Iceberg 1.12 we will:

1. Remove the `FlinkSink` implementation.
2. Remove @PublicEvolving from IcebergSink.

What do you think about this plan?

Thanks,

Rodrigo
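The plan hinges on flipping the Flink SQL default while keeping FlinkSink reachable through the option from PR 11244. Below is a minimal Java sketch of that selection rule; it is a model, not Iceberg's code. The key `table.exec.iceberg.use-v2-sink` is assumed to be the option that PR added (check `FlinkConfigOptions` in your Iceberg version), and `SinkSelection`, `SinkImpl`, and `choose` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class SinkSelection {

  enum SinkImpl { FLINK_SINK, ICEBERG_SINK }

  // Assumed option key from PR 11244; verify against FlinkConfigOptions in your Iceberg version.
  static final String USE_V2_SINK = "table.exec.iceberg.use-v2-sink";

  // An explicit user setting always wins; otherwise the release default applies.
  // The proposal amounts to flipping icebergSinkIsDefault from false to true.
  static SinkImpl choose(Map<String, String> tableConfig, boolean icebergSinkIsDefault) {
    String value = tableConfig.get(USE_V2_SINK);
    boolean useIcebergSink = value == null ? icebergSinkIsDefault : Boolean.parseBoolean(value);
    return useIcebergSink ? SinkImpl.ICEBERG_SINK : SinkImpl.FLINK_SINK;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    System.out.println(choose(conf, false)); // current default: FLINK_SINK
    System.out.println(choose(conf, true));  // after the switch: ICEBERG_SINK
    conf.put(USE_V2_SINK, "false");
    System.out.println(choose(conf, true));  // explicit opt-out: FLINK_SINK
  }
}
```

The design point is that flipping the default only changes behavior for jobs that never set the option, and any such job can fall back to FlinkSink by setting it to `false`, which is the safety valve mentioned in the thread.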