I haven't seen any issues for users on the new sink. The switch was seamless.
On Fri, Jun 27, 2025 at 5:48 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
> Oh... I missed that. That's a bit better, but I'm still not sure. Do we know of internal users who are already on the new Sink? What was the experience there?
>
> On Fri, Jun 27, 2025, 13:49, Maximilian Michels <m...@apache.org> wrote:
>
>> Peter, I was suggesting not to remove FlinkSink earlier, but to switch the default sink implementation earlier. Is that what you mean?
>>
>> -Max
>>
>> On Thu, Jun 26, 2025 at 5:19 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>
>>> +1 for the more conservative removal time
>>>
>>> On Thu, Jun 26, 2025, 15:54, Maximilian Michels <m...@apache.org> wrote:
>>>
>>>> Hey Rod,
>>>>
>>>> IcebergSink does not currently support speculative batch execution. Modern sinks need to implement SupportsConcurrentExecutionAttempts [1] in order to work with speculative (batch) execution. This is merely a marker interface. I've verified that after adding it to IcebergSink, the test passes fine.
>>>>
>>>> Hey Steven,
>>>>
>>>> > I will reiterate one point that I mentioned before. For the Flink connector, sink is the dominant use case (compared to source). Regardless of the unit test issue, it is probably better to go a little slower on this switch of sink implementation.
>>>>
>>>> Agreed, but I'm still a bit skeptical that by leaving IcebergSink disabled by default we will reach greater maturity. We will have a similar situation for the Iceberg 1.11.0 release, where we again need to decide whether to make the switch. The truth is, only after the sink is out and about can we find any potential issues. Since we have the option for users to switch back to the old implementation, we at least won't be blocking users. That said, I hear your concerns and I'm perfectly fine with waiting until Iceberg 1.11.0 to make IcebergSink the default Flink sink.
>>>>
>>>> -Max
>>>>
>>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution
>>>>
>>>> On Thu, Jun 26, 2025 at 5:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>>
>>>>> I will reiterate one point that I mentioned before. For the Flink connector, sink is the dominant use case (compared to source). Regardless of the unit test issue, it is probably better to go a little slower on this switch of sink implementation.
>>>>>
>>>>> On Wed, Jun 25, 2025 at 4:22 PM Rodrigo Meneses <rmene...@gmail.com> wrote:
>>>>>
>>>>>> Hello!
>>>>>>
>>>>>> I wanted to give you folks an update on this, especially because of Max's suggestion of releasing this change in 1.10. I know the release branch will be cut soon, and without this unit test passing, I don't feel comfortable releasing this.
>>>>>>
>>>>>> During the implementation of this feature (which makes IcebergSink the default sink for SQL), a unit test covering Speculative Execution Mode started failing:
>>>>>>
>>>>>> `TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()`
>>>>>>
>>>>>> The test is timing out. I debugged it while using `FlinkSink` as the sink engine, and it works.
>>>>>>
>>>>>> With FlinkSink, in the debugger, I observed that this code:
>>>>>>
>>>>>> ```java
>>>>>> if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) {
>>>>>>   Thread.sleep(Integer.MAX_VALUE);
>>>>>> }
>>>>>> ```
>>>>>>
>>>>>> is being hit 3 times:
>>>>>>
>>>>>> 1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
>>>>>> 2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we skip the infinite sleep and return the row.
>>>>>> 3. The third time (the speculative attempt), `taskInfo.getAttemptNumber()` is 1, so we skip the infinite sleep and return the row.
>>>>>>
>>>>>> And then the unit test passes.
>>>>>>
>>>>>> However, with `IcebergSink`, I only see this code hit 2 times:
>>>>>>
>>>>>> 1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
>>>>>> 2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we return the row.
>>>>>>
>>>>>> There's no third time, which makes me believe that speculative mode is not kicking in, as the slow (infinitely waiting) task is not being retried.
>>>>>>
>>>>>> This could mean 2 things:
>>>>>>
>>>>>> 1. IcebergSink is not working as expected with Speculative Mode.
>>>>>> 2. The unit test needs to be adjusted for IcebergSink.
>>>>>>
>>>>>> While reviewing the code, I found @stevenzu's commit from August 2024 [1], which relates to this unit test also hanging; there, adding `taskInfo.getIndexOfThisSubtask() == 0` was necessary for the unit test to pass.
>>>>>>
>>>>>> I am still digging deeper, but so far I haven't been able to pinpoint the problem. I will continue on this, but there's a possibility that it won't be ready in time to be part of the 1.10 release.
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Rodrigo
>>>>>>
>>>>>> [1] https://github.com/apache/iceberg/pull/10832#discussion_r1701015316
>>>>>>
>>>>>> On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>
>>>>>>> +1 Great proposal.
>>>>>>>
>>>>>>> What about moving the timeline one release ahead?
>>>>>>> There isn't any more planned work for IcebergSink between the 1.10.0 and 1.11.0 releases. IcebergSink has already gotten enough exposure, and we won't gain anything from waiting longer to make it the default. Thus, I would suggest deprecating FlinkSink as early as Iceberg 1.10.0 and promoting IcebergSink to be the default implementation. Users will have the option to switch back to the old sink at any time via the configuration option Rodrigo mentioned. If there are no issues, we can proceed to remove FlinkSink in Iceberg 1.11.0.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>>
>>>>>>> On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>>> >
>>>>>>> > +1
>>>>>>> >
>>>>>>> > On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <matyas.orh...@gmail.com> wrote:
>>>>>>> >>
>>>>>>> >> +1, sounds reasonable to me
>>>>>>> >>
>>>>>>> >> Thanks,
>>>>>>> >> Matyas
>>>>>>> >>
>>>>>>> >> On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <rmene...@gmail.com> wrote:
>>>>>>> >>>
>>>>>>> >>> Hi devs,
>>>>>>> >>>
>>>>>>> >>> I'd like to start a discussion about the current and future state of our Flink Sink Connectors.
>>>>>>> >>>
>>>>>>> >>> As it stands today, we have 3 sink implementations:
>>>>>>> >>>
>>>>>>> >>> FlinkSink [1]
>>>>>>> >>> IcebergSink [2]
>>>>>>> >>> DynamicSink [3]
>>>>>>> >>>
>>>>>>> >>> FlinkSink [1] is the current and default implementation of the Flink Sink Connector.
>>>>>>> >>>
>>>>>>> >>> IcebergSink [2] is another implementation of the Flink Sink Connector, which was introduced in https://github.com/apache/iceberg/pull/10179.
>>>>>>> >>> It leverages the latest SinkV2 interfaces in Flink, and it offers the possibility of adding cleanup tasks by implementing the `PostCommitTopology` interface. There is already some work in progress to enable this functionality: https://github.com/apache/iceberg/pull/12979
>>>>>>> >>>
>>>>>>> >>> DynamicSink [3] was recently contributed in https://github.com/apache/iceberg/pull/13304. It can be used to write to any number of tables, dynamically creating and updating tables and dynamically updating the schema and partition spec of tables.
>>>>>>> >>>
>>>>>>> >>> Currently, `IcebergSink` is marked as `@Experimental`, and it already offers feature parity with `FlinkSink` (the missing RANGE distribution was recently merged in https://github.com/apache/iceberg/pull/12071).
>>>>>>> >>>
>>>>>>> >>> With https://github.com/apache/iceberg/pull/11244, users can choose which sink implementation ([1] or [2]) they want to use for Flink SQL.
>>>>>>> >>>
>>>>>>> >>> With all this said, we're proposing the following for Iceberg 1.11:
>>>>>>> >>>
>>>>>>> >>> 1. Deprecate FlinkSink (@Deprecated).
>>>>>>> >>> 2. Promote IcebergSink from @Experimental to @PublicEvolving.
>>>>>>> >>> 3. Make IcebergSink the default implementation in Flink SQL.
>>>>>>> >>>
>>>>>>> >>> Then in Iceberg 1.12 we will:
>>>>>>> >>>
>>>>>>> >>> 1. Remove the `FlinkSink` implementation.
>>>>>>> >>> 2. Remove @PublicEvolving from IcebergSink.
>>>>>>> >>>
>>>>>>> >>> What do you think about this plan?
>>>>>>> >>>
>>>>>>> >>> Thanks,
>>>>>>> >>>
>>>>>>> >>> Rodrigo
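The speculative-execution part of this thread comes down to two facts. First, the test's guard only hangs attempt 0 of subtask 0. Second, according to Max, a SinkV2 sink has to implement the `SupportsConcurrentExecutionAttempts` marker interface before Flink will speculatively retry it.

The Java sketch below is a simplified, hypothetical model of how those two facts interact. It is not Flink's scheduler and not the Iceberg test. `SupportsConcurrentAttempts`, `SinkWithoutMarker`, `SinkWithMarker`, and `SpeculationModel` are invented stand-ins. The model makes three assumptions:

- Parallelism is 2.
- The guarded map function and the sink run in the same task, so the sink's capability decides whether the slow task gets a speculative attempt.
- The hung attempt is always detected as slow.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's SupportsConcurrentExecutionAttempts:
// like the real one, it declares no methods and only marks a capability.
interface SupportsConcurrentAttempts {}

// Hypothetical sinks: identical except for the marker.
class SinkWithoutMarker {}

class SinkWithMarker implements SupportsConcurrentAttempts {}

final class SpeculationModel {
  private SpeculationModel() {}

  // Same guard as in the test: only the first attempt of subtask 0 hangs.
  static boolean hangs(int subtaskIndex, int attemptNumber) {
    return subtaskIndex == 0 && attemptNumber <= 0;
  }

  // Assumption of this model: speculation is allowed only if the sink carries the marker.
  static boolean speculationAllowed(Object sink) {
    return sink instanceof SupportsConcurrentAttempts;
  }

  // Records each evaluation of the guard as "subtask/attempt".
  static List<String> run(Object sink, int parallelism) {
    List<String> invocations = new ArrayList<>();
    List<Integer> slowSubtasks = new ArrayList<>();
    for (int subtask = 0; subtask < parallelism; subtask++) {
      invocations.add(subtask + "/0");
      if (hangs(subtask, 0)) {
        slowSubtasks.add(subtask);
      }
    }
    // Launch one speculative attempt (attempt number 1) per slow subtask, if allowed.
    if (speculationAllowed(sink)) {
      for (int subtask : slowSubtasks) {
        invocations.add(subtask + "/1");
      }
    }
    return invocations;
  }

  // The job finishes only if every subtask has at least one attempt that does not hang.
  static boolean finishes(Object sink, int parallelism) {
    for (int subtask = 0; subtask < parallelism; subtask++) {
      boolean firstAttemptDone = !hangs(subtask, 0);
      boolean speculativeDone = speculationAllowed(sink) && !hangs(subtask, 1);
      if (!firstAttemptDone && !speculativeDone) {
        return false; // this subtask never completes, so the test would time out
      }
    }
    return true;
  }
}
```

Under these assumptions, `SpeculationModel.run(new SinkWithoutMarker(), 2)` returns `[0/0, 1/0]`, and `finishes` returns false for that sink. This matches the two hits and the timeout Rodrigo saw with IcebergSink. With `SinkWithMarker`, the list becomes `[0/0, 1/0, 0/1]`: the third entry is the speculative attempt of subtask 0, matching the third hit he saw with FlinkSink.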