+1 for the more conservative removal time

Maximilian Michels <m...@apache.org> wrote (on Thu, Jun 26, 2025, 15:54):
> Hey Rod,
>
> IcebergSink does not currently support speculative batch execution. Modern
> sinks need to implement SupportsConcurrentExecutionAttempts [1] in order
> to work with speculative (batch) execution. This is merely a marker
> interface. I've verified that after adding it to IcebergSink, the test
> passes fine.
>
> Hey Steven,
>
> > I will reiterate one point that I mentioned before. For the Flink
> > connector, sink is the dominant use case (compared to source). Regardless of
> > the unit test issue, it is probably better to go a little slower on this
> > switch of sink implementation.
>
> Agreed, but I'm still a bit skeptical that leaving IcebergSink disabled
> by default will get us to greater maturity. We will have a similar situation
> for the Iceberg 1.11.0 release, where we again need to decide whether to
> make the switch. The truth is, only after the sink is out and about can we
> find any potential issues. Since we have the option for users to switch
> back to the old implementation, we at least won't be blocking users. That
> said, I hear your concerns and I'm perfectly fine with waiting until
> Iceberg 1.11.0 to make IcebergSink the default Flink sink.
>
> -Max
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution
>
> On Thu, Jun 26, 2025 at 5:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>
>> I will reiterate one point that I mentioned before. For the Flink
>> connector, sink is the dominant use case (compared to source). Regardless of
>> the unit test issue, it is probably better to go a little slower on this
>> switch of sink implementation.
>>
>> On Wed, Jun 25, 2025 at 4:22 PM Rodrigo Meneses <rmene...@gmail.com>
>> wrote:
>>
>>> Hello!
>>>
>>> I wanted to give you folks an update about this, especially because of
>>> Max's suggestion of releasing this change in 1.10, as I know the release
>>> branch will be cut soon, and without this unit test passing, I don’t feel
>>> comfortable releasing this.
>>>
>>> During the implementation of this feature (which makes the IcebergSink
>>> the default sink for SQL), a unit test covering Speculative Execution
>>> Mode started failing:
>>>
>>> `TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()`
>>>
>>> The test is timing out. I debugged it while using `FlinkSink` as the
>>> sink engine, and it works.
>>>
>>> With FlinkSink, in the debugger, I observed that this code:
>>>
>>> ```java
>>> if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) {
>>>   Thread.sleep(Integer.MAX_VALUE);
>>> }
>>> ```
>>>
>>> is being hit 3 times:
>>>
>>> 1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
>>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
>>> 2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we
>>>    skip the infinite sleep and return the row.
>>> 3. The third time (the speculative try),
>>>    `taskInfo.getAttemptNumber()` is 1, so we skip the infinite sleep and
>>>    return the row.
>>>
>>> And then the unit test passes.
>>>
>>> However, with `IcebergSink`, I only see this code hit 2 times:
>>>
>>> 1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 &&
>>>    taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
>>> 2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we
>>>    return the row.
>>>
>>> There’s no third time, which makes me believe that speculative mode
>>> is not kicking in, as the slow (infinitely waiting) task is not being retried.
>>>
>>> This could mean 2 things:
>>>
>>> 1. IcebergSink is not working as expected with Speculative Mode
>>> 2. The unit test needs to be adjusted for IcebergSink
>>>
>>> While reviewing the code, I found @stevenzu's commit from August 2024 [1],
>>> which is related to this unit test also hanging, and where adding
>>> `taskInfo.getIndexOfThisSubtask() == 0` was necessary for the unit test to
>>> pass.
>>>
>>> I am still digging deeper, but so far I haven’t been able to pinpoint
>>> the problem. I will continue on this, but there’s a possibility that this
>>> won’t be ready in time to be considered for the 1.10 release.
>>>
>>> Thanks,
>>>
>>> Rodrigo
>>>
>>> [1] https://github.com/apache/iceberg/pull/10832#discussion_r1701015316
>>>
>>> On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org>
>>> wrote:
>>>
>>>> +1 Great proposal.
>>>>
>>>> What about moving the timeline one release ahead? There isn't any more
>>>> planned work for IcebergSink between the 1.10.0 and the 1.11.0
>>>> release. IcebergSink already got enough exposure and we won't gain
>>>> anything from waiting longer to make it the default. Thus, I would
>>>> suggest deprecating FlinkSink already in Iceberg 1.10.0 and promoting
>>>> IcebergSink to be the default implementation. Users will have the
>>>> option to switch back to the old sink anytime via the configuration
>>>> option Rodrigo mentioned. If there are no issues, we can proceed to
>>>> remove FlinkSink in Iceberg 1.11.0.
>>>>
>>>> What do you think?
>>>>
>>>> Cheers,
>>>> Max
>>>>
>>>> On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>> >
>>>> > +1
>>>> >
>>>> > On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <matyas.orh...@gmail.com> wrote:
>>>> >>
>>>> >> +1, sounds reasonable to me
>>>> >>
>>>> >> Thanks,
>>>> >> Matyas
>>>> >>
>>>> >> On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <rmene...@gmail.com> wrote:
>>>> >>>
>>>> >>> Hi devs,
>>>> >>>
>>>> >>> I’d like to start a discussion about the current and future state
>>>> >>> of our Flink Sink Connectors.
>>>> >>>
>>>> >>> As it stands today, we have 3 sink implementations:
>>>> >>>
>>>> >>> FlinkSink [1]
>>>> >>> IcebergSink [2]
>>>> >>> DynamicSink [3]
>>>> >>>
>>>> >>> FlinkSink [1] is the current and default implementation of the
>>>> >>> Flink Sink Connector.
>>>> >>>
>>>> >>> IcebergSink [2] is another implementation of the Flink Sink
>>>> >>> Connector, which was introduced in
>>>> >>> https://github.com/apache/iceberg/pull/10179. It leverages the latest
>>>> >>> SinkV2 interfaces in Flink, and it offers the possibility of adding cleanup
>>>> >>> tasks by implementing the `PostCommitTopology` interface. There
>>>> >>> is already some work in progress to enable this functionality:
>>>> >>> https://github.com/apache/iceberg/pull/12979
>>>> >>>
>>>> >>> DynamicSink [3] was recently contributed in
>>>> >>> https://github.com/apache/iceberg/pull/13304 and can be used to
>>>> >>> write to any number of tables, dynamically creating and updating tables and
>>>> >>> dynamically updating the schema and partition spec of tables.
>>>> >>>
>>>> >>> Currently, `IcebergSink` is marked as `@Experimental` and it
>>>> >>> already offers feature parity with `FlinkSink` (the missing RANGE
>>>> >>> distribution was recently merged in
>>>> >>> https://github.com/apache/iceberg/pull/12071).
>>>> >>>
>>>> >>> With https://github.com/apache/iceberg/pull/11244, users have the
>>>> >>> choice of specifying which sink implementation ([1] or [2]) they want to
>>>> >>> use for Flink SQL.
>>>> >>>
>>>> >>> With all this said, we’re proposing the following for Iceberg 1.11:
>>>> >>>
>>>> >>> @Deprecate FlinkSink
>>>> >>> Promote IcebergSink from @Experimental to @PublicEvolving
>>>> >>> Make IcebergSink the default implementation in Flink SQL.
>>>> >>>
>>>> >>> Then in Iceberg 1.12 we will:
>>>> >>>
>>>> >>> Remove the `FlinkSink` implementation.
>>>> >>> Remove @PublicEvolving from IcebergSink
>>>> >>>
>>>> >>> What do you think about this plan?
>>>> >>>
>>>> >>> Thanks,
>>>> >>>
>>>> >>> Rodrigo
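
For reference, the guard discussed in Rodrigo's June 25 message can be modeled without Flink. This is only a minimal sketch, not the actual test code: `SpeculativeGateSketch` and `stalls` are hypothetical names, and the (subtask index, attempt number) pairs are assumed from the debugger observations reported in the thread. In particular, it assumes the speculative attempt runs as subtask 0 with attempt number 1.

```java
public class SpeculativeGateSketch {

    // Mirrors the guard quoted in the thread: only the first attempt of
    // subtask 0 is stalled; every other attempt returns its row.
    public static boolean stalls(int subtaskIndex, int attemptNumber) {
        return subtaskIndex == 0 && attemptNumber <= 0;
    }

    public static void main(String[] args) {
        // Assumed {subtaskIndex, attemptNumber} pairs for the three hits
        // reported with FlinkSink.
        int[][] hits = {
            {0, 0}, // original attempt of subtask 0: stalls
            {1, 0}, // subtask 1: does not stall
            {0, 1}, // speculative attempt of subtask 0: does not stall
        };
        for (int[] hit : hits) {
            System.out.printf(
                "subtask=%d attempt=%d stalls=%b%n",
                hit[0], hit[1], stalls(hit[0], hit[1]));
        }
    }
}
```

Under these assumptions, the test can only finish if the third pair is ever scheduled. With IcebergSink as reported, only the first two hits occur, so the stalled attempt is never superseded, which is consistent with the timeout.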