Is this 5-10-100-1000 different use-case we have seen the migration for?
Also, it would be good to see if someone in the community has some
experience too.

Maximilian Michels <m...@apache.org> ezt írta (időpont: 2025. jún. 30., H,
11:17):

> I haven't seen any issues for users on the new sink. The switch was
> seamless.
>
> On Fri, Jun 27, 2025 at 5:48 PM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Oh.. I missed that. That's a bit better, but I'm still not sure. Do we
>> know of internal users who are already on the new Sink? What was the
>> experience there?
>>
>> Maximilian Michels <m...@apache.org> ezt írta (időpont: 2025. jún. 27.,
>> P, 13:49):
>>
>>> Peter, I was suggesting not to remove earlier, but to switch the default
>>> sink implementation earlier. Is that what you mean?
>>>
>>> -Max
>>>
>>> On Thu, Jun 26, 2025 at 5:19 PM Péter Váry <peter.vary.apa...@gmail.com>
>>> wrote:
>>>
>>>> +1 for the more conservative removal time
>>>>
>>>> Maximilian Michels <m...@apache.org> ezt írta (időpont: 2025. jún. 26.,
>>>> Cs, 15:54):
>>>>
>>>>> Hey Rod,
>>>>>
>>>>> IcebergSink does not currently support speculative batch execution.
>>>>> Modern sinks need to implement SupportsConcurrentExecutionAttempts [1]
>>>>> in order to work with speculative (batch) execution. This is merely a
>>>>> marker interface. I've verified that after adding it to IcebergSink, the
>>>>> test passes fine.
>>>>>
>>>>> Hey Steven,
>>>>>
>>>>> >I will reiterate one point that I mentioned before. For the Flink
>>>>> connector, sink is the dominant use case (compared source). Regardless of
>>>>> the unit test issue, it is probably better to go a little slower on this
>>>>> switch of sink implementation.
>>>>>
>>>>> Agreed, but I'm still a bit skeptical, that leaving IcebergSink
>>>>> disabled by default we will reach greater maturity. We will have a similar
>>>>> situation for the Iceberg 1.11.0 release, where we again need to decide
>>>>> whether to make the switch. The truth is, only after the sink is out and
>>>>> about, we can find any potential issues. Since we have the option for 
>>>>> users
>>>>> to switch back to the old implementation, we at least won't be blocking
>>>>> users. That said, I hear your concerns and I'm perfectly fine with
>>>>> waiting until Iceberg 1.11.0 to make IcebergSink the default Flink sink.
>>>>>
>>>>> -Max
>>>>>
>>>>> [1]
>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution
>>>>>
>>>>> On Thu, Jun 26, 2025 at 5:38 AM Steven Wu <stevenz...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I will reiterate one point that I mentioned before. For the Flink
>>>>>> connector, sink is the dominant use case (compared source). Regardless of
>>>>>> the unit test issue, it is probably better to go a little slower on this
>>>>>> switch of sink implementation.
>>>>>>
>>>>>> On Wed, Jun 25, 2025 at 4:22 PM Rodrigo Meneses <rmene...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello!
>>>>>>>
>>>>>>>
>>>>>>> I wanted to give you folks an update about this, especially because
>>>>>>> of Max's suggestion of releasing this change in 1.10, as I know the 
>>>>>>> release
>>>>>>> branch will be cut soon, and without this unit test passing, I don’t 
>>>>>>> feel
>>>>>>> comfortable releasing this.
>>>>>>>
>>>>>>>
>>>>>>> During the implementation of this feature (which makes the
>>>>>>> IcebergSink the default sink for SQL), there’s a unit test that is 
>>>>>>> failing
>>>>>>> that is covering Speculative Execution Mode.
>>>>>>>
>>>>>>> `
>>>>>>>
>>>>>>> TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()
>>>>>>>
>>>>>>> `
>>>>>>>
>>>>>>> The test is timing out. I debugged it while using `FlinkSink` as the
>>>>>>> sink engine, and it works
>>>>>>>
>>>>>>> With FlinkSink, in the debugger, I observed that this code:
>>>>>>>
>>>>>>> ```java
>>>>>>>
>>>>>>> if (taskInfo.getIndexOfThisSubtask() == 0 &&
>>>>>>> taskInfo.getAttemptNumber() <= 0) {
>>>>>>>
>>>>>>>   Thread.*sleep*(Integer.*MAX_VALUE*);
>>>>>>>
>>>>>>> }
>>>>>>>
>>>>>>> ```
>>>>>>>
>>>>>>> Is being hit 3 times:
>>>>>>>
>>>>>>>    1. The first time this `taskInfo.getIndexOfThisSubtask() == 0 &&
>>>>>>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so we Sleep 
>>>>>>> forever
>>>>>>>    2. The second time, `taskInfo.getIndexOfThisSubtask() ` is 1, so
>>>>>>>    we skip the infinite sleep and we return the row
>>>>>>>    3. The third time (the speculative try),
>>>>>>>    `taskInfo.getAttemptNumber()` is 1, so we skip the infinite sleep, 
>>>>>>> and
>>>>>>>    return the row.
>>>>>>>
>>>>>>> And then, the unit test passes.
>>>>>>>
>>>>>>>
>>>>>>> However, with `IcebergSink`, I only see this code hit 2 times.
>>>>>>>
>>>>>>>    1. The first time this `taskInfo.getIndexOfThisSubtask() == 0 &&
>>>>>>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so we Sleep 
>>>>>>> forever
>>>>>>>    2. The second time, `taskInfo.getIndexOfThisSubtask() ` is 1, so
>>>>>>>    we return the row.
>>>>>>>
>>>>>>> There’s no third time, which makes me believe that the speculative
>>>>>>> mode is not kicking in, as the slow (infinite waiting) task is not being
>>>>>>> retried.
>>>>>>>
>>>>>>>
>>>>>>> This could mean 2 things:
>>>>>>>
>>>>>>>    1. IcebergSink not working as expected with Speculative Mode
>>>>>>>    2. The unit test needs to be adjusted for IcebergSink
>>>>>>>
>>>>>>> While reviewing the code, I found @stevenzu commit from August 2024
>>>>>>> [1] which is related to this unit test also hanging and that adding
>>>>>>> `taskInfo.getIndexOfThisSubtask() == 0` was necessary for the unit test 
>>>>>>> to
>>>>>>> pass.
>>>>>>>
>>>>>>>
>>>>>>> I am still digging deeper, but so far I haven’t been able to
>>>>>>> pinpoint the problem. Will continue on this, but there’s a possibility 
>>>>>>> that
>>>>>>> this won’t be ready in time for this to be considered to be part of 1.10
>>>>>>> release.
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Rodrigo
>>>>>>>
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/iceberg/pull/10832#discussion_r1701015316
>>>>>>>
>>>>>>> On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> +1 Great proposal.
>>>>>>>>
>>>>>>>> What about moving the timeline one release ahead? There isn't any
>>>>>>>> more
>>>>>>>> planned work for IcebergSink between the 1.10.0 and the 1.11.0
>>>>>>>> release. IcebergSink already got enough exposure and we won't gain
>>>>>>>> anything from waiting longer to make it the default. Thus, I would
>>>>>>>> suggest to already deprecate FlinkSink in Iceberg 1.10.0 and promote
>>>>>>>> IcebergSink to be the default implementation. Users will have the
>>>>>>>> option to switch back to the old sink anytime via the configuration
>>>>>>>> option Rodrigo mentioned. If there are no issues, we can proceed to
>>>>>>>> remove FlinkSink in Iceberg 1.11.0.
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Max
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <
>>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>> >
>>>>>>>> > +1
>>>>>>>> >
>>>>>>>> > On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <
>>>>>>>> matyas.orh...@gmail.com> wrote:
>>>>>>>> >>
>>>>>>>> >> +1, sounds reasonable to me
>>>>>>>> >>
>>>>>>>> >> Thanks,
>>>>>>>> >> Matyas
>>>>>>>> >>
>>>>>>>> >> On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <
>>>>>>>> rmene...@gmail.com> wrote:
>>>>>>>> >>>
>>>>>>>> >>> Hi devs,
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> I’d like to start a discussion about the current and future
>>>>>>>> state of our Flink Sink Connectors.
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> As it stands today, we currently have 3 sink implementations:
>>>>>>>> >>>
>>>>>>>> >>> FlinkSink [1]
>>>>>>>> >>> IcebergSink [2]
>>>>>>>> >>> DynamicSink [3]
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> FlinkSink [1] is the current and default implementation of the
>>>>>>>> Flink Sink Connector.
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> IcebergSink [2] is another implementation of the Flink Sink
>>>>>>>> Connector which was introduced in
>>>>>>>> https://github.com/apache/iceberg/pull/10179. It leverages the
>>>>>>>> latest SinkV2 interfaces in Flink, and it offers the possibility of 
>>>>>>>> adding
>>>>>>>> cleanup tasks by the way of implementing the `PostCommitTopology`
>>>>>>>> interface. There is already some work in progress to enable this
>>>>>>>> functionality: https://github.com/apache/iceberg/pull/12979
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> DynamicSink [3] has been recently contributed in
>>>>>>>> https://github.com/apache/iceberg/pull/13304 and it can be used to
>>>>>>>> write to any number of tables, dynamically creating and updating 
>>>>>>>> tables and
>>>>>>>> dynamically updating the schema and partition spec of tables.
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> Currently, `IcebergSink` is marked as `@Experimental` and it
>>>>>>>> already offers feature parity with `FlinkSink` (the missing RANGE
>>>>>>>> distribution was recently merged
>>>>>>>> https://github.com/apache/iceberg/pull/12071).
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> With https://github.com/apache/iceberg/pull/11244, users have
>>>>>>>> the choice of specifying which Sink Implementation ([1] or [2]) they 
>>>>>>>> want
>>>>>>>> to use for Flink SQL.
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> With all this said, we’re proposing the following for Iceberg
>>>>>>>> 1.11:
>>>>>>>> >>>
>>>>>>>> >>> @Deprecate FlinkSink
>>>>>>>> >>> Promote IcebergSink from @Experimental to @PublicEvolving
>>>>>>>> >>> Make IcebergSink the default implementation in Flink SQL.
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> Then in Iceberg 1.12 we will:
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> Remove the `FlinkSink` implementation.
>>>>>>>> >>> Remove @PublicEvolving from IcebergSink
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> What do you think about this plan?
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>> >>> Thanks,
>>>>>>>> >>>
>>>>>>>> >>> Rodrigo
>>>>>>>> >>>
>>>>>>>> >>>
>>>>>>>>
>>>>>>>

Reply via email to