Thanks everyone for your answers! I really appreciate it, especially since these came in during the weekend, on your own time.
@Manu, during our initial discussion you mentioned that you had similar issues with Spark compactions, and that you needed locking there. Is it still an issue? If Spark/Kafka/Flink users all need some kind of locking, it might be worth considering an Iceberg solution for it. I see the following possibilities:

1. Create a locking feature using the current Catalog API: We could create a 'lock' instead of a 'tag', and we could use the Catalog's atomic change requirement to make locking atomic. My main concern with this approach is that it relies on the linear history of the table and produces more contention on the write side. I was OK with this in the very specific use case of Flink Table Maintenance (few changes, controlled times), but as a general solution I would look somewhere else.

2. Add locking to the Catalog API as a requirement: We could widen the Catalog API with an optional lock/unlock feature. I'm not sure that we see enough use cases to merit such a big change. OTOH, this could be a good additional feature for the REST Catalog as well, and maybe something we would like to add to our Catalogs sooner or later.

About the specific case: if we want to integrate Flink Table Maintenance with the streaming Flink sinks - and I consider that one of the main use cases - we can't rely on external schedulers to prevent concurrent writes. We can make sure that the Tasks can tolerate concurrent runs, but as mentioned in the doc, in most cases concurrent runs are a waste of resources because of the commit conflicts.

If we decide to pursue the Flink-only solution, then based on the feedback I would implement a JDBC-based locking implementation for the LockFactory interface. JDBC seems like the most natural and widely available external storage for locks. Later we could add more implementations based on user needs.
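To make the LockFactory idea above concrete, here is a minimal sketch of what the contract might look like. The interface name comes from the design doc, but the method signatures and the in-memory implementation are my assumptions for illustration only; a JDBC-backed implementation would keep the same semantics by mapping tryLock to an atomic INSERT into a locks table and unlock to a DELETE of that row.

```java
// Hypothetical sketch only: LockFactory is named in the design doc, but
// these signatures are assumptions, not the actual Iceberg/Flink API.
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

interface LockFactory {
  /** Attempts to acquire the maintenance lock; returns false if already held. */
  boolean tryLock(String tableName);

  /** Releases the maintenance lock for the given table. */
  void unlock(String tableName);
}

/**
 * In-memory reference implementation. A JDBC variant would follow the same
 * contract: tryLock() becomes an INSERT into a locks table keyed by table
 * name (acquisition fails on a primary-key conflict), and unlock() becomes
 * a DELETE of that row.
 */
class InMemoryLockFactory implements LockFactory {
  private final Set<String> held = ConcurrentHashMap.newKeySet();

  @Override
  public boolean tryLock(String tableName) {
    // Set.add on a concurrent set is atomic: exactly one caller wins.
    return held.add(tableName);
  }

  @Override
  public void unlock(String tableName) {
    held.remove(tableName);
  }
}
```

The point of keeping the interface this small is that the tag-based, JDBC, Kafka, or ZooKeeper variants discussed in this thread could all sit behind it without the maintenance tasks knowing which one is in use.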
Thanks for the feedback and discussion,
Peter

On Mon, Aug 5, 2024, 05:18 Manu Zhang <owenzhang1...@gmail.com> wrote:

> Not familiar with Flink, I'm wondering how Flink resolves concurrency issues in common Flink use cases. For example, how does Flink prevent two jobs from writing to the same file?
>
> On the other hand, an Iceberg tag is eventually an atomic change to a file. It's the same as using a file lock. I don't think we can handle such cases without relying on external dependencies. We may define interfaces in Iceberg and let users choose whatever services they can use to implement it. My $0.01.
>
> Manu
>
> On Mon, Aug 5, 2024 at 8:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>
>> I also don't feel it is the best fit to use tags to implement locks for passing control messages. This is the main sticking point for me from the design doc. However, we haven't been able to come up with a better solution yet. Maybe we need to go back to the drawing board again.
>>
>> I am also not sure using Kafka topics to send control messages is a good fit either. It would introduce a dependency on Kafka to run Flink maintenance jobs. It works for Kafka connect sink, because that is for Kafka env anyway.
>>
>> On Sun, Aug 4, 2024 at 1:12 PM Ryan Blue <b...@databricks.com.invalid> wrote:
>>
>>> Hi Péter, thanks for bringing this up.
>>>
>>> I don't think using a tag to "lock" a table is a good idea. The doc calls out that this is necessary "Since Flink doesn't provide an out of the box solution for downstream operators sending feedback to upstream operators", so this feels like using Iceberg metadata as a side-channel within a Flink application. That doesn't seem like a good idea to me.
>>>
>>> Why not use a separate Kafka topic to send control messages for this purpose, like what is done in the Kafka Connect sink? I think that's a cleaner way to solve the problem if there is not going to be a way to fix it in Flink.
>>>
>>> Ryan
>>>
>>> On Wed, Jul 31, 2024 at 7:45 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>
>>>> Hi Team,
>>>>
>>>> During the discussion around the Flink Table Maintenance [1], [2], I have highlighted that one of the main decision points is the way we prevent concurrent Maintenance Tasks from happening concurrently.
>>>>
>>>> At that time we did not find a better solution than providing an interface for locking, and providing a basic implementation which is based on Iceberg tags [3]:
>>>>
>>>> *"For convenience an Iceberg tag based solution is provided, so no external dependencies are needed. On lock creation a tag named '__flink_maitenance' is created for the current snapshot, and on lock removal this tag is removed. This solution is not ideal, as it creates multiple new versions of the table metadata, and mixes infrastructural metadata with user metadata. If this is not acceptable then other implementations could be created which could use external components like JDBC/Kafka/ZooKeeper."*
>>>>
>>>> We are in the implementation phase, and we agreed with Steven to take one final round with the community, to see if anyone has a better suggestion, or we could proceed with the originally agreed one.
>>>>
>>>> So if you have a better idea, please share.
>>>>
>>>> Thanks,
>>>> Peter
>>>>
>>>> [1] - https://lists.apache.org/thread/qjf83v2xj6lxb9sr8z0v9p0979f8wsmf
>>>> [2] - https://lists.apache.org/thread/vjf8m5wg840o58yz4y3q35k2mfhbm49l
>>>> [3] - https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit#heading=h.lt9eaimi6zyz
>>>
>>> --
>>> Ryan Blue
>>> Databricks