+1 for the more conservative removal time

On Thu, Jun 26, 2025 at 3:54 PM Maximilian Michels <m...@apache.org> wrote:

> Hey Rod,
>
> IcebergSink does not currently support speculative batch execution. Modern
> sinks need to implement SupportsConcurrentExecutionAttempts [1] in order
> to work with speculative (batch) execution. This is merely a marker
> interface. I've verified that after adding it to IcebergSink, the test
> passes fine.
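The marker-interface pattern described above can be sketched in plain Java. This is a self-contained illustration, not Flink code: `SupportsConcurrentExecutionAttempts` and `Sink` below are local stand-ins that only mirror the shape of Flink's types, and `PlainSink`, `SpeculativeSink`, and `allowsSpeculativeAttempts` are hypothetical names. The sketch assumes that the framework decides whether speculative attempts are allowed by checking whether the sink class carries the marker, which is why opting in needs no new methods.

```java
public class MarkerInterfaceSketch {
    // Local stand-in for Flink's SupportsConcurrentExecutionAttempts:
    // a marker interface declares no methods, it only tags a class.
    interface SupportsConcurrentExecutionAttempts {}

    // Local stand-in for a SinkV2-style sink; the real Flink interface has writer factory methods.
    interface Sink<T> {}

    // Hypothetical sink without the marker: treated as unsafe to run speculatively.
    static class PlainSink implements Sink<String> {}

    // Hypothetical sink with the marker added: the only change is the extra "implements".
    static class SpeculativeSink implements Sink<String>, SupportsConcurrentExecutionAttempts {}

    // Hypothetical framework-side check: speculation is allowed only for tagged sinks.
    static boolean allowsSpeculativeAttempts(Sink<?> sink) {
        return sink instanceof SupportsConcurrentExecutionAttempts;
    }

    public static void main(String[] args) {
        System.out.println(allowsSpeculativeAttempts(new PlainSink()));       // false
        System.out.println(allowsSpeculativeAttempts(new SpeculativeSink())); // true
    }
}
```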
>
> Hey Steven,
>
> >I will reiterate one point that I mentioned before. For the Flink
> connector, sink is the dominant use case (compared to source). Regardless of
> the unit test issue, it is probably better to go a little slower on this
> switch of sink implementation.
>
> Agreed, but I'm still a bit skeptical that leaving IcebergSink disabled
> by default will help it reach greater maturity. We will face the same situation
> for the Iceberg 1.11.0 release, where we again need to decide whether to
> make the switch. The truth is, we can only find potential issues once the
> sink is out in the wild. Since users have the option to switch
> back to the old implementation, we at least won't be blocking them. That
> said, I hear your concerns and I'm perfectly fine with waiting until
> Iceberg 1.11.0 to make IcebergSink the default Flink sink.
>
> -Max
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution
>
> On Thu, Jun 26, 2025 at 5:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>
>> I will reiterate one point that I mentioned before. For the Flink
>> connector, sink is the dominant use case (compared to source). Regardless of
>> the unit test issue, it is probably better to go a little slower on this
>> switch of sink implementation.
>>
>> On Wed, Jun 25, 2025 at 4:22 PM Rodrigo Meneses <rmene...@gmail.com>
>> wrote:
>>
>>> Hello!
>>>
>>>
>>> I wanted to give you folks an update on this, especially given
>>> Max's suggestion to release this change in 1.10. I know the release
>>> branch will be cut soon, and I don't feel comfortable releasing this
>>> without this unit test passing.
>>>
>>>
>>> While implementing this feature (which makes IcebergSink the default
>>> sink for SQL), I found that a unit test covering speculative execution
>>> mode is failing:
>>>
>>> `
>>>
>>> TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()
>>>
>>> `
>>>
>>> The test is timing out. When I debugged it using `FlinkSink` as the
>>> sink engine, the test passed.
>>>
>>> With FlinkSink, in the debugger, I observed that this code:
>>>
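>>> ```java
>>> // Simulated straggler: only the first attempt of subtask 0 blocks forever,
>>> // so the job can finish only if a speculative attempt of subtask 0 runs.
>>> if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) {
>>>   Thread.sleep(Integer.MAX_VALUE);
>>> }
>>> ```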
>>>
>>> is hit 3 times:
>>>
>>>    1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
>>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so the task sleeps forever.
>>>    2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we
>>>    skip the infinite sleep and return the row.
>>>    3. The third time (the speculative attempt),
>>>    `taskInfo.getAttemptNumber()` is 1, so we skip the infinite sleep and
>>>    return the row.
>>>
>>> And then, the unit test passes.
>>>
>>>
>>> However, with `IcebergSink`, I only see this code hit 2 times.
>>>
>>>    1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
>>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so the task sleeps forever.
>>>    2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we
>>>    return the row.
>>>
>>> There’s no third time, which makes me believe that speculative execution
>>> is not kicking in: the slow (infinitely waiting) task is never retried.
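The reasoning above can be checked with a small, self-contained simulation. It assumes a parallelism of 2 (subtasks 0 and 1, as in the debugging session) and treats the job as finished only when every subtask has at least one attempt that skips the infinite sleep. `StragglerCheck`, `Invocation`, and `jobCanFinish` are hypothetical names, and the invocation lists merely replay what was observed in the debugger; nothing here drives a real Flink job.

```java
import java.util.List;

public class StragglerCheck {
    // Same condition as in the test: only the first attempt of subtask 0 stalls.
    static boolean stalls(int subtaskIndex, int attemptNumber) {
        return subtaskIndex == 0 && attemptNumber <= 0;
    }

    // One observed call of the test's map function (hypothetical helper type).
    record Invocation(int subtaskIndex, int attemptNumber) {}

    // The job can finish only if every subtask has at least one non-stalling attempt.
    static boolean jobCanFinish(List<Invocation> invocations, int parallelism) {
        boolean[] done = new boolean[parallelism];
        for (Invocation inv : invocations) {
            if (!stalls(inv.subtaskIndex(), inv.attemptNumber())) {
                done[inv.subtaskIndex()] = true;
            }
        }
        for (boolean d : done) {
            if (!d) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Replay of the FlinkSink run: a speculative attempt (attempt 1) of subtask 0 appears.
        List<Invocation> flinkSink =
            List.of(new Invocation(0, 0), new Invocation(1, 0), new Invocation(0, 1));
        // Replay of the IcebergSink run: no speculative attempt is ever started.
        List<Invocation> icebergSink = List.of(new Invocation(0, 0), new Invocation(1, 0));
        System.out.println("FlinkSink finishes: " + jobCanFinish(flinkSink, 2));     // true
        System.out.println("IcebergSink finishes: " + jobCanFinish(icebergSink, 2)); // false
    }
}
```

Under these assumptions, the missing third invocation alone is enough to explain the timeout, which is consistent with the hypothesis that no speculative attempt is scheduled for IcebergSink.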
>>>
>>>
>>>
>>> This could mean 2 things:
>>>
>>>    1. IcebergSink does not work as expected with speculative mode
>>>    2. The unit test needs to be adjusted for IcebergSink
>>>
>>> While reviewing the code, I found a commit by @stevenzu from August 2024 [1]
>>> related to this same unit test hanging; it showed that adding
>>> `taskInfo.getIndexOfThisSubtask() == 0` was necessary for the test to
>>> pass.
>>>
>>>
>>> I am still digging deeper, but so far I haven’t been able to pinpoint
>>> the problem. I'll keep working on it, but there’s a possibility that this
>>> won’t be ready in time to be part of the 1.10 release.
>>>
>>>
>>> Thanks,
>>>
>>> Rodrigo
>>>
>>>
>>> [1] https://github.com/apache/iceberg/pull/10832#discussion_r1701015316
>>>
>>> On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org>
>>> wrote:
>>>
>>>> +1 Great proposal.
>>>>
>>>> What about moving the timeline one release ahead? There isn't any more
>>>> planned work for IcebergSink between the 1.10.0 and the 1.11.0
>>>> release. IcebergSink already got enough exposure and we won't gain
>>>> anything from waiting longer to make it the default. Thus, I would
>>>> suggest deprecating FlinkSink already in Iceberg 1.10.0 and promoting
>>>> IcebergSink to be the default implementation. Users will have the
>>>> option to switch back to the old sink anytime via the configuration
>>>> option Rodrigo mentioned. If there are no issues, we can proceed to
>>>> remove FlinkSink in Iceberg 1.11.0.
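The opt-out described above can be sketched as a configuration lookup with a release-dependent default. The option key `example.use-iceberg-sink`, the `SinkImpl` enum, and `choose` are hypothetical and do not correspond to the actual option added in https://github.com/apache/iceberg/pull/11244; the sketch only shows that flipping the default leaves an explicit user setting in control.

```java
import java.util.Map;

public class SinkSelectionSketch {
    enum SinkImpl { FLINK_SINK, ICEBERG_SINK }

    // Hypothetical option key; NOT the real Iceberg configuration option name.
    static final String USE_NEW_SINK_KEY = "example.use-iceberg-sink";

    // Picks the implementation from user config, falling back to a release-specific default.
    static SinkImpl choose(Map<String, String> options, boolean newSinkIsDefault) {
        String value = options.get(USE_NEW_SINK_KEY);
        boolean useNew = value == null ? newSinkIsDefault : Boolean.parseBoolean(value);
        return useNew ? SinkImpl.ICEBERG_SINK : SinkImpl.FLINK_SINK;
    }

    public static void main(String[] args) {
        // Before the switch: FlinkSink unless the user opts in.
        System.out.println(choose(Map.of(), false)); // FLINK_SINK
        // After the switch: IcebergSink by default...
        System.out.println(choose(Map.of(), true)); // ICEBERG_SINK
        // ...but an explicit setting still lets users fall back to the old sink.
        System.out.println(choose(Map.of(USE_NEW_SINK_KEY, "false"), true)); // FLINK_SINK
    }
}
```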
>>>>
>>>> What do you think?
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>>
>>>> On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <
>>>> peter.vary.apa...@gmail.com> wrote:
>>>> >
>>>> > +1
>>>> >
>>>> > On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <matyas.orh...@gmail.com>
>>>> wrote:
>>>> >>
>>>> >> +1, sounds reasonable to me
>>>> >>
>>>> >> Thanks,
>>>> >> Matyas
>>>> >>
>>>> >> On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <rmene...@gmail.com>
>>>> wrote:
>>>> >>>
>>>> >>> Hi devs,
>>>> >>>
>>>> >>>
>>>> >>> I’d like to start a discussion about the current and future state
>>>> of our Flink Sink Connectors.
>>>> >>>
>>>> >>>
>>>> >>> As it stands today, we have 3 sink implementations:
>>>> >>>
>>>> >>> FlinkSink [1]
>>>> >>> IcebergSink [2]
>>>> >>> DynamicSink [3]
>>>> >>>
>>>> >>>
>>>> >>> FlinkSink [1] is the current and default implementation of the
>>>> Flink Sink Connector.
>>>> >>>
>>>> >>>
>>>> >>> IcebergSink [2] is another implementation of the Flink Sink
>>>> Connector which was introduced in
>>>> https://github.com/apache/iceberg/pull/10179. It leverages the latest
>>>> SinkV2 interfaces in Flink, and it allows adding cleanup
>>>> tasks by way of implementing the `PostCommitTopology` interface. There
>>>> is already some work in progress to enable this functionality:
>>>> https://github.com/apache/iceberg/pull/12979
>>>> >>>
>>>> >>>
>>>> >>> DynamicSink [3] has been recently contributed in
>>>> https://github.com/apache/iceberg/pull/13304 and it can be used to
>>>> write to any number of tables, dynamically creating and updating tables and
>>>> dynamically updating the schema and partition spec of tables.
>>>> >>>
>>>> >>>
>>>> >>> Currently, `IcebergSink` is marked as `@Experimental`, and it
>>>> already offers feature parity with `FlinkSink` (the missing RANGE
>>>> distribution was recently merged in
>>>> https://github.com/apache/iceberg/pull/12071).
>>>> >>>
>>>> >>>
>>>> >>> With https://github.com/apache/iceberg/pull/11244, users can
>>>> choose which sink implementation ([1] or [2]) they want to
>>>> use for Flink SQL.
>>>> >>>
>>>> >>>
>>>> >>> With all this said, we’re proposing the following for Iceberg 1.11:
>>>> >>>
>>>> >>> Deprecate FlinkSink (mark it @Deprecated)
>>>> >>> Promote IcebergSink from @Experimental to @PublicEvolving
>>>> >>> Make IcebergSink the default implementation in Flink SQL.
>>>> >>>
>>>> >>>
>>>> >>> Then in Iceberg 1.12 we will:
>>>> >>>
>>>> >>>
>>>> >>> Remove the `FlinkSink` implementation.
>>>> >>> Remove @PublicEvolving from IcebergSink
>>>> >>>
>>>> >>>
>>>> >>> What do you think about this plan?
>>>> >>>
>>>> >>>
>>>> >>> Thanks,
>>>> >>>
>>>> >>> Rodrigo
>>>> >>>
>>>> >>>
>>>>
>>>
