Hi Peter,

We rely on Airflow to schedule and coordinate maintenance Spark jobs. I agree with Ryan that an Iceberg solution is not a good choice here.
Thanks,
Manu

On Tue, Aug 6, 2024 at 1:07 AM Ryan Blue <b...@databricks.com.invalid> wrote:
>
> > We can make sure that the Tasks can tolerate concurrent runs, but as mentioned in the doc, in most cases having concurrent runs is a waste of resources because of the commit conflicts.
>
> Is the problem that users may configure multiple jobs that are all trying to run maintenance procedures? If so, isn't this a user-level problem? If you don't want maintenance job conflicts, only run one maintenance job.
>
> > If Spark/Kafka/Flink users all need some kind of locking, it might be worth considering coming up with an Iceberg solution for it.
>
> I don't think that an Iceberg solution is a good choice here. Coordination in a system that does work on Iceberg tables does not need to rely on locking in the Iceberg tables. It should have a way to coordinate externally.
>
> On Sun, Aug 4, 2024 at 10:29 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>
>> Thanks everyone for your answers! I really appreciate it, especially since these came in during the weekend, on your own time.
>>
>> @Manu, during our initial discussion, you mentioned that you had similar issues with Spark compactions. You needed locking there. Is it still an issue?
>>
>> If Spark/Kafka/Flink users all need some kind of locking, it might be worth considering coming up with an Iceberg solution for it. I see the following possibilities:
>>
>> 1. Create a locking feature using the current Catalog API: We could create a 'lock' instead of a 'tag', and we could use the Catalog's atomic change requirement to make locking atomic. My main concern with this approach is that it relies on the linear history of the table and produces more contention on the write side. I was OK with this in the very specific use case of Flink Table Maintenance (few changes, controlled times), but as a general solution, I would look somewhere else.
>>
>> 2. Add locking to the Catalog API as a requirement: We could widen the Catalog API with an optional lock/unlock feature. I'm not sure that we see enough use cases to merit such a big change. OTOH this could be a good additional feature for the REST Catalog as well, and maybe something we would like to add to our Catalog sooner or later.
>>
>> About the specific case: if we want to integrate Flink Table Maintenance into the streaming Flink sinks - and I consider that one of the main use cases - we can't rely on external schedulers to prevent concurrent writes. We can make sure that the Tasks can tolerate concurrent runs, but as mentioned in the doc, in most cases having concurrent runs is a waste of resources because of the commit conflicts.
>>
>> If we decide to pursue the Flink-only solution, I would, based on the feedback, implement a JDBC-based locking implementation for the LockFactory interface. This seems like the most natural and widely available external storage for locks. Later we could add more implementations based on user needs.
>>
>> Thanks for the feedback and discussion,
>> Peter
>>
>> On Mon, Aug 5, 2024, 05:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>
>>> Not being familiar with Flink, I'm wondering how Flink resolves concurrency issues in common Flink use cases. For example, how does Flink prevent two jobs from writing to the same file?
>>>
>>> On the other hand, an Iceberg tag is eventually an atomic change to a file. It's the same as using a file lock. I don't think we can handle such cases without relying on external dependencies. We may define interfaces in Iceberg and let users choose whatever services they can use to implement them. My $0.01.
>>>
>>> Manu
>>>
>>> On Mon, Aug 5, 2024 at 8:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>>> I also don't feel that using tags to implement locks for passing control messages is the best fit. This is the main sticking point for me in the design doc. However, we haven't been able to come up with a better solution yet. Maybe we need to go back to the drawing board again.
>>>>
>>>> I am also not sure that using Kafka topics to send control messages is a good fit either. It would introduce a dependency on Kafka to run Flink maintenance jobs. It works for the Kafka Connect sink, because that is for a Kafka environment anyway.
>>>>
>>>> On Sun, Aug 4, 2024 at 1:12 PM Ryan Blue <b...@databricks.com.invalid> wrote:
>>>>
>>>>> Hi Péter, thanks for bringing this up.
>>>>>
>>>>> I don't think using a tag to "lock" a table is a good idea. The doc calls out that this is necessary "Since Flink doesn't provide an out of the box solution for downstream operators sending feedback to upstream operators", so this feels like using Iceberg metadata as a side channel within a Flink application. That doesn't seem like a good idea to me.
>>>>>
>>>>> Why not use a separate Kafka topic to send control messages for this purpose, like what is done in the Kafka Connect sink? I think that's a cleaner way to solve the problem if there is not going to be a way to fix it in Flink.
>>>>>
>>>>> Ryan
>>>>>
>>>>> On Wed, Jul 31, 2024 at 7:45 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> During the discussion around Flink Table Maintenance [1], [2], I highlighted that one of the main decision points is how we prevent Maintenance Tasks from running concurrently.
>>>>>>
>>>>>> At that time we did not find a better solution than providing an interface for locking, along with a basic implementation based on Iceberg tags [3]:
>>>>>>
>>>>>> *"For convenience an Iceberg tag based solution is provided, so no external dependencies are needed. On lock creation a tag named '__flink_maitenance' is created for the current snapshot, and on lock removal this tag is removed. This solution is not ideal, as it creates multiple new versions of the table metadata, and mixes infrastructural metadata with user metadata. If this is not acceptable then other implementations could be created which could use external components like JDBC/Kafka/ZooKeeper."*
>>>>>>
>>>>>> We are in the implementation phase, and Steven and I agreed to take one final round with the community, to see if anyone has a better suggestion, or whether we should proceed with the originally agreed one.
>>>>>>
>>>>>> So if you have a better idea, please share.
>>>>>>
>>>>>> Thanks,
>>>>>> Peter
>>>>>>
>>>>>> [1] - https://lists.apache.org/thread/qjf83v2xj6lxb9sr8z0v9p0979f8wsmf
>>>>>> [2] - https://lists.apache.org/thread/vjf8m5wg840o58yz4y3q35k2mfhbm49l
>>>>>> [3] - https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit#heading=h.lt9eaimi6zyz
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Databricks
>
> --
> Ryan Blue
> Databricks
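P.S. The JDBC-based locking that Péter proposes for the LockFactory interface could be sketched roughly as below. This is only an illustration of the idea, not the actual Iceberg/Flink interface: the class name, table name, and columns are hypothetical, and SQLite stands in here for whatever JDBC-accessible database a deployment would use. The key point is that acquiring the lock is a single atomic INSERT, so the database's primary-key constraint does the mutual exclusion.

```python
import sqlite3

class JdbcStyleTableLock:
    """Sketch of a database-backed table lock (names are illustrative).

    Acquiring the lock is an atomic INSERT of the table's identifier into
    a locks table; a concurrent acquirer hits the primary-key constraint
    and is rejected, so at most one maintenance job holds the lock.
    """

    def __init__(self, conn: sqlite3.Connection):
        self.conn = conn
        conn.execute(
            "CREATE TABLE IF NOT EXISTS maintenance_locks ("
            "  table_ident TEXT PRIMARY KEY,"
            "  locked_at   TEXT DEFAULT CURRENT_TIMESTAMP)")

    def try_lock(self, table_ident: str) -> bool:
        try:
            with self.conn:  # commits on success, rolls back on error
                self.conn.execute(
                    "INSERT INTO maintenance_locks (table_ident) VALUES (?)",
                    (table_ident,))
            return True
        except sqlite3.IntegrityError:
            return False  # another job already holds the lock

    def unlock(self, table_ident: str) -> None:
        with self.conn:
            self.conn.execute(
                "DELETE FROM maintenance_locks WHERE table_ident = ?",
                (table_ident,))

conn = sqlite3.connect(":memory:")
lock = JdbcStyleTableLock(conn)
assert lock.try_lock("db.table") is True
assert lock.try_lock("db.table") is False  # concurrent attempt rejected
lock.unlock("db.table")
assert lock.try_lock("db.table") is True   # available again after unlock
```

A real implementation would also need a lease or timeout column so that a crashed job does not hold the lock forever, but that is orthogonal to the atomic-insert idea shown here.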