Peter, I wasn't suggesting that we remove FlinkSink earlier, only that we
switch the default sink implementation earlier. Is that what you mean?

-Max

On Thu, Jun 26, 2025 at 5:19 PM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> +1 for the more conservative removal time
>
> Maximilian Michels <m...@apache.org> wrote (on Thu, Jun 26, 2025,
> 15:54):
>
>> Hey Rod,
>>
>> IcebergSink does not currently support speculative batch execution.
>> Modern sinks need to implement SupportsConcurrentExecutionAttempts [1]
>> in order to work with speculative (batch) execution. This is merely a
>> marker interface. I've verified that after adding it to IcebergSink, the
>> test passes fine.
>>
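To make the marker-interface point concrete, here is a minimal sketch in plain Java. It does not depend on Flink: the nested `SupportsConcurrentExecutionAttempts` is a local stand-in for the Flink interface of the same name, and `SpeculativeSinkCheck`, `PlainSink`, `OptedInSink` and `allowsSpeculativeAttempts` are hypothetical names used only for illustration. The `instanceof` check is just one way a framework could detect the opt-in; it is not a claim about how Flink does it internally.

```java
// Sketch only: every type here is a hypothetical stand-in, not Flink or Iceberg code.
class SpeculativeSinkCheck {
  /** Stand-in for Flink's marker interface; a marker declares no methods. */
  interface SupportsConcurrentExecutionAttempts {}

  /** Stand-in for a sink that has not opted in to concurrent attempts. */
  static class PlainSink {}

  /** Stand-in for a sink that opts in simply by implementing the marker. */
  static class OptedInSink implements SupportsConcurrentExecutionAttempts {}

  /** One way a framework could detect the opt-in (assumption, for illustration). */
  static boolean allowsSpeculativeAttempts(Object sink) {
    return sink instanceof SupportsConcurrentExecutionAttempts;
  }

  public static void main(String[] args) {
    System.out.println("PlainSink opted in: " + allowsSpeculativeAttempts(new PlainSink()));
    System.out.println("OptedInSink opted in: " + allowsSpeculativeAttempts(new OptedInSink()));
  }
}
```

Because the marker has no methods, opting in is a one-line change to the class declaration, which matches the description of the fix above.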
>> Hey Steven,
>>
>> > I will reiterate one point that I mentioned before. For the Flink
>> > connector, sink is the dominant use case (compared to source). Regardless
>> > of the unit test issue, it is probably better to go a little slower on
>> > this switch of sink implementation.
>>
>> Agreed, but I'm still a bit skeptical that leaving IcebergSink disabled
>> by default will help it reach greater maturity. We will face a similar
>> situation for the Iceberg 1.11.0 release, where we will again need to
>> decide whether to make the switch. The truth is that we can only find
>> potential issues once the sink is actually in use. Since users have the
>> option to switch back to the old implementation, we at least won't be
>> blocking them. That said, I hear your concerns and I'm perfectly fine with
>> waiting until Iceberg 1.11.0 to make IcebergSink the default Flink sink.
>>
>> -Max
>>
>> [1]
>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution
>>
>> On Thu, Jun 26, 2025 at 5:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>>
>>> I will reiterate one point that I mentioned before. For the Flink
>>> connector, sink is the dominant use case (compared to source). Regardless of
>>> the unit test issue, it is probably better to go a little slower on this
>>> switch of sink implementation.
>>>
>>> On Wed, Jun 25, 2025 at 4:22 PM Rodrigo Meneses <rmene...@gmail.com>
>>> wrote:
>>>
>>>> Hello!
>>>>
>>>>
>>>> I wanted to give you folks an update on this, especially given Max's
>>>> suggestion of releasing this change in 1.10. I know the release branch
>>>> will be cut soon, and I don’t feel comfortable releasing this without
>>>> this unit test passing.
>>>>
>>>>
>>>> While implementing this feature (which makes IcebergSink the default
>>>> sink for SQL), I found that a unit test covering speculative execution
>>>> mode is failing:
>>>>
>>>> `TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()`
>>>>
>>>> The test is timing out. When I debugged it with `FlinkSink` as the
>>>> sink implementation, it passed.
>>>>
>>>> With FlinkSink, in the debugger, I observed that this code:
>>>>
>>>> ```java
>>>> // Block the first attempt of subtask 0 so that it looks like a slow task.
>>>> if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) {
>>>>   Thread.sleep(Integer.MAX_VALUE);
>>>> }
>>>> ```
>>>>
>>>> is being hit 3 times:
>>>>
>>>>    1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
>>>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
>>>>    2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we
>>>>    skip the infinite sleep and return the row.
>>>>    3. The third time (the speculative attempt),
>>>>    `taskInfo.getAttemptNumber()` is 1, so we skip the infinite sleep and
>>>>    return the row.
>>>>
>>>> And then, the unit test passes.
>>>>
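The three hits described above can be replayed with a small standalone sketch. It only re-evaluates the quoted condition for each (subtask index, attempt number) pair; `SleepGateTrace`, `blocksForever` and `describe` are hypothetical names, and the subtask index of the third hit is assumed to be 0 (the retried slow task), which the message does not state explicitly.

```java
// Sketch only: replays the quoted condition outside of Flink.
class SleepGateTrace {
  /** Mirrors the quoted condition: only the first attempt of subtask 0 blocks. */
  static boolean blocksForever(int subtaskIndex, int attemptNumber) {
    return subtaskIndex == 0 && attemptNumber <= 0;
  }

  /** Describes what the test code would do for one (subtask, attempt) pair. */
  static String describe(int subtaskIndex, int attemptNumber) {
    String action = blocksForever(subtaskIndex, attemptNumber) ? "sleep forever" : "return the row";
    return "subtask=" + subtaskIndex + " attempt=" + attemptNumber + " -> " + action;
  }

  public static void main(String[] args) {
    // The three hits reported with FlinkSink, in order.
    // Assumption: the speculative attempt (third hit) runs as subtask 0.
    int[][] hits = {{0, 0}, {1, 0}, {0, 1}};
    for (int[] hit : hits) {
      System.out.println(describe(hit[0], hit[1]));
    }
  }
}
```

If no speculative attempt is ever scheduled, the third pair is never evaluated and the first attempt of subtask 0 stays blocked, which is consistent with a test that times out.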
>>>>
>>>> However, with `IcebergSink`, I only see this code hit 2 times.
>>>>
>>>>    1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
>>>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
>>>>    2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we
>>>>    return the row.
>>>>
>>>> There’s no third time, which makes me believe that speculative execution
>>>> is not kicking in, because the slow (infinitely waiting) task is not being
>>>> retried.
>>>>
>>>>
>>>>
>>>> This could mean one of 2 things:
>>>>
>>>>    1. IcebergSink is not working as expected with speculative execution.
>>>>    2. The unit test needs to be adjusted for IcebergSink.
>>>>
>>>> While reviewing the code, I found @stevenzu's commit from August 2024 [1],
>>>> which relates to this unit test also hanging and shows that adding
>>>> `taskInfo.getIndexOfThisSubtask() == 0` was necessary for the unit test to
>>>> pass.
>>>>
>>>>
>>>> I am still digging deeper, but so far I haven’t been able to pinpoint
>>>> the problem. I will continue working on this, but there’s a possibility
>>>> that it won’t be ready in time to be part of the 1.10 release.
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Rodrigo
>>>>
>>>>
>>>> [1] https://github.com/apache/iceberg/pull/10832#discussion_r1701015316
>>>>
>>>>
>>>> On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org>
>>>> wrote:
>>>>
>>>>> +1 Great proposal.
>>>>>
>>>>> What about moving the timeline one release ahead? There isn't any more
>>>>> planned work for IcebergSink between the 1.10.0 and the 1.11.0
>>>>> release. IcebergSink has already had enough exposure, and we won't gain
>>>>> anything from waiting longer to make it the default. Thus, I would
>>>>> suggest deprecating FlinkSink in Iceberg 1.10.0 already and promoting
>>>>> IcebergSink to be the default implementation. Users will have the
>>>>> option to switch back to the old sink anytime via the configuration
>>>>> option Rodrigo mentioned. If there are no issues, we can proceed to
>>>>> remove FlinkSink in Iceberg 1.11.0.
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Cheers,
>>>>> Max
>>>>>
>>>>>
>>>>> On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <
>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>> >
>>>>> > +1
>>>>> >
>>>>> > On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <matyas.orh...@gmail.com>
>>>>> wrote:
>>>>> >>
>>>>> >> +1, sounds reasonable to me
>>>>> >>
>>>>> >> Thanks,
>>>>> >> Matyas
>>>>> >>
>>>>> >> On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <
>>>>> rmene...@gmail.com> wrote:
>>>>> >>>
>>>>> >>> Hi devs,
>>>>> >>>
>>>>> >>>
>>>>> >>> I’d like to start a discussion about the current and future state
>>>>> >>> of our Flink Sink Connectors.
>>>>> >>>
>>>>> >>>
>>>>> >>> As it stands today, we have 3 sink implementations:
>>>>> >>>
>>>>> >>> FlinkSink [1]
>>>>> >>> IcebergSink [2]
>>>>> >>> DynamicSink [3]
>>>>> >>>
>>>>> >>>
>>>>> >>> FlinkSink [1] is the current default implementation of the
>>>>> >>> Flink Sink Connector.
>>>>> >>>
>>>>> >>>
>>>>> >>> IcebergSink [2] is another implementation of the Flink Sink
>>>>> >>> Connector, introduced in https://github.com/apache/iceberg/pull/10179.
>>>>> >>> It leverages the latest SinkV2 interfaces in Flink and makes it
>>>>> >>> possible to add cleanup tasks by implementing the `PostCommitTopology`
>>>>> >>> interface. Work is already in progress to enable this functionality:
>>>>> >>> https://github.com/apache/iceberg/pull/12979
>>>>> >>>
>>>>> >>>
>>>>> >>> DynamicSink [3] was recently contributed in
>>>>> >>> https://github.com/apache/iceberg/pull/13304. It can write to any
>>>>> >>> number of tables, dynamically creating and updating tables and
>>>>> >>> dynamically updating their schema and partition spec.
>>>>> >>>
>>>>> >>>
>>>>> >>> Currently, `IcebergSink` is marked as `@Experimental`, and it
>>>>> >>> already offers feature parity with `FlinkSink` (the missing RANGE
>>>>> >>> distribution was recently merged in
>>>>> >>> https://github.com/apache/iceberg/pull/12071).
>>>>> >>>
>>>>> >>>
>>>>> >>> With https://github.com/apache/iceberg/pull/11244, users can
>>>>> >>> choose which sink implementation ([1] or [2]) to use for Flink SQL.
>>>>> >>>
>>>>> >>>
>>>>> >>> With all this said, we’re proposing the following for Iceberg 1.11:
>>>>> >>>
>>>>> >>> Mark FlinkSink as @Deprecated
>>>>> >>> Promote IcebergSink from @Experimental to @PublicEvolving
>>>>> >>> Make IcebergSink the default implementation in Flink SQL.
>>>>> >>>
>>>>> >>>
>>>>> >>> Then in Iceberg 1.12 we will:
>>>>> >>>
>>>>> >>>
>>>>> >>> Remove the `FlinkSink` implementation.
>>>>> >>> Remove @PublicEvolving from IcebergSink
>>>>> >>>
>>>>> >>>
>>>>> >>> What do you think about this plan?
>>>>> >>>
>>>>> >>>
>>>>> >>> Thanks,
>>>>> >>>
>>>>> >>> Rodrigo
>>>>> >>>
>>>>> >>>
>>>>>
>>>>
