I haven't seen any issues for users on the new sink. The switch was
seamless.

On Fri, Jun 27, 2025 at 5:48 PM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> Oh.. I missed that. That's a bit better, but I'm still not sure. Do we
> know of internal users who are already on the new Sink? What was the
> experience there?
>
> Maximilian Michels <m...@apache.org> ezt írta (időpont: 2025. jún. 27., P,
> 13:49):
>
>> Peter, I was suggesting not to remove earlier, but to switch the default
>> sink implementation earlier. Is that what you mean?
>>
>> -Max
>>
>> On Thu, Jun 26, 2025 at 5:19 PM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> +1 for the more conservative removal time
>>>
>>> Maximilian Michels <m...@apache.org> ezt írta (időpont: 2025. jún. 26.,
>>> Cs, 15:54):
>>>
>>>> Hey Rod,
>>>>
>>>> IcebergSink does not currently support speculative batch execution.
>>>> Modern sinks need to implement SupportsConcurrentExecutionAttempts [1]
>>>> in order to work with speculative (batch) execution. This is merely a
>>>> marker interface. I've verified that after adding it to IcebergSink, the
>>>> test passes fine.
>>>>
>>>> Hey Steven,
>>>>
>>>> >I will reiterate one point that I mentioned before. For the Flink
>>>> connector, sink is the dominant use case (compared source). Regardless of
>>>> the unit test issue, it is probably better to go a little slower on this
>>>> switch of sink implementation.
>>>>
>>>> Agreed, but I'm still a bit skeptical, that leaving IcebergSink
>>>> disabled by default we will reach greater maturity. We will have a similar
>>>> situation for the Iceberg 1.11.0 release, where we again need to decide
>>>> whether to make the switch. The truth is, only after the sink is out and
>>>> about, we can find any potential issues. Since we have the option for users
>>>> to switch back to the old implementation, we at least won't be blocking
>>>> users. That said, I hear your concerns and I'm perfectly fine with
>>>> waiting until Iceberg 1.11.0 to make IcebergSink the default Flink sink.
>>>>
>>>> -Max
>>>>
>>>> [1]
>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution
>>>>
>>>> On Thu, Jun 26, 2025 at 5:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>>
>>>>> I will reiterate one point that I mentioned before. For the Flink
>>>>> connector, sink is the dominant use case (compared source). Regardless of
>>>>> the unit test issue, it is probably better to go a little slower on this
>>>>> switch of sink implementation.
>>>>>
>>>>> On Wed, Jun 25, 2025 at 4:22 PM Rodrigo Meneses <rmene...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello!
>>>>>>
>>>>>>
>>>>>> I wanted to give you folks an update about this, especially because
>>>>>> of Max's suggestion of releasing this change in 1.10, as I know the 
>>>>>> release
>>>>>> branch will be cut soon, and without this unit test passing, I don’t feel
>>>>>> comfortable releasing this.
>>>>>>
>>>>>>
>>>>>> During the implementation of this feature (which makes the
>>>>>> IcebergSink the default sink for SQL), there’s a unit test that is 
>>>>>> failing
>>>>>> that is covering Speculative Execution Mode.
>>>>>>
>>>>>> `
>>>>>>
>>>>>> TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()
>>>>>>
>>>>>> `
>>>>>>
>>>>>> The test is timing out. I debugged it while using `FlinkSink` as the
>>>>>> sink engine, and it works
>>>>>>
>>>>>> With FlinkSink, in the debugger, I observed that this code:
>>>>>>
>>>>>> ```java
>>>>>>
>>>>>> if (taskInfo.getIndexOfThisSubtask() == 0 &&
>>>>>> taskInfo.getAttemptNumber() <= 0) {
>>>>>>
>>>>>>   Thread.*sleep*(Integer.*MAX_VALUE*);
>>>>>>
>>>>>> }
>>>>>>
>>>>>> ```
>>>>>>
>>>>>> Is being hit 3 times:
>>>>>>
>>>>>>    1. The first time this `taskInfo.getIndexOfThisSubtask() == 0 &&
>>>>>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so we Sleep 
>>>>>> forever
>>>>>>    2. The second time, `taskInfo.getIndexOfThisSubtask() ` is 1, so
>>>>>>    we skip the infinite sleep and we return the row
>>>>>>    3. The third time (the speculative try),
>>>>>>    `taskInfo.getAttemptNumber()` is 1, so we skip the infinite sleep, and
>>>>>>    return the row.
>>>>>>
>>>>>> And then, the unit test passes.
>>>>>>
>>>>>>
>>>>>> However, with `IcebergSink`, I only see this code hit 2 times.
>>>>>>
>>>>>>    1. The first time this `taskInfo.getIndexOfThisSubtask() == 0 &&
>>>>>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so we Sleep 
>>>>>> forever
>>>>>>    2. The second time, `taskInfo.getIndexOfThisSubtask() ` is 1, so
>>>>>>    we return the row.
>>>>>>
>>>>>> There’s no third time, which makes me believe that the speculative
>>>>>> mode is not kicking in, as the slow (infinite waiting) task is not being
>>>>>> retried.
>>>>>>
>>>>>>
>>>>>> This could mean 2 things:
>>>>>>
>>>>>>    1. IcebergSink not working as expected with Speculative Mode
>>>>>>    2. The unit test needs to be adjusted for IcebergSink
>>>>>>
>>>>>> While reviewing the code, I found @stevenzu commit from August 2024
>>>>>> [1] which is related to this unit test also hanging and that adding
>>>>>> `taskInfo.getIndexOfThisSubtask() == 0` was necessary for the unit test 
>>>>>> to
>>>>>> pass.
>>>>>>
>>>>>>
>>>>>> I am still digging deeper, but so far I haven’t been able to pinpoint
>>>>>> the problem. Will continue on this, but there’s a possibility that this
>>>>>> won’t be ready in time for this to be considered to be part of 1.10 
>>>>>> release.
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Rodrigo
>>>>>>
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/iceberg/pull/10832#discussion_r1701015316
>>>>>>
>>>>>> On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> +1 Great proposal.
>>>>>>>
>>>>>>> What about moving the timeline one release ahead? There isn't any
>>>>>>> more
>>>>>>> planned work for IcebergSink between the 1.10.0 and the 1.11.0
>>>>>>> release. IcebergSink already got enough exposure and we won't gain
>>>>>>> anything from waiting longer to make it the default. Thus, I would
>>>>>>> suggest to already deprecate FlinkSink in Iceberg 1.10.0 and promote
>>>>>>> IcebergSink to be the default implementation. Users will have the
>>>>>>> option to switch back to the old sink anytime via the configuration
>>>>>>> option Rodrigo mentioned. If there are no issues, we can proceed to
>>>>>>> remove FlinkSink in Iceberg 1.11.0.
>>>>>>>
>>>>>>> What do you think?
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Max
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <
>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>> >
>>>>>>> > +1
>>>>>>> >
>>>>>>> > On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <matyas.orh...@gmail.com>
>>>>>>> wrote:
>>>>>>> >>
>>>>>>> >> +1, sounds reasonable to me
>>>>>>> >>
>>>>>>> >> Thanks,
>>>>>>> >> Matyas
>>>>>>> >>
>>>>>>> >> On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <
>>>>>>> rmene...@gmail.com> wrote:
>>>>>>> >>>
>>>>>>> >>> Hi devs,
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> I’d like to start a discussion about the current and future
>>>>>>> state of our Flink Sink Connectors.
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> As it stands today, we currently have 3 sink implementations:
>>>>>>> >>>
>>>>>>> >>> FlinkSink [1]
>>>>>>> >>> IcebergSink [2]
>>>>>>> >>> DynamicSink [3]
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> FlinkSink [1] is the current and default implementation of the
>>>>>>> Flink Sink Connector.
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> IcebergSink [2] is another implementation of the Flink Sink
>>>>>>> Connector which was introduced in
>>>>>>> https://github.com/apache/iceberg/pull/10179. It leverages the
>>>>>>> latest SinkV2 interfaces in Flink, and it offers the possibility of 
>>>>>>> adding
>>>>>>> cleanup tasks by the way of implementing the `PostCommitTopology`
>>>>>>> interface. There is already some work in progress to enable this
>>>>>>> functionality: https://github.com/apache/iceberg/pull/12979
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> DynamicSink [3] has been recently contributed in
>>>>>>> https://github.com/apache/iceberg/pull/13304 and it can be used to
>>>>>>> write to any number of tables, dynamically creating and updating tables 
>>>>>>> and
>>>>>>> dynamically updating the schema and partition spec of tables.
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> Currently, `IcebergSink` is marked as `@Experimental` and it
>>>>>>> already offers feature parity with `FlinkSink` (the missing RANGE
>>>>>>> distribution was recently merged
>>>>>>> https://github.com/apache/iceberg/pull/12071).
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> With https://github.com/apache/iceberg/pull/11244, users have
>>>>>>> the choice of specifying which Sink Implementation ([1] or [2]) they 
>>>>>>> want
>>>>>>> to use for Flink SQL.
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> With all this said, we’re proposing the following for Iceberg
>>>>>>> 1.11:
>>>>>>> >>>
>>>>>>> >>> @Deprecate FlinkSink
>>>>>>> >>> Promote IcebergSink from @Experimental to @PublicEvolving
>>>>>>> >>> Make IcebergSink the default implementation in Flink SQL.
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> Then in Iceberg 1.12 we will:
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> Remove the `FlinkSink` implementation.
>>>>>>> >>> Remove @PublicEvolving from IcebergSink
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> What do you think about this plan?
>>>>>>> >>>
>>>>>>> >>>
>>>>>>> >>> Thanks,
>>>>>>> >>>
>>>>>>> >>> Rodrigo
>>>>>>> >>>
>>>>>>> >>>
>>>>>>>
>>>>>>

Reply via email to