Hello!

I wanted to give you folks an update on this, especially given Max's
suggestion to release this change in 1.10. I know the release branch will
be cut soon, and I don’t feel comfortable releasing this change while one
of its unit tests is failing.


While implementing this feature (which makes IcebergSink the default sink
for Flink SQL), I found that the unit test covering speculative execution
mode is failing:

`TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()`

The test times out. When I debugged it with `FlinkSink` as the sink, it
passes.

With `FlinkSink`, I observed in the debugger that this code:

```java
// Only the first attempt of subtask 0 blocks; every other attempt returns the row.
if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) {
  Thread.sleep(Integer.MAX_VALUE);
}
```

is hit three times:

   1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
   taskInfo.getAttemptNumber() <= 0` evaluates to true, so the task sleeps
   forever.
   2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so the
   infinite sleep is skipped and the row is returned.
   3. The third time (the speculative attempt),
   `taskInfo.getAttemptNumber()` is 1, so the infinite sleep is skipped and
   the row is returned.

And then, the unit test passes.
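To make the FlinkSink trace concrete, here is a plain-Java model of the guard. It is not Flink code and not the actual test: `TaskAttempt` is a hypothetical stand-in for Flink's `TaskInfo` that keeps only the subtask index and attempt number. Replaying the three invocations observed with `FlinkSink` shows that only the first attempt of subtask 0 blocks.

```java
import java.util.List;

/**
 * Plain-Java model of the guard used in the speculative execution test.
 * TaskAttempt is a hypothetical stand-in for Flink's TaskInfo; only the two
 * values the guard reads are modelled.
 */
public class SpeculativeGuard {

  /** Hypothetical (subtask index, attempt number) pair. */
  record TaskAttempt(int subtaskIndex, int attemptNumber) {}

  /** Same condition as the test: only attempt 0 of subtask 0 sleeps forever. */
  static boolean shouldBlock(TaskAttempt attempt) {
    return attempt.subtaskIndex() == 0 && attempt.attemptNumber() <= 0;
  }

  public static void main(String[] args) {
    // The three invocations observed with FlinkSink, in order.
    List<TaskAttempt> flinkSinkTrace =
        List.of(new TaskAttempt(0, 0), new TaskAttempt(1, 0), new TaskAttempt(0, 1));

    for (TaskAttempt attempt : flinkSinkTrace) {
      String outcome = shouldBlock(attempt) ? "sleeps forever" : "returns the row";
      System.out.println(attempt + " -> " + outcome);
    }
  }
}
```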


However, with `IcebergSink`, I see this code hit only twice:

   1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
   taskInfo.getAttemptNumber() <= 0` evaluates to true, so the task sleeps
   forever.
   2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so the row
   is returned.

There is no third invocation, which makes me believe that speculative
execution is not kicking in: the slow (infinitely sleeping) attempt of
subtask 0 is never retried.
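The hang can be sketched with a similar plain-Java model (again not Flink code). It assumes, for illustration only, a parallelism of 2 (the two subtask indices observed above) and that the job can finish only once every subtask has at least one attempt that does not block. `TaskAttempt` and `canFinish` are hypothetical names, not Iceberg or Flink APIs.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Plain-Java model of why the test times out. Assumption for illustration:
 * the job finishes only when every subtask index has a non-blocking attempt.
 * TaskAttempt is a hypothetical stand-in for Flink's TaskInfo.
 */
public class SpeculationCheck {

  record TaskAttempt(int subtaskIndex, int attemptNumber) {}

  // Same condition as the test: only attempt 0 of subtask 0 sleeps forever.
  static boolean shouldBlock(TaskAttempt attempt) {
    return attempt.subtaskIndex() == 0 && attempt.attemptNumber() <= 0;
  }

  /** True if every subtask in [0, parallelism) has at least one attempt that returns. */
  static boolean canFinish(List<TaskAttempt> trace, int parallelism) {
    Set<Integer> finished = new HashSet<>();
    for (TaskAttempt attempt : trace) {
      if (!shouldBlock(attempt)) {
        finished.add(attempt.subtaskIndex());
      }
    }
    for (int i = 0; i < parallelism; i++) {
      if (!finished.contains(i)) {
        return false; // subtask i only has blocked attempts, so the job hangs
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<TaskAttempt> flinkSink =
        List.of(new TaskAttempt(0, 0), new TaskAttempt(1, 0), new TaskAttempt(0, 1));
    List<TaskAttempt> icebergSink = List.of(new TaskAttempt(0, 0), new TaskAttempt(1, 0));

    System.out.println("FlinkSink trace can finish:   " + canFinish(flinkSink, 2));
    System.out.println("IcebergSink trace can finish: " + canFinish(icebergSink, 2));
  }
}
```

In this model the `FlinkSink` trace can finish because the speculative attempt of subtask 0 returns its row, while the `IcebergSink` trace cannot, since subtask 0 never gets a second attempt. That matches the timeout, although the model says nothing about why the speculative attempt is not scheduled.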


This could mean one of two things:

   1. IcebergSink does not work as expected with speculative execution.
   2. The unit test needs to be adjusted for IcebergSink.

While reviewing the code, I found @stevenzu’s commit from August 2024 [1],
which dealt with this same unit test hanging; it showed that adding the
`taskInfo.getIndexOfThisSubtask() == 0` condition was necessary for the
test to pass.


I am still digging deeper, but so far I haven’t been able to pinpoint the
problem. I will keep working on this, but there’s a chance it won’t be
ready in time for the 1.10 release.


Thanks,

Rodrigo


[1] https://github.com/apache/iceberg/pull/10832#discussion_r1701015316

On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org> wrote:

> +1 Great proposal.
>
> What about moving the timeline one release ahead? There isn't any more
> planned work for IcebergSink between the 1.10.0 and the 1.11.0
> release. IcebergSink already got enough exposure and we won't gain
> anything from waiting longer to make it the default. Thus, I would
> suggest deprecating FlinkSink already in Iceberg 1.10.0 and promoting
> IcebergSink to be the default implementation. Users will have the
> option to switch back to the old sink anytime via the configuration
> option Rodrigo mentioned. If there are no issues, we can proceed to
> remove FlinkSink in Iceberg 1.11.0.
>
> What do you think?
>
> Cheers,
> Max
>
>
> On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
> >
> > +1
> >
> > On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <matyas.orh...@gmail.com>
> wrote:
> >>
> >> +1, sounds reasonable to me
> >>
> >> Thanks,
> >> Matyas
> >>
> >> On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <rmene...@gmail.com>
> wrote:
> >>>
> >>> Hi devs,
> >>>
> >>>
> >>> I’d like to start a discussion about the current and future state of
> our Flink Sink Connectors.
> >>>
> >>>
> >>> As it stands today, we have 3 sink implementations:
> >>>
> >>> FlinkSink [1]
> >>> IcebergSink [2]
> >>> DynamicSink [3]
> >>>
> >>>
> >>> FlinkSink [1] is the current and default implementation of the Flink
> Sink Connector.
> >>>
> >>>
> >>> IcebergSink [2] is another implementation of the Flink Sink Connector
> which was introduced in https://github.com/apache/iceberg/pull/10179. It
> leverages the latest SinkV2 interfaces in Flink, and it offers the
> possibility of adding cleanup tasks by way of implementing the
> `PostCommitTopology` interface. There is already some work in progress to
> enable this functionality: https://github.com/apache/iceberg/pull/12979
> >>>
> >>>
> >>> DynamicSink [3] has been recently contributed in
> https://github.com/apache/iceberg/pull/13304 and it can be used to write
> to any number of tables, dynamically creating and updating tables and
> dynamically updating the schema and partition spec of tables.
> >>>
> >>>
> >>> Currently, `IcebergSink` is marked as `@Experimental` and it already
> offers feature parity with `FlinkSink` (support for the previously missing
> RANGE distribution mode was recently merged in
> https://github.com/apache/iceberg/pull/12071).
> >>>
> >>>
> >>> With https://github.com/apache/iceberg/pull/11244, users can choose
> which sink implementation ([1] or [2]) to use for Flink SQL.
> >>>
> >>>
> >>> With all this said, we’re proposing the following for Iceberg 1.11:
> >>>
> >>> Deprecate FlinkSink (mark it @Deprecated)
> >>> Promote IcebergSink from @Experimental to @PublicEvolving
> >>> Make IcebergSink the default implementation in Flink SQL.
> >>>
> >>>
> >>> Then in Iceberg 1.12 we will:
> >>>
> >>>
> >>> Remove the `FlinkSink` implementation.
> >>> Remove @PublicEvolving from IcebergSink
> >>>
> >>>
> >>> What do you think about this plan?
> >>>
> >>>
> >>> Thanks,
> >>>
> >>> Rodrigo
> >>>
> >>>
>
