Oh, I missed that. That's a bit better, but I'm still not sure. Do we know
of any internal users who are already on the new sink? What has their
experience been?

On Fri, Jun 27, 2025 at 1:49 PM Maximilian Michels <m...@apache.org>
wrote:

> Peter, I wasn't suggesting removing FlinkSink earlier, only switching the
> default sink implementation earlier. Is that what you meant?
>
> -Max
>
> On Thu, Jun 26, 2025 at 5:19 PM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> +1 for the more conservative removal time
>>
>> On Thu, Jun 26, 2025 at 3:54 PM Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Hey Rod,
>>>
>>> IcebergSink does not currently support speculative batch execution.
>>> Modern sinks need to implement SupportsConcurrentExecutionAttempts [1]
>>> in order to work with speculative (batch) execution. This is merely a
>>> marker interface. I've verified that after adding it to IcebergSink, the
>>> test passes fine.
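To illustrate what "merely a marker interface" means: opting in adds no methods, only one more type in the sink's `implements` clause. The sketch below is a simplified, self-contained model, not Flink's code. The local `SupportsConcurrentExecutionAttempts` is a stand-in for Flink's interface of the same name, and `Sink`, `LegacySink`, `MarkedSink` and `allowsConcurrentAttempts` are hypothetical names; Flink's real check happens inside the framework and may consider more than a single type test.

```java
import java.util.Arrays;
import java.util.List;

// Simplified model, NOT Flink's implementation: it only shows how a marker interface gates a feature.
final class SpeculativeSinkCheck {

    // Local stand-in for Flink's SupportsConcurrentExecutionAttempts: no methods, only a type tag.
    interface SupportsConcurrentExecutionAttempts {}

    // Hypothetical minimal sink abstraction, used only for this illustration.
    interface Sink {
        String name();
    }

    // A sink that has not opted in.
    static final class LegacySink implements Sink {
        @Override
        public String name() {
            return "without-marker";
        }
    }

    // The same sink after opting in: no new methods, just one more interface.
    static final class MarkedSink implements Sink, SupportsConcurrentExecutionAttempts {
        @Override
        public String name() {
            return "with-marker";
        }
    }

    // Opting in is detected purely by type.
    static boolean allowsConcurrentAttempts(Sink sink) {
        return sink instanceof SupportsConcurrentExecutionAttempts;
    }

    public static void main(String[] args) {
        List<Sink> sinks = Arrays.asList(new LegacySink(), new MarkedSink());
        for (Sink sink : sinks) {
            System.out.println(sink.name() + " -> " + allowsConcurrentAttempts(sink));
        }
    }
}
```

Running `main` prints `without-marker -> false` followed by `with-marker -> true`.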
>>>
>>> Hey Steven,
>>>
>>> > I will reiterate one point that I mentioned before. For the Flink
>>> > connector, sink is the dominant use case (compared to source). Regardless
>>> > of the unit test issue, it is probably better to go a little slower on
>>> > this switch of sink implementation.
>>>
>>> Agreed, but I'm still a bit skeptical that leaving IcebergSink disabled
>>> by default will help it reach greater maturity. We will face the same
>>> situation for the Iceberg 1.11.0 release, where we again need to decide
>>> whether to make the switch. The truth is, we can only find potential
>>> issues once the sink is out in the wild. Since users have the option to
>>> switch back to the old implementation, we at least won't be blocking them.
>>> That said, I hear your concerns and I'm perfectly fine with waiting until
>>> Iceberg 1.11.0 to make IcebergSink the default Flink sink.
>>>
>>> -Max
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution
>>>
>>> On Thu, Jun 26, 2025 at 5:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>>> I will reiterate one point that I mentioned before. For the Flink
>>>> connector, sink is the dominant use case (compared to source). Regardless
>>>> of the unit test issue, it is probably better to go a little slower on
>>>> this switch of sink implementation.
>>>>
>>>> On Wed, Jun 25, 2025 at 4:22 PM Rodrigo Meneses <rmene...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello!
>>>>>
>>>>>
>>>>> I wanted to give you folks an update on this, especially given Max's
>>>>> suggestion of releasing this change in 1.10. I know the release branch
>>>>> will be cut soon, and I don’t feel comfortable releasing this while this
>>>>> unit test is failing.
>>>>>
>>>>>
>>>>> While implementing this feature (which makes IcebergSink the default
>>>>> sink for Flink SQL), I found that a unit test covering speculative
>>>>> execution mode is failing:
>>>>>
>>>>> `TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()`
>>>>>
>>>>> The test times out. When I debug it using `FlinkSink` as the sink
>>>>> implementation, it passes.
>>>>>
>>>>> With FlinkSink, in the debugger, I observed that this code:
>>>>>
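>>>>> ```java
>>>>> // Block the first attempt (attempt 0) of subtask 0 forever, so that it
>>>>> // looks like a slow task and should trigger a speculative attempt.
>>>>> if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) {
>>>>>   Thread.sleep(Integer.MAX_VALUE);
>>>>> }
>>>>> ```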
>>>>>
>>>>> is hit three times:
>>>>>
>>>>>    1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
>>>>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep
>>>>>    forever.
>>>>>    2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we
>>>>>    skip the infinite sleep and return the row.
>>>>>    3. The third time (the speculative attempt),
>>>>>    `taskInfo.getAttemptNumber()` is 1, so we skip the infinite sleep and
>>>>>    return the row.
>>>>>
>>>>> And then the unit test passes.
>>>>>
>>>>>
>>>>> However, with `IcebergSink`, I only see this code hit twice:
>>>>>
>>>>>    1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
>>>>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep
>>>>>    forever.
>>>>>    2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we
>>>>>    return the row.
>>>>>
>>>>> There’s no third time, which makes me believe that speculative execution
>>>>> is not kicking in: the slow (infinitely sleeping) task never gets a
>>>>> speculative attempt.
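The observation can be checked with a small model of the test's gate. This is a hypothetical sketch, not the test's actual code: it assumes a parallelism of 2 (subtasks 0 and 1, as seen in the debugger) and that the third invocation observed with FlinkSink was the speculative attempt of subtask 0. A subtask finishes only if at least one of its attempts skips the infinite sleep, and the job finishes only if every subtask does.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the gate in testSpeculativeExecution(); NOT the test's actual code.
final class SpeculativeGateModel {

    // One task attempt, identified the same way the test identifies it.
    static final class Attempt {
        final int subtaskIndex;
        final int attemptNumber;

        Attempt(int subtaskIndex, int attemptNumber) {
            this.subtaskIndex = subtaskIndex;
            this.attemptNumber = attemptNumber;
        }
    }

    // Same condition as the test: only the first attempt (attempt 0) of subtask 0 blocks.
    static boolean blocksForever(Attempt a) {
        return a.subtaskIndex == 0 && a.attemptNumber <= 0;
    }

    // A subtask finishes if at least one of its attempts does not block;
    // the job (and therefore the test) finishes only if every subtask finishes.
    static boolean jobFinishes(List<Attempt> attempts, int parallelism) {
        for (int subtask = 0; subtask < parallelism; subtask++) {
            final int s = subtask;
            boolean finished = attempts.stream()
                    .anyMatch(a -> a.subtaskIndex == s && !blocksForever(a));
            if (!finished) {
                return false; // only blocked attempts for this subtask: the test times out
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Seen with FlinkSink, assuming the third invocation is subtask 0's speculative attempt.
        List<Attempt> flinkSink = Arrays.asList(new Attempt(0, 0), new Attempt(1, 0), new Attempt(0, 1));
        // Seen with IcebergSink: no speculative attempt for subtask 0.
        List<Attempt> icebergSink = Arrays.asList(new Attempt(0, 0), new Attempt(1, 0));

        System.out.println("FlinkSink finishes: " + jobFinishes(flinkSink, 2));
        System.out.println("IcebergSink finishes: " + jobFinishes(icebergSink, 2));
    }
}
```

With the FlinkSink sequence `jobFinishes` returns `true`; with the IcebergSink sequence it returns `false`, matching the timeout: without a speculative attempt, nothing ever replaces the blocked first attempt of subtask 0.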
>>>>>
>>>>>
>>>>> This could mean one of two things:
>>>>>
>>>>>    1. IcebergSink does not work as expected with speculative execution.
>>>>>    2. The unit test needs to be adjusted for IcebergSink.
>>>>>
>>>>> While reviewing the code, I found a commit by @stevenzu from August 2024
>>>>> [1] that dealt with this same unit test hanging; adding
>>>>> `taskInfo.getIndexOfThisSubtask() == 0` was necessary for the test to
>>>>> pass.
>>>>>
>>>>>
>>>>> I am still digging deeper, but so far I haven’t been able to pinpoint
>>>>> the problem. I will keep working on it, but this may not be ready in
>>>>> time to be part of the 1.10 release.
>>>>>
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Rodrigo
>>>>>
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/iceberg/pull/10832#discussion_r1701015316
>>>>>
>>>>> On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> +1 Great proposal.
>>>>>>
>>>>>> What about moving the timeline one release ahead? There is no further
>>>>>> planned work for IcebergSink between the 1.10.0 and 1.11.0 releases.
>>>>>> IcebergSink has already gotten enough exposure, and we won't gain
>>>>>> anything by waiting longer to make it the default. Thus, I would
>>>>>> suggest deprecating FlinkSink already in Iceberg 1.10.0 and promoting
>>>>>> IcebergSink to be the default implementation. Users will have the
>>>>>> option to switch back to the old sink at any time via the configuration
>>>>>> option Rodrigo mentioned. If there are no issues, we can proceed to
>>>>>> remove FlinkSink in Iceberg 1.11.0.
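The proposal relies on flipping a default while keeping the old sink one setting away. A minimal sketch of that selection logic, assuming a boolean option: the key `example.iceberg.use-v2-sink`, the `SinkImpl` enum and `select` are hypothetical placeholders, not Iceberg's actual option or API (the real option introduced in https://github.com/apache/iceberg/pull/11244 may be named and typed differently).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "new default, old implementation still one setting away".
final class SinkSelection {

    enum SinkImpl { FLINK_SINK, ICEBERG_SINK }

    // Placeholder key, NOT the real Iceberg option name.
    static final String USE_V2_SINK_KEY = "example.iceberg.use-v2-sink";

    // If the user did not set the key, the release's default applies;
    // an explicit value always wins, so "false" switches back to FlinkSink.
    static SinkImpl select(Map<String, String> conf, boolean v2IsDefault) {
        String raw = conf.get(USE_V2_SINK_KEY);
        boolean useV2 = raw == null ? v2IsDefault : Boolean.parseBoolean(raw);
        return useV2 ? SinkImpl.ICEBERG_SINK : SinkImpl.FLINK_SINK;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(select(conf, true)); // nothing set: the new default applies
        conf.put(USE_V2_SINK_KEY, "false");
        System.out.println(select(conf, true)); // explicit opt-out back to the old sink
    }
}
```

Only the `v2IsDefault` argument changes between releases; users who set the option explicitly keep their choice either way, which is why switching the default does not block users who need the old sink.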
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>>
>>>>>> On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <
>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>> >
>>>>>> > +1
>>>>>> >
>>>>>> > On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <matyas.orh...@gmail.com>
>>>>>> wrote:
>>>>>> >>
>>>>>> >> +1, sounds reasonable to me
>>>>>> >>
>>>>>> >> Thanks,
>>>>>> >> Matyas
>>>>>> >>
>>>>>> >> On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <
>>>>>> rmene...@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> Hi devs,
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> I’d like to start a discussion about the current and future state
>>>>>> >>> of our Flink sink connectors.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> We currently have three sink implementations:
>>>>>> >>>
>>>>>> >>> FlinkSink [1]
>>>>>> >>> IcebergSink [2]
>>>>>> >>> DynamicSink [3]
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> FlinkSink [1] is the current, default implementation of the Flink
>>>>>> >>> sink connector.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> IcebergSink [2] is another implementation of the Flink sink
>>>>>> >>> connector, introduced in https://github.com/apache/iceberg/pull/10179.
>>>>>> >>> It leverages the latest SinkV2 interfaces in Flink, and it makes it
>>>>>> >>> possible to add cleanup tasks by implementing the
>>>>>> >>> `PostCommitTopology` interface. There is already some work in
>>>>>> >>> progress to enable this functionality:
>>>>>> >>> https://github.com/apache/iceberg/pull/12979
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> DynamicSink [3] was recently contributed in
>>>>>> >>> https://github.com/apache/iceberg/pull/13304. It can write to any
>>>>>> >>> number of tables, dynamically creating and updating tables as well
>>>>>> >>> as updating their schemas and partition specs.
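As a rough picture of what writing to "any number of tables" involves, here is a toy routing model: each record names its target table, and the table is created the first time a record for it arrives. Everything here (`DynamicRoutingModel`, `TaggedRecord`, the in-memory map standing in for a catalog) is hypothetical; it is not the DynamicSink API and ignores schema and partition spec evolution entirely.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy routing model; all names are hypothetical and this is NOT the DynamicSink API.
final class DynamicRoutingModel {

    // A record tagged with the table it should land in (hypothetical shape).
    static final class TaggedRecord {
        final String table;
        final String payload;

        TaggedRecord(String table, String payload) {
            this.table = table;
            this.payload = payload;
        }
    }

    // In-memory stand-in for a catalog: table name -> rows written so far, in first-seen order.
    private final Map<String, List<String>> tables = new LinkedHashMap<>();

    // Routes one record, "creating" its target table on first use.
    void write(TaggedRecord r) {
        tables.computeIfAbsent(r.table, name -> new ArrayList<>()).add(r.payload);
    }

    Map<String, List<String>> tables() {
        return tables;
    }

    public static void main(String[] args) {
        DynamicRoutingModel sink = new DynamicRoutingModel();
        sink.write(new TaggedRecord("db.orders", "o-1"));
        sink.write(new TaggedRecord("db.users", "u-1"));
        sink.write(new TaggedRecord("db.orders", "o-2"));
        System.out.println(sink.tables());
    }
}
```

`main` prints `{db.orders=[o-1, o-2], db.users=[u-1]}`; `LinkedHashMap` keeps tables in first-seen order so the output is deterministic.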
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Currently, `IcebergSink` is marked as `@Experimental`, and it already
>>>>>> >>> offers feature parity with `FlinkSink` (the missing RANGE distribution
>>>>>> >>> was recently merged in https://github.com/apache/iceberg/pull/12071).
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> With https://github.com/apache/iceberg/pull/11244, users can choose
>>>>>> >>> which sink implementation ([1] or [2]) to use for Flink SQL.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> With all this said, we’re proposing the following for Iceberg 1.11:
>>>>>> >>>
>>>>>> >>> 1. Deprecate FlinkSink (`@Deprecated`).
>>>>>> >>> 2. Promote IcebergSink from `@Experimental` to `@PublicEvolving`.
>>>>>> >>> 3. Make IcebergSink the default implementation in Flink SQL.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Then in Iceberg 1.12 we will:
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> 1. Remove the `FlinkSink` implementation.
>>>>>> >>> 2. Remove `@PublicEvolving` from IcebergSink.
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> What do you think about this plan?
>>>>>> >>>
>>>>>> >>>
>>>>>> >>> Thanks,
>>>>>> >>>
>>>>>> >>> Rodrigo
>>>>>> >>>
>>>>>> >>>
>>>>>>
>>>>>
