Is it 5, 10, 100, or 1000 different use cases that we have seen migrate? It would also be good to hear whether anyone in the community has experience with the new sink.

Maximilian Michels <m...@apache.org> wrote (on Mon, Jun 30, 2025, 11:17):

> I haven't seen any issues for users on the new sink. The switch was
> seamless.
>
> On Fri, Jun 27, 2025 at 5:48 PM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Oh, I missed that. That's a bit better, but I'm still not sure. Do we
>> know of internal users who are already on the new Sink? What was the
>> experience there?
>>
>> Maximilian Michels <m...@apache.org> wrote (on Fri, Jun 27, 2025, 13:49):
>>
>>> Peter, I was suggesting not to remove FlinkSink earlier, but to switch the
>>> default sink implementation earlier. Is that what you meant?
>>>
>>> -Max
>>>
>>> On Thu, Jun 26, 2025 at 5:19 PM Péter Váry <peter.vary.apa...@gmail.com>
>>> wrote:
>>>
>>>> +1 for the more conservative removal time
>>>>
>>>> Maximilian Michels <m...@apache.org> wrote (on Thu, Jun 26, 2025, 15:54):
>>>>
>>>>> Hey Rod,
>>>>>
>>>>> IcebergSink does not currently support speculative batch execution.
>>>>> Modern sinks need to implement SupportsConcurrentExecutionAttempts [1]
>>>>> in order to work with speculative (batch) execution. This is merely a
>>>>> marker interface. I've verified that after adding it to IcebergSink, the
>>>>> test passes.
>>>>>
>>>>> Hey Steven,
>>>>>
>>>>> > I will reiterate one point that I mentioned before. For the Flink
>>>>> > connector, sink is the dominant use case (compared to source).
>>>>> > Regardless of the unit test issue, it is probably better to go a
>>>>> > little slower on this switch of sink implementation.
>>>>>
>>>>> Agreed, but I'm still a bit skeptical that leaving IcebergSink
>>>>> disabled by default will help it reach greater maturity. We will face a
>>>>> similar situation for the Iceberg 1.11.0 release, where we again need to
>>>>> decide whether to make the switch. The truth is, only after the sink is
>>>>> out and about can we find any potential issues.
>>>>> Since we have the option for users to switch back to the old
>>>>> implementation, we at least won't be blocking users. That said, I hear
>>>>> your concerns, and I'm perfectly fine with waiting until Iceberg 1.11.0
>>>>> to make IcebergSink the default Flink sink.
>>>>>
>>>>> -Max
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution
>>>>>
>>>>> On Thu, Jun 26, 2025 at 5:38 AM Steven Wu <stevenz...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I will reiterate one point that I mentioned before. For the Flink
>>>>>> connector, sink is the dominant use case (compared to source).
>>>>>> Regardless of the unit test issue, it is probably better to go a
>>>>>> little slower on this switch of sink implementation.
>>>>>>
>>>>>> On Wed, Jun 25, 2025 at 4:22 PM Rodrigo Meneses <rmene...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello!
>>>>>>>
>>>>>>> I wanted to give you folks an update about this, especially because
>>>>>>> of Max's suggestion of releasing this change in 1.10. I know the
>>>>>>> release branch will be cut soon, and without this unit test passing, I
>>>>>>> don't feel comfortable releasing this.
>>>>>>>
>>>>>>> During the implementation of this feature (which makes IcebergSink
>>>>>>> the default sink for SQL), a unit test that covers Speculative
>>>>>>> Execution Mode is failing:
>>>>>>>
>>>>>>> `TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()`
>>>>>>>
>>>>>>> The test is timing out.
>>>>>>> I debugged it while using `FlinkSink` as the sink engine, and with
>>>>>>> FlinkSink the test works.
>>>>>>>
>>>>>>> With FlinkSink, in the debugger, I observed that this code:
>>>>>>>
>>>>>>> ```java
>>>>>>> // Block the first attempt of subtask 0 "forever"; only another attempt can finish it.
>>>>>>> if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) {
>>>>>>>   Thread.sleep(Integer.MAX_VALUE);
>>>>>>> }
>>>>>>> ```
>>>>>>>
>>>>>>> is hit 3 times:
>>>>>>>
>>>>>>> 1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
>>>>>>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep
>>>>>>>    forever.
>>>>>>> 2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we
>>>>>>>    skip the infinite sleep and return the row.
>>>>>>> 3. The third time (the speculative attempt),
>>>>>>>    `taskInfo.getAttemptNumber()` is 1, so we skip the infinite sleep
>>>>>>>    and return the row.
>>>>>>>
>>>>>>> And then the unit test passes.
>>>>>>>
>>>>>>> However, with `IcebergSink`, I only see this code hit 2 times:
>>>>>>>
>>>>>>> 1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
>>>>>>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep
>>>>>>>    forever.
>>>>>>> 2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we
>>>>>>>    return the row.
>>>>>>>
>>>>>>> There's no third time, which makes me believe that speculative mode
>>>>>>> is not kicking in, as the slow (infinitely waiting) task is not being
>>>>>>> retried.
>>>>>>>
>>>>>>> This could mean 2 things:
>>>>>>>
>>>>>>> 1. IcebergSink is not working as expected with speculative mode.
>>>>>>> 2. The unit test needs to be adjusted for IcebergSink.
>>>>>>>
>>>>>>> While reviewing the code, I found a commit by @stevenzu from August
>>>>>>> 2024 [1], related to this unit test also hanging, which showed that
>>>>>>> adding `taskInfo.getIndexOfThisSubtask() == 0` was necessary for the
>>>>>>> unit test to pass.
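To make the hit counts above concrete, here is a simplified, hypothetical Java model of the test's blocking condition; it is not Flink's scheduler. It assumes two subtasks and, as a further simplification, that the slow mapper and the sink share an execution vertex, so whether a speculative attempt is launched depends only on whether the sink implements the marker interface Max mentions earlier in the thread. `SupportsConcurrentExecutionAttempts` here is a local stand-in named after that Flink interface; `PlainSink`, `SpeculativeSink`, `simulate`, and `jobFinishes` are illustrative names, not Iceberg or Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified, hypothetical model of the speculative-execution test's blocking
 * condition. It is NOT Flink's scheduler logic.
 */
public class SpeculativeTestModel {

    /** Local stand-in for Flink's SupportsConcurrentExecutionAttempts marker interface. */
    interface SupportsConcurrentExecutionAttempts {}

    /** Minimal stand-in for a sink; only its type matters in this model. */
    interface Sink {}

    /** A sink that has not opted in to concurrent execution attempts. */
    static class PlainSink implements Sink {}

    /** A sink that opts in by implementing the marker interface. */
    static class SpeculativeSink implements Sink, SupportsConcurrentExecutionAttempts {}

    /** Same condition as the test: only attempt 0 of subtask 0 sleeps "forever". */
    static boolean blocksForever(int subtaskIndex, int attemptNumber) {
        return subtaskIndex == 0 && attemptNumber <= 0;
    }

    /** Records every time the test's map function is hit, assuming two subtasks. */
    static List<String> simulate(Sink sink) {
        List<String> hits = new ArrayList<>();
        // Original attempts (attemptNumber = 0) of subtasks 0 and 1.
        for (int subtask = 0; subtask < 2; subtask++) {
            hits.add(describe(subtask, 0));
        }
        // Subtask 0 is stuck; in this model a speculative attempt (attemptNumber = 1)
        // is launched only if the sink carries the marker interface.
        if (sink instanceof SupportsConcurrentExecutionAttempts) {
            hits.add(describe(0, 1));
        }
        return hits;
    }

    private static String describe(int subtask, int attempt) {
        String outcome = blocksForever(subtask, attempt) ? "sleeps" : "returns";
        return "subtask=" + subtask + " attempt=" + attempt + " " + outcome;
    }

    /** The job finishes only if some attempt of subtask 0 returns (subtask 1 always does). */
    static boolean jobFinishes(List<String> hits) {
        return hits.stream().anyMatch(h -> h.startsWith("subtask=0 ") && h.endsWith("returns"));
    }

    public static void main(String[] args) {
        List<String> plain = simulate(new PlainSink());
        // prints "[subtask=0 attempt=0 sleeps, subtask=1 attempt=0 returns] finishes=false"
        System.out.println(plain + " finishes=" + jobFinishes(plain));

        List<String> speculative = simulate(new SpeculativeSink());
        // prints "[subtask=0 attempt=0 sleeps, subtask=1 attempt=0 returns,
        //          subtask=0 attempt=1 returns] finishes=true" (on one line)
        System.out.println(speculative + " finishes=" + jobFinishes(speculative));
    }
}
```

Without the marker, the model produces only two hits and subtask 0 never completes, which matches the timeout observed with IcebergSink. With it, the third (speculative) hit lets the job finish, consistent with Max's report that adding the interface to IcebergSink makes the test pass.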
>>>>>>>
>>>>>>> I am still digging deeper, but so far I haven't been able to pinpoint
>>>>>>> the problem. I will continue working on this, but there's a
>>>>>>> possibility that it won't be ready in time to be considered for the
>>>>>>> 1.10 release.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Rodrigo
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/iceberg/pull/10832#discussion_r1701015316
>>>>>>>
>>>>>>> On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> +1 Great proposal.
>>>>>>>>
>>>>>>>> What about moving the timeline one release ahead? There isn't any
>>>>>>>> more planned work for IcebergSink between the 1.10.0 and 1.11.0
>>>>>>>> releases. IcebergSink has already had enough exposure, and we won't
>>>>>>>> gain anything from waiting longer to make it the default. Thus, I
>>>>>>>> would suggest deprecating FlinkSink already in Iceberg 1.10.0 and
>>>>>>>> promoting IcebergSink to be the default implementation. Users will
>>>>>>>> have the option to switch back to the old sink at any time via the
>>>>>>>> configuration option Rodrigo mentioned. If there are no issues, we
>>>>>>>> can proceed to remove FlinkSink in Iceberg 1.11.0.
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Max
>>>>>>>>
>>>>>>>> On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>>> >
>>>>>>>> > +1
>>>>>>>> >
>>>>>>>> > On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <matyas.orh...@gmail.com> wrote:
>>>>>>>> >>
>>>>>>>> >> +1, sounds reasonable to me
>>>>>>>> >>
>>>>>>>> >> Thanks,
>>>>>>>> >> Matyas
>>>>>>>> >>
>>>>>>>> >> On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <rmene...@gmail.com> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Hi devs,
>>>>>>>> >>>
>>>>>>>> >>> I'd like to start a discussion about the current and future
>>>>>>>> >>> state of our Flink Sink Connectors.
>>>>>>>> >>>
>>>>>>>> >>> As it stands today, we have 3 sink implementations:
>>>>>>>> >>>
>>>>>>>> >>> FlinkSink [1]
>>>>>>>> >>> IcebergSink [2]
>>>>>>>> >>> DynamicSink [3]
>>>>>>>> >>>
>>>>>>>> >>> FlinkSink [1] is the current and default implementation of the
>>>>>>>> >>> Flink Sink Connector.
>>>>>>>> >>>
>>>>>>>> >>> IcebergSink [2] is another implementation of the Flink Sink
>>>>>>>> >>> Connector, introduced in
>>>>>>>> >>> https://github.com/apache/iceberg/pull/10179. It leverages the
>>>>>>>> >>> latest SinkV2 interfaces in Flink, and it offers the possibility
>>>>>>>> >>> of adding cleanup tasks by implementing the `PostCommitTopology`
>>>>>>>> >>> interface. There is already some work in progress to enable this
>>>>>>>> >>> functionality: https://github.com/apache/iceberg/pull/12979
>>>>>>>> >>>
>>>>>>>> >>> DynamicSink [3] was recently contributed in
>>>>>>>> >>> https://github.com/apache/iceberg/pull/13304. It can be used to
>>>>>>>> >>> write to any number of tables, dynamically creating and updating
>>>>>>>> >>> tables and dynamically updating their schema and partition spec.
>>>>>>>> >>>
>>>>>>>> >>> Currently, `IcebergSink` is marked as `@Experimental`, and it
>>>>>>>> >>> already offers feature parity with `FlinkSink` (the missing RANGE
>>>>>>>> >>> distribution support was recently merged in
>>>>>>>> >>> https://github.com/apache/iceberg/pull/12071).
>>>>>>>> >>>
>>>>>>>> >>> With https://github.com/apache/iceberg/pull/11244, users can
>>>>>>>> >>> choose which sink implementation ([1] or [2]) they want to use
>>>>>>>> >>> for Flink SQL.
>>>>>>>> >>>
>>>>>>>> >>> With all this said, we're proposing the following for Iceberg 1.11:
>>>>>>>> >>>
>>>>>>>> >>> Deprecate FlinkSink (mark it @Deprecated)
>>>>>>>> >>> Promote IcebergSink from @Experimental to @PublicEvolving
>>>>>>>> >>> Make IcebergSink the default implementation in Flink SQL
>>>>>>>> >>>
>>>>>>>> >>> Then in Iceberg 1.12 we will:
>>>>>>>> >>>
>>>>>>>> >>> Remove the `FlinkSink` implementation
>>>>>>>> >>> Remove @PublicEvolving from IcebergSink
>>>>>>>> >>>
>>>>>>>> >>> What do you think about this plan?
>>>>>>>> >>>
>>>>>>>> >>> Thanks,
>>>>>>>> >>>
>>>>>>>> >>> Rodrigo