OK! Looks like we've reached a decision together. Let's move forward with
implementing this in 1.11.0, then.
I will start raising PRs toward that objective.
Thanks again for all your valuable input!


On Mon, Jun 30, 2025 at 2:59 AM Maximilian Michels <m...@apache.org> wrote:

> We have used IcebergSink for about 6 months on a significant number of
> pipelines. That's all I can say.
>
> But I understand the need for caution. Let's make an effort to collect
> more data on IcebergSink until the 1.11.0 release and make IcebergSink the
> default then, if we don't encounter blockers.
>
> On Mon, Jun 30, 2025 at 11:42 AM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> How many different use cases (5, 10, 100, 1000?) have we seen the
>> migration for? Also, it would be good to hear whether anyone else in the
>> community has experience with it too.
>>
>> On Mon, Jun 30, 2025, 11:17 Maximilian Michels <m...@apache.org> wrote:
>>
>>> I haven't seen any issues for users on the new sink. The switch was
>>> seamless.
>>>
>>> On Fri, Jun 27, 2025 at 5:48 PM Péter Váry <peter.vary.apa...@gmail.com>
>>> wrote:
>>>
>>>> Oh, I missed that. That's a bit better, but I'm still not sure. Do we
>>>> know of internal users who are already on the new sink? What was their
>>>> experience?
>>>>
>>>> On Fri, Jun 27, 2025, 13:49 Maximilian Michels <m...@apache.org> wrote:
>>>>
>>>>> Peter, I was not suggesting removing FlinkSink earlier, but switching
>>>>> the default sink implementation earlier. Is that what you meant?
>>>>>
>>>>> -Max
>>>>>
>>>>> On Thu, Jun 26, 2025 at 5:19 PM Péter Váry <
>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>
>>>>>> +1 for the more conservative removal time
>>>>>>
>>>>>> On Thu, Jun 26, 2025, 15:54 Maximilian Michels <m...@apache.org> wrote:
>>>>>>
>>>>>>> Hey Rod,
>>>>>>>
>>>>>>> IcebergSink does not currently support speculative batch execution.
>>>>>>> Modern sinks need to implement SupportsConcurrentExecutionAttempts [1]
>>>>>>> in order to work with speculative (batch) execution. This is merely a
>>>>>>> marker interface. I've verified that after adding it to IcebergSink, the
>>>>>>> test passes fine.
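To illustrate what "merely a marker interface" means, here is a simplified, self-contained model. It is not Flink's implementation: `SupportsConcurrentExecutionAttempts` and `Sink` below are local stand-ins for Flink's `org.apache.flink.api.common.SupportsConcurrentExecutionAttempts` and `org.apache.flink.api.connector.sink2.Sink`, and `allowsSpeculativeAttempts` is a hypothetical helper that assumes the framework gates speculative attempts with an `instanceof` check. The marker declares no methods; implementing it only advertises the capability.

```java
import java.util.List;

/**
 * Simplified model of a marker interface gating speculative attempts.
 * The nested interfaces are local stand-ins, not Flink's own types.
 */
public class SpeculativeSinkCheck {

  /** Stand-in for Flink's SupportsConcurrentExecutionAttempts: no methods, only a capability. */
  interface SupportsConcurrentExecutionAttempts {}

  /** Stand-in for a Sink V2 sink; the real interface's methods are omitted. */
  interface Sink<T> {}

  /** A sink that does not opt in to concurrent execution attempts. */
  static class PlainSink implements Sink<String> {}

  /** The same kind of sink with the marker added to its implements clause. */
  static class MarkedSink implements Sink<String>, SupportsConcurrentExecutionAttempts {}

  /** Hypothetical check: speculative attempts are allowed only for sinks carrying the marker. */
  static boolean allowsSpeculativeAttempts(Sink<?> sink) {
    return sink instanceof SupportsConcurrentExecutionAttempts;
  }

  public static void main(String[] args) {
    List<Sink<String>> sinks = List.of(new PlainSink(), new MarkedSink());
    for (Sink<String> sink : sinks) {
      System.out.println(sink.getClass().getSimpleName() + " -> " + allowsSpeculativeAttempts(sink));
    }
  }
}
```

Under that assumption, the change described above amounts to adding the marker to the sink's `implements` clause; no method bodies change.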
>>>>>>>
>>>>>>> Hey Steven,
>>>>>>>
>>>>>>> > I will reiterate one point that I mentioned before. For the Flink
>>>>>>> > connector, sink is the dominant use case (compared to source).
>>>>>>> > Regardless of the unit test issue, it is probably better to go a little
>>>>>>> > slower on this switch of sink implementation.
>>>>>>>
>>>>>>> Agreed, but I'm still a bit skeptical that leaving IcebergSink disabled
>>>>>>> by default will help it reach greater maturity. We will face a similar
>>>>>>> situation for the Iceberg 1.11.0 release, where we again need to decide
>>>>>>> whether to make the switch. The truth is, only once the sink is out and
>>>>>>> about can we find potential issues. Since users have the option to
>>>>>>> switch back to the old implementation, we at least won't be blocking
>>>>>>> them. That said, I hear your concerns and I'm perfectly fine with
>>>>>>> waiting until Iceberg 1.11.0 to make IcebergSink the default Flink sink.
>>>>>>>
>>>>>>> -Max
>>>>>>>
>>>>>>> [1]
>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution
>>>>>>>
>>>>>>> On Thu, Jun 26, 2025 at 5:38 AM Steven Wu <stevenz...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I will reiterate one point that I mentioned before. For the Flink
>>>>>>>> connector, sink is the dominant use case (compared to source).
>>>>>>>> Regardless of the unit test issue, it is probably better to go a little
>>>>>>>> slower on this switch of sink implementation.
>>>>>>>>
>>>>>>>> On Wed, Jun 25, 2025 at 4:22 PM Rodrigo Meneses <rmene...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello!
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I wanted to give you folks an update on this, especially given Max's
>>>>>>>>> suggestion of releasing this change in 1.10: I know the release branch
>>>>>>>>> will be cut soon, and without this unit test passing, I don’t feel
>>>>>>>>> comfortable releasing this.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> During the implementation of this feature (which makes IcebergSink the
>>>>>>>>> default sink for SQL), a unit test covering speculative execution mode
>>>>>>>>> is failing:
>>>>>>>>>
>>>>>>>>> `TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()`
>>>>>>>>>
>>>>>>>>> The test times out. I debugged it using `FlinkSink` as the sink
>>>>>>>>> engine, and with that sink the test passes.
>>>>>>>>>
>>>>>>>>> With FlinkSink, in the debugger, I observed that this code:
>>>>>>>>>
>>>>>>>>> ```java
>>>>>>>>> // Subtask 0's first attempt blocks forever; any other subtask, or a
>>>>>>>>> // later (speculative) attempt of subtask 0, proceeds normally.
>>>>>>>>> if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) {
>>>>>>>>>   Thread.sleep(Integer.MAX_VALUE);
>>>>>>>>> }
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> is hit 3 times:
>>>>>>>>>
>>>>>>>>>    1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0`
>>>>>>>>>    evaluates to true, so we sleep forever.
>>>>>>>>>    2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we skip
>>>>>>>>>    the infinite sleep and return the row.
>>>>>>>>>    3. The third time (the speculative attempt),
>>>>>>>>>    `taskInfo.getAttemptNumber()` is 1, so we skip the infinite sleep and
>>>>>>>>>    return the row.
>>>>>>>>>
>>>>>>>>> And then, the unit test passes.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> However, with `IcebergSink`, I only see this code hit 2 times.
>>>>>>>>>
>>>>>>>>>    1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0`
>>>>>>>>>    evaluates to true, so we sleep forever.
>>>>>>>>>    2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we
>>>>>>>>>    return the row.
>>>>>>>>>
>>>>>>>>> There’s no third time, which leads me to believe that speculative
>>>>>>>>> mode is not kicking in, since the slow (infinitely waiting) task is
>>>>>>>>> not being retried.
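The reasoning above can be checked with a small model of the test's blocking condition. This is a sketch under stated assumptions, not Flink's scheduler: it assumes a parallelism of 2 (subtasks 0 and 1, as seen in the debugger), and `Attempt`, `blocksForever`, and `jobCanFinish` are hypothetical names. A subtask's row is only produced by an attempt that skips the infinite sleep, so without a speculative attempt of subtask 0 the job can never finish, which matches the timeout.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Single-threaded model of the test's blocking condition. It does not model
 * Flink's scheduler; it only checks which (subtask, attempt) executions return
 * a row and whether every subtask ends up with a finished attempt.
 */
public class SpeculativeAttemptModel {

  /** One execution attempt, mirroring the two TaskInfo values the test reads. */
  static final class Attempt {
    final int subtaskIndex;
    final int attemptNumber;

    Attempt(int subtaskIndex, int attemptNumber) {
      this.subtaskIndex = subtaskIndex;
      this.attemptNumber = attemptNumber;
    }
  }

  /** Same predicate as the test: only subtask 0's first attempt sleeps forever. */
  static boolean blocksForever(Attempt a) {
    return a.subtaskIndex == 0 && a.attemptNumber <= 0;
  }

  /** True if every subtask in [0, parallelism) has at least one attempt that returns its row. */
  static boolean jobCanFinish(int parallelism, List<Attempt> attempts) {
    Set<Integer> finished = new HashSet<>();
    for (Attempt a : attempts) {
      if (!blocksForever(a)) {
        finished.add(a.subtaskIndex);
      }
    }
    for (int i = 0; i < parallelism; i++) {
      if (!finished.contains(i)) {
        return false; // subtask i never returns its row, so the job (and the test) hangs
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Three hits, as observed with FlinkSink: includes a speculative attempt of subtask 0.
    List<Attempt> threeHits = List.of(new Attempt(0, 0), new Attempt(1, 0), new Attempt(0, 1));
    // Two hits, as observed with IcebergSink: subtask 0 is never retried.
    List<Attempt> twoHits = List.of(new Attempt(0, 0), new Attempt(1, 0));

    System.out.println("three hits, job can finish: " + jobCanFinish(2, threeHits));
    System.out.println("two hits, job can finish: " + jobCanFinish(2, twoHits));
  }
}
```

With parallelism 2, `main` prints `true` for the three-hit sequence and `false` for the two-hit one.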
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This could mean 2 things:
>>>>>>>>>
>>>>>>>>>    1. IcebergSink is not working as expected with speculative mode.
>>>>>>>>>    2. The unit test needs to be adjusted for IcebergSink.
>>>>>>>>>
>>>>>>>>> While reviewing the code, I found a commit by @stevenzu from August
>>>>>>>>> 2024 [1], related to this same unit test hanging, in which adding
>>>>>>>>> `taskInfo.getIndexOfThisSubtask() == 0` was necessary for the test to
>>>>>>>>> pass.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I am still digging deeper, but so far I haven’t been able to pinpoint
>>>>>>>>> the problem. I will keep working on this, but there’s a possibility
>>>>>>>>> that it won’t be ready in time to be part of the 1.10 release.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Rodrigo
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/iceberg/pull/10832#discussion_r1701015316
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> +1 Great proposal.
>>>>>>>>>>
>>>>>>>>>> What about moving the timeline one release ahead? There isn't any more
>>>>>>>>>> planned work for IcebergSink between the 1.10.0 and 1.11.0 releases.
>>>>>>>>>> IcebergSink has already gotten enough exposure, and we won't gain
>>>>>>>>>> anything by waiting longer to make it the default. Thus, I would
>>>>>>>>>> suggest deprecating FlinkSink already in Iceberg 1.10.0 and promoting
>>>>>>>>>> IcebergSink to be the default implementation. Users will have the
>>>>>>>>>> option to switch back to the old sink at any time via the
>>>>>>>>>> configuration option Rodrigo mentioned. If there are no issues, we can
>>>>>>>>>> proceed to remove FlinkSink in Iceberg 1.11.0.
>>>>>>>>>>
>>>>>>>>>> What do you think?
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Max
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <
>>>>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>> >
>>>>>>>>>> > +1
>>>>>>>>>> >
>>>>>>>>>> > On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <
>>>>>>>>>> matyas.orh...@gmail.com> wrote:
>>>>>>>>>> >>
>>>>>>>>>> >> +1, sounds reasonable to me
>>>>>>>>>> >>
>>>>>>>>>> >> Thanks,
>>>>>>>>>> >> Matyas
>>>>>>>>>> >>
>>>>>>>>>> >> On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <
>>>>>>>>>> rmene...@gmail.com> wrote:
>>>>>>>>>> >>>
>>>>>>>>>> >>> Hi devs,
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> I’d like to start a discussion about the current and future
>>>>>>>>>> >>> state of our Flink sink connectors.
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> As it stands today, we have 3 sink implementations:
>>>>>>>>>> >>>
>>>>>>>>>> >>> 1. FlinkSink [1]
>>>>>>>>>> >>> 2. IcebergSink [2]
>>>>>>>>>> >>> 3. DynamicSink [3]
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> FlinkSink [1] is the current and default implementation of
>>>>>>>>>> >>> the Flink sink connector.
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> IcebergSink [2] is another implementation of the Flink sink
>>>>>>>>>> >>> connector, introduced in https://github.com/apache/iceberg/pull/10179.
>>>>>>>>>> >>> It leverages the latest SinkV2 interfaces in Flink, and it makes it
>>>>>>>>>> >>> possible to add cleanup tasks by implementing the
>>>>>>>>>> >>> `PostCommitTopology` interface. There is already some work in
>>>>>>>>>> >>> progress to enable this functionality:
>>>>>>>>>> >>> https://github.com/apache/iceberg/pull/12979
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> DynamicSink [3] was recently contributed in
>>>>>>>>>> >>> https://github.com/apache/iceberg/pull/13304. It can write to any
>>>>>>>>>> >>> number of tables, dynamically creating and updating tables and
>>>>>>>>>> >>> dynamically updating the schema and partition spec of tables.
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> Currently, `IcebergSink` is marked as `@Experimental`, and it
>>>>>>>>>> >>> already offers feature parity with `FlinkSink` (the missing RANGE
>>>>>>>>>> >>> distribution was recently merged in
>>>>>>>>>> >>> https://github.com/apache/iceberg/pull/12071).
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> With https://github.com/apache/iceberg/pull/11244, users can
>>>>>>>>>> >>> choose which sink implementation ([1] or [2]) they want to use for
>>>>>>>>>> >>> Flink SQL.
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> With all this said, we’re proposing the following for Iceberg
>>>>>>>>>> >>> 1.11:
>>>>>>>>>> >>>
>>>>>>>>>> >>> 1. Deprecate FlinkSink (mark it @Deprecated).
>>>>>>>>>> >>> 2. Promote IcebergSink from @Experimental to @PublicEvolving.
>>>>>>>>>> >>> 3. Make IcebergSink the default implementation in Flink SQL.
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> Then in Iceberg 1.12 we will:
>>>>>>>>>> >>>
>>>>>>>>>> >>> 1. Remove the `FlinkSink` implementation.
>>>>>>>>>> >>> 2. Remove @PublicEvolving from IcebergSink.
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> What do you think about this plan?
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>> >>> Thanks,
>>>>>>>>>> >>>
>>>>>>>>>> >>> Rodrigo
>>>>>>>>>> >>>
>>>>>>>>>> >>>
>>>>>>>>>>
>>>>>>>>>
