I will reiterate one point that I mentioned before. For the Flink connector, the sink is the dominant use case (compared to the source). Regardless of the unit test issue, it is probably better to go a little slower on switching the sink implementation.
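The quoted debugging session below hinges on one gating condition. The following minimal Java model makes that condition concrete.

It is not Iceberg or Flink code. The names `SpeculativeGateModel`, `blocksForever`, and `canFinish` are hypothetical. The model also makes two assumptions:
- The parallelism is 2, inferred from the subtask indexes 0 and 1 seen in the debugger.
- The third FlinkSink invocation is the speculative attempt of subtask 0.

The model only illustrates why the test hangs if no attempt with `getAttemptNumber() >= 1` is ever scheduled for subtask 0. It does not explain why Flink schedules no such attempt with IcebergSink.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Iceberg or Flink code) of the gating condition in
 * TestIcebergSpeculativeExecutionSupport, as described in the thread.
 * Assumption: parallelism 2, and the third FlinkSink call is subtask 0, attempt 1.
 */
public class SpeculativeGateModel {

  /** Mirrors: getIndexOfThisSubtask() == 0 && getAttemptNumber() <= 0. */
  static boolean blocksForever(int subtaskIndex, int attemptNumber) {
    return subtaskIndex == 0 && attemptNumber <= 0;
  }

  /** Collects the subtask indexes whose invocation returns a row instead of sleeping. */
  static List<Integer> completedSubtasks(int[][] invocations) {
    List<Integer> done = new ArrayList<>();
    for (int[] inv : invocations) {
      if (!blocksForever(inv[0], inv[1])) {
        done.add(inv[0]);
      }
    }
    return done;
  }

  /** In this model, the job finishes only if every subtask 0..parallelism-1 returned a row. */
  static boolean canFinish(int[][] invocations, int parallelism) {
    List<Integer> done = completedSubtasks(invocations);
    for (int i = 0; i < parallelism; i++) {
      if (!done.contains(i)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // (subtaskIndex, attemptNumber) pairs, following the invocations reported in the thread
    int[][] flinkSink = {{0, 0}, {1, 0}, {0, 1}};
    int[][] icebergSink = {{0, 0}, {1, 0}};
    System.out.println("FlinkSink can finish:   " + canFinish(flinkSink, 2));
    System.out.println("IcebergSink can finish: " + canFinish(icebergSink, 2));
  }
}
```

Running `main` prints `true` for the FlinkSink sequence and `false` for the IcebergSink sequence. Without a retried attempt for subtask 0, nothing in the model ever completes that subtask.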
On Wed, Jun 25, 2025 at 4:22 PM Rodrigo Meneses <rmene...@gmail.com> wrote:
> Hello!
>
> I wanted to give you folks an update on this, especially because of Max's suggestion to release this change in 1.10. I know the release branch will be cut soon, and I don't feel comfortable releasing this without this unit test passing.
>
> While implementing this feature (which makes IcebergSink the default sink for SQL), a unit test covering Speculative Execution Mode started failing:
>
> `TestIcebergSpeculativeExecutionSupport.testSpeculativeExecution()`
>
> The test times out. I debugged it using `FlinkSink` as the sink engine, and there it works.
>
> With FlinkSink, in the debugger, I observed that this code:
>
> ```java
> if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) {
>   Thread.sleep(Integer.MAX_VALUE);
> }
> ```
>
> is hit 3 times:
>
> 1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
> 2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we skip the infinite sleep and return the row.
> 3. The third time (the speculative attempt), `taskInfo.getAttemptNumber()` is 1, so we skip the infinite sleep and return the row.
>
> And then the unit test passes.
>
> However, with `IcebergSink`, I only see this code hit 2 times:
>
> 1. The first time, `taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0` evaluates to true, so we sleep forever.
> 2. The second time, `taskInfo.getIndexOfThisSubtask()` is 1, so we return the row.
>
> There's no third time. That makes me believe speculative mode is not kicking in, because the slow (infinitely waiting) task is never retried.
>
> This could mean 2 things:
>
> 1. IcebergSink is not working as expected with Speculative Mode.
> 2. The unit test needs to be adjusted for IcebergSink.
>
> While reviewing the code, I found a commit by @stevenzu from August 2024 [1]. It relates to this same unit test hanging, and it showed that adding `taskInfo.getIndexOfThisSubtask() == 0` was necessary for the unit test to pass.
>
> I am still digging deeper, but so far I haven't been able to pinpoint the problem. I will keep working on this, but there's a possibility that it won't be ready in time to be considered for the 1.10 release.
>
> Thanks,
>
> Rodrigo
>
> [1] https://github.com/apache/iceberg/pull/10832#discussion_r1701015316
>
> On Tue, Jun 24, 2025 at 3:05 AM Maximilian Michels <m...@apache.org> wrote:
>
>> +1 Great proposal.
>>
>> What about moving the timeline one release ahead? There isn't any more planned work for IcebergSink between the 1.10.0 and 1.11.0 releases. IcebergSink has already had enough exposure, and we won't gain anything by waiting longer to make it the default. Thus, I would suggest deprecating FlinkSink already in Iceberg 1.10.0 and promoting IcebergSink to be the default implementation. Users will still be able to switch back to the old sink at any time via the configuration option Rodrigo mentioned. If there are no issues, we can proceed to remove FlinkSink in Iceberg 1.11.0.
>>
>> What do you think?
>>
>> Cheers,
>> Max
>>
>> On Mon, Jun 23, 2025 at 10:15 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>> >
>> > +1
>> >
>> > On Mon, Jun 23, 2025, 22:04 Őrhidi Mátyás <matyas.orh...@gmail.com> wrote:
>> >>
>> >> +1, sounds reasonable to me
>> >>
>> >> Thanks,
>> >> Matyas
>> >>
>> >> On Mon, Jun 23, 2025 at 11:28 AM Rodrigo Meneses <rmene...@gmail.com> wrote:
>> >>>
>> >>> Hi devs,
>> >>>
>> >>> I'd like to start a discussion about the current and future state of our Flink Sink Connectors.
>> >>>
>> >>> As it stands today, we have 3 sink implementations:
>> >>>
>> >>> - FlinkSink [1]
>> >>> - IcebergSink [2]
>> >>> - DynamicSink [3]
>> >>>
>> >>> FlinkSink [1] is the current and default implementation of the Flink Sink Connector.
>> >>>
>> >>> IcebergSink [2] is another implementation of the Flink Sink Connector, introduced in https://github.com/apache/iceberg/pull/10179. It leverages Flink's newer SinkV2 interfaces. It also makes it possible to add cleanup tasks by implementing the `PostCommitTopology` interface. There is already some work in progress to enable this functionality: https://github.com/apache/iceberg/pull/12979
>> >>>
>> >>> DynamicSink [3] was recently contributed in https://github.com/apache/iceberg/pull/13304. It can write to any number of tables, dynamically creating and updating tables and dynamically updating their schemas and partition specs.
>> >>>
>> >>> Currently, `IcebergSink` is marked as `@Experimental`, and it already offers feature parity with `FlinkSink` (the missing RANGE distribution was recently merged in https://github.com/apache/iceberg/pull/12071).
>> >>>
>> >>> With https://github.com/apache/iceberg/pull/11244, users can choose which sink implementation ([1] or [2]) to use for Flink SQL.
>> >>>
>> >>> With all this said, we're proposing the following for Iceberg 1.11:
>> >>>
>> >>> - Mark FlinkSink as @Deprecated
>> >>> - Promote IcebergSink from @Experimental to @PublicEvolving
>> >>> - Make IcebergSink the default implementation in Flink SQL
>> >>>
>> >>> Then in Iceberg 1.12 we will:
>> >>>
>> >>> - Remove the `FlinkSink` implementation
>> >>> - Remove @PublicEvolving from IcebergSink
>> >>>
>> >>> What do you think about this plan?
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Rodrigo