Oh, I missed that. That's a bit better, but I'm still not sure. Do we know of any internal users who are already on the new sink? What was their experience?
On Fri, Jun 27, 2025 at 1:49 PM Maximilian Michels <m...@apache.org> wrote:
> Peter, I was suggesting not to remove FlinkSink earlier, but to switch the default sink implementation earlier. Is that what you mean?
>
> -Max
>
> On Thu, Jun 26, 2025 at 5:19 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
>> +1 for the more conservative removal time
>>
>> On Thu, Jun 26, 2025 at 3:54 PM Maximilian Michels <m...@apache.org> wrote:
>>
>>> Hey Rod,
>>>
>>> IcebergSink does not currently support speculative batch execution. Modern sinks need to implement SupportsConcurrentExecutionAttempts [1] in order to work with speculative (batch) execution. This is merely a marker interface. I've verified that after adding it to IcebergSink, the test passes.
>>>
>>> Hey Steven,
>>>
>>> > I will reiterate one point that I mentioned before. For the Flink connector, sink is the dominant use case (compared to source). Regardless of the unit test issue, it is probably better to go a little slower on this switch of sink implementation.
>>>
>>> Agreed, but I'm still a bit skeptical that leaving IcebergSink disabled by default will help it reach greater maturity. We will face the same situation for the Iceberg 1.11.0 release, where we again need to decide whether to make the switch. The truth is, only after the sink is out in the wild can we find any potential issues. Since users have the option to switch back to the old implementation, we at least won't be blocking them. That said, I hear your concerns and I'm perfectly fine with waiting until Iceberg 1.11.0 to make IcebergSink the default Flink sink.
>>>
>>> -Max
>>>
>>> [1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/speculative_execution/#enable-sinks-for-speculative-execution
>>>
>>> On Thu, Jun 26, 2025 at 5:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>>> I will reiterate one point that I mentioned before.
>>>> For the Flink connector, sink is the dominant use case (compared to source). Regardless of the unit test issue, it is probably better to go a little slower on this switch of sink implementation.
>>>>
>>>> On Wed, Jun 25, 2025 at 4:22 PM Rodrigo Meneses <rmene...@gmail.com> wrote:
>>>>
>>>>> Hello!
>>>>>
>>>>> I wanted to give you folks an update on this, especially given Max's suggestion of releasing this change in 1.10. I know the release branch will be cut soon, and without this unit test passing, I don't feel comfortable releasing it.
>>>>>
>>>>> While implementing this feature (which makes IcebergSink the default sink for SQL), a unit test covering Speculative Execution Mode started failing:
>>>>>
>>>>> `TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()`
>>>>>
>>>>> The test times out. I debugged it using `FlinkSink` as the sink engine, and there it works.
>>>>>
>>>>> With FlinkSink, in the debugger, I observed that this code:
>>>>>
>>>>> ```java
>>>>> if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) {
>>>>>   Thread.sleep(Integer.MAX_VALUE);
>>>>> }
>>>>> ```
>>>>>
>>>>> is hit 3 times:
>>>>>
>>>>> 1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
>>>>> 2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we skip the infinite sleep and return the row.
>>>>> 3. The third time (the speculative attempt), `taskInfo.getAttemptNumber()` is 1, so we skip the infinite sleep and return the row.
>>>>>
>>>>> And then the unit test passes.
>>>>>
>>>>> However, with `IcebergSink`, I only see this code hit 2 times:
>>>>>
>>>>> 1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
>>>>> 2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we return the row.
>>>>>
>>>>> There's no third time, which makes me believe that speculative mode is not kicking in, as the slow (infinitely waiting) task is never retried.
>>>>>
>>>>> This could mean 2 things:
>>>>>
>>>>> 1. IcebergSink does not work as expected with Speculative Mode.
>>>>> 2. The unit test needs to be adjusted for IcebergSink.
>>>>>
>>>>> While reviewing the code, I found a commit by @stevenzu from August 2024 [1] related to this same unit test hanging, where adding `taskInfo.getIndexOfThisSubtask() == 0` was necessary for the test to pass.
>>>>>
>>>>> I am still digging deeper, but so far I haven't been able to pinpoint the problem. I will keep working on it, but there's a possibility that it won't be ready in time to be considered for the 1.10 release.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Rodrigo
>>>>>
>>>>> [1] https://github.com/apache/iceberg/pull/10832#discussion_r1701015316
>>>>>
>>>>> On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>
>>>>>> +1 Great proposal.
>>>>>>
>>>>>> What about moving the timeline one release ahead? There isn't any more planned work for IcebergSink between the 1.10.0 and the 1.11.0 release. IcebergSink has already had enough exposure, and we won't gain anything from waiting longer to make it the default. Thus, I would suggest deprecating FlinkSink already in Iceberg 1.10.0 and promoting IcebergSink to be the default implementation. Users will have the option to switch back to the old sink at any time via the configuration option Rodrigo mentioned.
>>>>>> If there are no issues, we can proceed to remove FlinkSink in Iceberg 1.11.0.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>> >
>>>>>> > +1
>>>>>> >
>>>>>> > On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <matyas.orh...@gmail.com> wrote:
>>>>>> >>
>>>>>> >> +1, sounds reasonable to me
>>>>>> >>
>>>>>> >> Thanks,
>>>>>> >> Matyas
>>>>>> >>
>>>>>> >> On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <rmene...@gmail.com> wrote:
>>>>>> >>>
>>>>>> >>> Hi devs,
>>>>>> >>>
>>>>>> >>> I'd like to start a discussion about the current and future state of our Flink Sink Connectors.
>>>>>> >>>
>>>>>> >>> As it stands today, we have 3 sink implementations:
>>>>>> >>>
>>>>>> >>> FlinkSink [1]
>>>>>> >>> IcebergSink [2]
>>>>>> >>> DynamicSink [3]
>>>>>> >>>
>>>>>> >>> FlinkSink [1] is the current and default implementation of the Flink Sink Connector.
>>>>>> >>>
>>>>>> >>> IcebergSink [2] is another implementation of the Flink Sink Connector, introduced in https://github.com/apache/iceberg/pull/10179. It leverages the latest SinkV2 interfaces in Flink, and it offers the possibility of adding cleanup tasks by implementing the `PostCommitTopology` interface. There is already some work in progress to enable this functionality: https://github.com/apache/iceberg/pull/12979
>>>>>> >>>
>>>>>> >>> DynamicSink [3] was recently contributed in https://github.com/apache/iceberg/pull/13304. It can be used to write to any number of tables, dynamically creating and updating tables and dynamically updating the schema and partition spec of tables.
>>>>>> >>>
>>>>>> >>> Currently, `IcebergSink` is marked as `@Experimental`, and it already offers feature parity with `FlinkSink` (the missing RANGE distribution was recently merged in https://github.com/apache/iceberg/pull/12071).
>>>>>> >>>
>>>>>> >>> With https://github.com/apache/iceberg/pull/11244, users can choose which sink implementation ([1] or [2]) they want to use for Flink SQL.
>>>>>> >>>
>>>>>> >>> With all this said, we're proposing the following for Iceberg 1.11:
>>>>>> >>>
>>>>>> >>> Mark FlinkSink as @Deprecated.
>>>>>> >>> Promote IcebergSink from @Experimental to @PublicEvolving.
>>>>>> >>> Make IcebergSink the default implementation in Flink SQL.
>>>>>> >>>
>>>>>> >>> Then in Iceberg 1.12 we will:
>>>>>> >>>
>>>>>> >>> Remove the `FlinkSink` implementation.
>>>>>> >>> Remove @PublicEvolving from IcebergSink.
>>>>>> >>>
>>>>>> >>> What do you think about this plan?
>>>>>> >>>
>>>>>> >>> Thanks,
>>>>>> >>>
>>>>>> >>> Rodrigo
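A toy model may help connect Rodrigo's debugger observations with Max's diagnosis about the marker interface. The Java sketch below is not Flink's scheduler. It makes three assumptions. First, parallelism is 2, based on the subtask indexes 0 and 1 seen in the debugger. Second, the scheduler adds at most one speculative attempt for the blocked subtask. Third, that attempt is only launched when the sink carries the marker interface. `SupportsConcurrentExecutionAttempts` here is a local stand-in for Flink's `org.apache.flink.api.common.SupportsConcurrentExecutionAttempts`. `SinkModel`, `MarkedSink`, `UnmarkedSink` and `SpeculationModel` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for Flink's marker interface
// org.apache.flink.api.common.SupportsConcurrentExecutionAttempts, so this
// toy model compiles without Flink on the classpath.
interface SupportsConcurrentExecutionAttempts {}

// Hypothetical types for illustration only; they are not Iceberg or Flink classes.
interface SinkModel {}

final class MarkedSink implements SinkModel, SupportsConcurrentExecutionAttempts {}

final class UnmarkedSink implements SinkModel {}

final class SpeculationModel {
  private SpeculationModel() {}

  /** Same predicate as the test: only the first attempt of subtask 0 blocks forever. */
  static boolean blocksForever(int subtaskIndex, int attemptNumber) {
    return subtaskIndex == 0 && attemptNumber <= 0;
  }

  /**
   * Returns the (subtask, attempt) pairs that reach the test's blocking code.
   * Assumption: every subtask runs attempt 0, and a single speculative attempt 1
   * is added for a blocked subtask only when the sink carries the marker.
   */
  static List<int[]> invocations(SinkModel sink, int parallelism) {
    List<int[]> calls = new ArrayList<>();
    for (int subtask = 0; subtask < parallelism; subtask++) {
      calls.add(new int[] {subtask, 0}); // original attempt of every subtask
    }
    if (sink instanceof SupportsConcurrentExecutionAttempts) {
      for (int subtask = 0; subtask < parallelism; subtask++) {
        if (blocksForever(subtask, 0)) {
          calls.add(new int[] {subtask, 1}); // speculative attempt of the slow subtask
        }
      }
    }
    return calls;
  }

  /** The job finishes only if every subtask has at least one non-blocking attempt. */
  static boolean jobCanFinish(List<int[]> calls, int parallelism) {
    boolean[] finished = new boolean[parallelism];
    for (int[] call : calls) {
      if (!blocksForever(call[0], call[1])) {
        finished[call[0]] = true;
      }
    }
    for (boolean f : finished) {
      if (!f) {
        return false;
      }
    }
    return true;
  }
}
```

Under these assumptions, `SpeculationModel.invocations(new MarkedSink(), 2)` yields three calls and `jobCanFinish` returns true. That matches the three hits Rodrigo saw with FlinkSink. `SpeculationModel.invocations(new UnmarkedSink(), 2)` yields only two calls, and subtask 0 never completes. That is consistent with `testSpeculativeExecution()` timing out before the marker was added to IcebergSink.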