Hey Rod,

IcebergSink does not currently support speculative batch execution. Sinks
need to implement SupportsConcurrentExecutionAttempts [1] in order to work
with speculative execution; it is merely a marker interface. I've verified
that the test passes after adding it to IcebergSink.
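
For reference, here is a rough sketch of what the change looks like. Only the
class declaration is shown; the existing interfaces and the body of
IcebergSink are elided, so treat this as illustrative rather than the exact
declaration:

```java
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;

// SupportsConcurrentExecutionAttempts has no methods; implementing it only
// declares that the sink tolerates concurrent (speculative) execution
// attempts, so the scheduler may launch mirror attempts for slow subtasks
// in batch jobs.
public class IcebergSink
    implements Sink<RowData>, SupportsConcurrentExecutionAttempts {
  // ... existing writer/committer implementation unchanged ...
}
```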

Hey Steven,

>I will reiterate one point that I mentioned before. For the Flink
connector, sink is the dominant use case (compared source). Regardless of
the unit test issue, it is probably better to go a little slower on this
switch of sink implementation.

Agreed, but I'm still a bit skeptical that leaving IcebergSink disabled by
default will get us to greater maturity. We will face a similar situation
for the Iceberg 1.11.0 release, where we again need to decide whether to
make the switch. The truth is, only after the sink is out and about can we
find any potential issues. Since users have the option to switch back to
the old implementation, we at least won't be blocking them. That said, I
hear your concerns and I'm perfectly fine with waiting until Iceberg 1.11.0
to make IcebergSink the default Flink sink.

-Max

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution

On Thu, Jun 26, 2025 at 5:38 AM Steven Wu <stevenz...@gmail.com> wrote:

> I will reiterate one point that I mentioned before. For the Flink
> connector, sink is the dominant use case (compared source). Regardless of
> the unit test issue, it is probably better to go a little slower on this
> switch of sink implementation.
>
> On Wed, Jun 25, 2025 at 4:22 PM Rodrigo Meneses <rmene...@gmail.com>
> wrote:
>
>> Hello!
>>
>>
>> I wanted to give you folks an update on this, especially because of Max's
>> suggestion of releasing this change in 1.10. I know the release branch
>> will be cut soon, and without this unit test passing, I don’t feel
>> comfortable releasing it.
>>
>>
>> During the implementation of this feature (which makes the IcebergSink
>> the default sink for SQL), a unit test covering speculative execution
>> mode is failing:
>>
>> `TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()`
>>
>> The test is timing out. I debugged it while using `FlinkSink` as the sink
>> engine, and it works.
>>
>> With FlinkSink, in the debugger, I observed that this code:
>>
>> ```java
>> if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) {
>>   Thread.sleep(Integer.MAX_VALUE);
>> }
>> ```
>>
>> is being hit 3 times:
>>
>>    1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
>>    2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we
>>    skip the infinite sleep and return the row.
>>    3. The third time (the speculative attempt),
>>    `taskInfo.getAttemptNumber()` is 1, so we skip the infinite sleep and
>>    return the row.
>>
>> And then, the unit test passes.
>>
>>
>> However, with `IcebergSink`, I only see this code hit 2 times:
>>
>>    1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
>>    2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we
>>    return the row.
>>
>> There’s no third time, which makes me believe that speculative mode is
>> not kicking in, as the slow (infinitely waiting) task is not being retried.
>>
>>
>>
>> This could mean 2 things:
>>
>>    1. IcebergSink is not working as expected with speculative mode, or
>>    2. The unit test needs to be adjusted for IcebergSink.
>>
>> While reviewing the code, I found a commit from @stevenzu from August
>> 2024 [1] related to this unit test also hanging; it notes that adding the
>> `taskInfo.getIndexOfThisSubtask() == 0` check was necessary for the unit
>> test to pass.
>>
>>
>> I am still digging deeper, but so far I haven’t been able to pinpoint the
>> problem. I will continue working on this, but there’s a possibility that
>> it won’t be ready in time to be part of the 1.10 release.
>>
>>
>> Thanks,
>>
>> Rodrigo
>>
>>
>> [1] https://github.com/apache/iceberg/pull/10832#discussion_r1701015316
>>
>> On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> +1 Great proposal.
>>>
>>> What about moving the timeline one release ahead? There isn't any more
>>> planned work for IcebergSink between the 1.10.0 and the 1.11.0
>>> release. IcebergSink has already gotten enough exposure, and we won't
>>> gain anything from waiting longer to make it the default. Thus, I would
>>> suggest deprecating FlinkSink already in Iceberg 1.10.0 and promoting
>>> IcebergSink to be the default implementation. Users will have the
>>> option to switch back to the old sink anytime via the configuration
>>> option Rodrigo mentioned. If there are no issues, we can proceed to
>>> remove FlinkSink in Iceberg 1.11.0.
>>>
>>> What do you think?
>>>
>>> Cheers,
>>> Max
>>>
>>>
>>> On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <peter.vary.apa...@gmail.com>
>>> wrote:
>>> >
>>> > +1
>>> >
>>> > On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <matyas.orh...@gmail.com>
>>> wrote:
>>> >>
>>> >> +1, sounds reasonable to me
>>> >>
>>> >> Thanks,
>>> >> Matyas
>>> >>
>>> >> On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <rmene...@gmail.com>
>>> wrote:
>>> >>>
>>> >>> Hi devs,
>>> >>>
>>> >>>
>>> >>> I’d like to start a discussion about the current and future state of
>>> our Flink Sink Connectors.
>>> >>>
>>> >>>
>>> >>> As it stands today, we have 3 sink implementations:
>>> >>>
>>> >>> FlinkSink [1]
>>> >>> IcebergSink [2]
>>> >>> DynamicSink [3]
>>> >>>
>>> >>>
>>> >>> FlinkSink [1] is the current and default implementation of the Flink
>>> Sink Connector.
>>> >>>
>>> >>>
>>> >>> IcebergSink [2] is another implementation of the Flink Sink
>>> Connector which was introduced in
>>> https://github.com/apache/iceberg/pull/10179. It leverages the latest
>>> SinkV2 interfaces in Flink, and it offers the possibility of adding cleanup
>>> tasks by way of implementing the `PostCommitTopology` interface. There
>>> is already some work in progress to enable this functionality:
>>> https://github.com/apache/iceberg/pull/12979
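>>>
>>> To give an idea of the hook this exposes, here is a rough sketch (the
>>> committable type, the cleanup function, and the rest of the Sink
>>> implementation are placeholders rather than the actual IcebergSink types):
>>>
>>> ```java
>>> import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
>>> import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>
>>> // A sink implementing the post-commit hook can append extra operators
>>> // that run only after commits succeed, e.g. maintenance or cleanup work.
>>> public class SinkWithCleanup implements SupportsPostCommitTopology<MyCommittable> {
>>>   @Override
>>>   public void addPostCommitTopology(DataStream<CommittableMessage<MyCommittable>> committables) {
>>>     committables.map(new CleanupFunction()).name("post-commit-cleanup");
>>>   }
>>>   // ... the rest of the Sink implementation is elided ...
>>> }
>>> ```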
>>> >>>
>>> >>>
>>> >>> DynamicSink [3] has been recently contributed in
>>> https://github.com/apache/iceberg/pull/13304 and it can be used to
>>> write to any number of tables, dynamically creating and updating tables and
>>> dynamically updating the schema and partition spec of tables.
>>> >>>
>>> >>>
>>> >>> Currently, `IcebergSink` is marked as `@Experimental` and it already
>>> offers feature parity with `FlinkSink` (the missing RANGE distribution was
>>> recently merged https://github.com/apache/iceberg/pull/12071).
>>> >>>
>>> >>>
>>> >>> With https://github.com/apache/iceberg/pull/11244, users have the
>>> choice of specifying which Sink Implementation ([1] or [2]) they want to
>>> use for Flink SQL.
>>> >>>
>>> >>>
>>> >>> With all this said, we’re proposing the following for Iceberg 1.11:
>>> >>>
>>> >>> Deprecate FlinkSink (mark it @Deprecated)
>>> >>> Promote IcebergSink from @Experimental to @PublicEvolving
>>> >>> Make IcebergSink the default implementation in Flink SQL.
>>> >>>
>>> >>>
>>> >>> Then in Iceberg 1.12 we will:
>>> >>>
>>> >>>
>>> >>> Remove the `FlinkSink` implementation.
>>> >>> Remove @PublicEvolving from IcebergSink
>>> >>>
>>> >>>
>>> >>> What do you think about this plan?
>>> >>>
>>> >>>
>>> >>> Thanks,
>>> >>>
>>> >>> Rodrigo
>>> >>>
>>> >>>
>>>
>>
