I will reiterate a point I mentioned before. For the Flink
connector, the sink is the dominant use case (compared to the source). Regardless of
the unit test issue, it is probably better to go a little slower with this
switch of sink implementation.

On Wed, Jun 25, 2025 at 4:22 PM Rodrigo Meneses <rmene...@gmail.com> wrote:

> Hello!
>
>
> I wanted to give you folks an update about this, especially because of
> Max's suggestion of releasing this change in 1.10, as I know the release
> branch will be cut soon, and without this unit test passing, I don’t feel
> comfortable releasing this.
>
>
> While implementing this feature (which makes IcebergSink the default sink
> for SQL), I ran into a failing unit test that covers Speculative Execution
> Mode:
>
> `TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()`
>
> The test times out. When I debugged it with `FlinkSink` as the sink
> engine, it passed.
>
> With FlinkSink, in the debugger, I observed that this code:
>
> ```java
> // Only the first attempt of subtask 0 blocks (effectively forever).
> if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) {
>   Thread.sleep(Integer.MAX_VALUE);
> }
> ```
>
> is hit three times:
>
>    1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so the task sleeps forever.
>    2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so the
>    infinite sleep is skipped and the row is returned.
>    3. The third time (the speculative attempt), `taskInfo.getAttemptNumber()`
>    is 1, so the infinite sleep is skipped and the row is returned.
>
> The unit test then passes.
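To make the FlinkSink observation concrete, here is a minimal, self-contained Java sketch of why a speculative attempt lets the test finish. It does not use Flink at all: `SpeculativeAttemptSketch`, `runAttempt` and `runSubtask` are hypothetical names, plain threads stand in for task attempts, and `CompletableFuture.anyOf` stands in for the scheduler accepting whichever attempt finishes first. Only the guard condition is taken from the test.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SpeculativeAttemptSketch {

    // Same guard as the test: only attempt 0 of subtask 0 blocks.
    static boolean shouldBlock(int subtaskIndex, int attemptNumber) {
        return subtaskIndex == 0 && attemptNumber <= 0;
    }

    // One task attempt (hypothetical stand-in): either "sleeps forever" or returns its row.
    static CompletableFuture<String> runAttempt(ExecutorService pool, int subtask, int attempt) {
        return CompletableFuture.supplyAsync(() -> {
            if (shouldBlock(subtask, attempt)) {
                try {
                    Thread.sleep(Integer.MAX_VALUE);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // cancelled by shutdownNow()
                }
                return "interrupted";
            }
            return "row from subtask " + subtask + " attempt " + attempt;
        }, pool);
    }

    // Returns the first finished attempt's row, or null if nothing finishes before the timeout.
    static String runSubtask(int subtask, boolean speculative, long timeoutMs) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            CompletableFuture<String> original = runAttempt(pool, subtask, 0);
            // With speculation, a second attempt races the original one.
            CompletableFuture<?> winner = speculative
                    ? CompletableFuture.anyOf(original, runAttempt(pool, subtask, 1))
                    : original;
            return (String) winner.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // corresponds to the hanging test
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // interrupt any attempt that is still sleeping
        }
    }

    public static void main(String[] args) {
        System.out.println(runSubtask(0, true, 1_000));  // row from subtask 0 attempt 1
        System.out.println(runSubtask(0, false, 200));   // null
        System.out.println(runSubtask(1, false, 1_000)); // row from subtask 1 attempt 0
    }
}
```

With the speculative attempt, subtask 0 completes through attempt 1; without it, `runSubtask` only returns because of the artificial timeout, which is the analogue of the test hanging.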
>
>
> With `IcebergSink`, however, this code is hit only twice:
>
>    1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so the task sleeps forever.
>    2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so the row
>    is returned.
>
> There is no third invocation, which leads me to believe that speculative
> execution is not kicking in: no speculative attempt of the slow (infinitely
> sleeping) task is ever launched.
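The two traces above can also be replayed mechanically. The Java sketch below is hypothetical (`InvocationTraceSketch` and its methods are illustrative names, not test code); it assumes a parallelism of 2 and that subtask 1 runs as attempt 0, which the traces imply but do not state. It applies the test's guard to each recorded invocation and checks whether every subtask returned a row.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class InvocationTraceSketch {

    // One observed call of the guarded code.
    static final class Invocation {
        final int subtaskIndex;
        final int attemptNumber;

        Invocation(int subtaskIndex, int attemptNumber) {
            this.subtaskIndex = subtaskIndex;
            this.attemptNumber = attemptNumber;
        }
    }

    // Same guard as the test: attempt 0 of subtask 0 sleeps forever.
    static boolean blocksForever(Invocation inv) {
        return inv.subtaskIndex == 0 && inv.attemptNumber <= 0;
    }

    // Subtasks that returned a row in at least one invocation.
    static Set<Integer> finishedSubtasks(List<Invocation> trace) {
        Set<Integer> done = new TreeSet<>();
        for (Invocation inv : trace) {
            if (!blocksForever(inv)) {
                done.add(inv.subtaskIndex);
            }
        }
        return done;
    }

    // The job, and therefore the test, finishes only if every subtask produced its row.
    static boolean jobCompletes(List<Invocation> trace, int parallelism) {
        return finishedSubtasks(trace).size() == parallelism;
    }

    public static void main(String[] args) {
        List<Invocation> flinkSink = List.of(
                new Invocation(0, 0), new Invocation(1, 0), new Invocation(0, 1));
        List<Invocation> icebergSink = List.of(
                new Invocation(0, 0), new Invocation(1, 0));
        System.out.println("FlinkSink completes: " + jobCompletes(flinkSink, 2));     // true
        System.out.println("IcebergSink completes: " + jobCompletes(icebergSink, 2)); // false
    }
}
```

The replay only restates the observation (subtask 0 never produces a row without an attempt 1); it does not explain why IcebergSink gets no speculative attempt.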
>
>
> This could mean one of two things:
>
>    1. IcebergSink does not work as expected with speculative execution, or
>    2. The unit test needs to be adjusted for IcebergSink.
>
> While reviewing the code, I found @stevenzu's commit from August 2024 [1],
> which concerns this same unit test hanging; adding
> `taskInfo.getIndexOfThisSubtask() == 0` was necessary for the test to
> pass.
>
>
> I am still digging deeper, but so far I haven't been able to pinpoint the
> problem. I will keep working on it, but this may not be ready in time to be
> included in the 1.10 release.
>
>
> Thanks,
>
> Rodrigo
>
>
> [1] https://github.com/apache/iceberg/pull/10832#discussion_r1701015316
>
> On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org> wrote:
>
>> +1 Great proposal.
>>
>> What about moving the timeline one release ahead? There is no further
>> planned work for IcebergSink between the 1.10.0 and 1.11.0
>> releases. IcebergSink has already had enough exposure, and we won't gain
>> anything from waiting longer to make it the default. Thus, I would
>> suggest deprecating FlinkSink in Iceberg 1.10.0 already and promoting
>> IcebergSink to be the default implementation. Users will have the
>> option to switch back to the old sink at any time via the configuration
>> option Rodrigo mentioned. If there are no issues, we can proceed to
>> remove FlinkSink in Iceberg 1.11.0.
>>
>> What do you think?
>>
>> Cheers,
>> Max
>>
>>
>> On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>> >
>> > +1
>> >
>> > On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <matyas.orh...@gmail.com>
>> wrote:
>> >>
>> >> +1, sounds reasonable to me
>> >>
>> >> Thanks,
>> >> Matyas
>> >>
>> >> On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <rmene...@gmail.com>
>> wrote:
>> >>>
>> >>> Hi devs,
>> >>>
>> >>>
>> >>> I’d like to start a discussion about the current and future state of
>> our Flink Sink Connectors.
>> >>>
>> >>>
>> >>> As it stands today, we have 3 sink implementations:
>> >>>
>> >>> FlinkSink [1]
>> >>> IcebergSink [2]
>> >>> DynamicSink [3]
>> >>>
>> >>>
>> >>> FlinkSink [1] is the current default implementation of the Flink
>> Sink Connector.
>> >>>
>> >>>
>> >>> IcebergSink [2] is another implementation of the Flink Sink Connector,
>> introduced in https://github.com/apache/iceberg/pull/10179. It
>> leverages the latest SinkV2 interfaces in Flink and makes it possible to
>> add cleanup tasks by implementing the
>> `PostCommitTopology` interface. Work is already in progress to
>> enable this functionality: https://github.com/apache/iceberg/pull/12979
>> >>>
>> >>>
>> >>> DynamicSink [3] has been recently contributed in
>> https://github.com/apache/iceberg/pull/13304 and it can be used to write
>> to any number of tables, dynamically creating and updating tables and
>> dynamically updating the schema and partition spec of tables.
>> >>>
>> >>>
>> >>> Currently, `IcebergSink` is marked `@Experimental`, yet it already
>> offers feature parity with `FlinkSink` (support for the previously missing
>> RANGE distribution mode was recently merged in
>> https://github.com/apache/iceberg/pull/12071).
>> >>>
>> >>>
>> >>> With https://github.com/apache/iceberg/pull/11244, users can
>> choose which sink implementation ([1] or [2]) to
>> use for Flink SQL.
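As a rough illustration of how a single boolean option can choose between the two implementations, and how flipping its default (as proposed in this thread) still leaves an opt-out, here is a hypothetical Java sketch. It is not Iceberg's code: `SinkSelectionSketch` and `select` are made-up names, and the key `table.exec.iceberg.use-v2-sink` is an assumption about the option added in that PR, so check Iceberg's Flink configuration documentation for the exact name.

```java
import java.util.Map;

public class SinkSelectionSketch {

    enum SinkImpl { FLINK_SINK, ICEBERG_SINK }

    // ASSUMPTION: option key believed to match the one added in apache/iceberg#11244; verify before use.
    static final String USE_V2_SINK = "table.exec.iceberg.use-v2-sink";

    // An explicit setting always wins; otherwise fall back to the release's default.
    static SinkImpl select(Map<String, String> tableConfig, boolean v2IsDefault) {
        String raw = tableConfig.get(USE_V2_SINK);
        boolean useV2 = raw == null ? v2IsDefault : Boolean.parseBoolean(raw.trim());
        return useV2 ? SinkImpl.ICEBERG_SINK : SinkImpl.FLINK_SINK;
    }

    public static void main(String[] args) {
        // Current behaviour: FlinkSink unless the user opts in.
        System.out.println(select(Map.of(), false));                    // FLINK_SINK
        // Proposed behaviour: IcebergSink by default...
        System.out.println(select(Map.of(), true));                     // ICEBERG_SINK
        // ...with an explicit opt-out back to FlinkSink.
        System.out.println(select(Map.of(USE_V2_SINK, "false"), true)); // FLINK_SINK
    }
}
```

Keeping the explicit setting ahead of the default is what lets a release change the default without breaking users who pinned either implementation.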
>> >>>
>> >>>
>> >>> With all this said, we're proposing the following for Iceberg 1.11:
>> >>>
>> >>> Deprecate FlinkSink (mark it @Deprecated)
>> >>> Promote IcebergSink from @Experimental to @PublicEvolving
>> >>> Make IcebergSink the default implementation in Flink SQL.
>> >>>
>> >>>
>> >>> Then in Iceberg 1.12 we will:
>> >>>
>> >>>
>> >>> Remove the `FlinkSink` implementation.
>> >>> Remove @PublicEvolving from IcebergSink
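The deprecation step relies on standard Java metadata. Below is a minimal sketch using hypothetical stand-in classes (`OldSink`, `NewSink`) rather than the real FlinkSink and IcebergSink, and a local `PublicEvolvingStandIn` annotation in place of Flink's `@PublicEvolving`. `@Deprecated(since = ..., forRemoval = true)` (Java 9+) marks a class as scheduled for removal, which matches deprecating in 1.11 and removing in 1.12.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class DeprecationPlanSketch {

    // Local stand-in for Flink's stability annotation (not org.apache.flink.annotation.PublicEvolving).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface PublicEvolvingStandIn {}

    /** @deprecated hypothetical stand-in for FlinkSink; use {@link NewSink} instead. */
    @Deprecated(since = "1.11.0", forRemoval = true)
    static final class OldSink {}

    /** Hypothetical stand-in for IcebergSink after promotion from @Experimental. */
    @PublicEvolvingStandIn
    static final class NewSink {}

    public static void main(String[] args) {
        // @Deprecated has runtime retention, so its attributes can be read reflectively.
        Deprecated d = OldSink.class.getAnnotation(Deprecated.class);
        System.out.println(d.since() + " forRemoval=" + d.forRemoval());                 // 1.11.0 forRemoval=true
        System.out.println(NewSink.class.isAnnotationPresent(PublicEvolvingStandIn.class)); // true
    }
}
```

With `forRemoval = true`, code outside the class that still uses it gets a removal warning from `javac`, giving users a release of notice before the class disappears.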
>> >>>
>> >>>
>> >>> What do you think about this plan?
>> >>>
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Rodrigo
>> >>>
>> >>>
>>
>
