If this is specific to solving the problem that there is no notification
when a task finishes in Flink, then I think it makes sense to use a JDBC
lock. I'd prefer that this not add the tag-based locking strategy because I
think that has the potential to be misunderstood by people using the
library and misused.
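
For illustration only, here is a minimal sketch of what a database-backed
lock of this kind could look like. It is not the implementation from the PR.
The class name, table name and column names are hypothetical, and Python's
built-in sqlite3 module stands in for whatever database a JDBC connection
would point to. The sketch relies on a primary-key constraint to make
acquisition atomic, and it leaves out stale-lock cleanup after a job crash.

```python
import sqlite3
import time


class TableMaintenanceLock:
    """Hypothetical lock backed by a single row in a relational table."""

    def __init__(self, conn, lock_id):
        self.conn = conn
        self.lock_id = lock_id
        # One row per locked table; the primary key guarantees uniqueness.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS maintenance_lock ("
            "lock_id TEXT PRIMARY KEY, acquired_at REAL NOT NULL)"
        )
        conn.commit()

    def try_lock(self):
        """Return True if the lock was acquired, False if it is already held."""
        try:
            self.conn.execute(
                "INSERT INTO maintenance_lock (lock_id, acquired_at) VALUES (?, ?)",
                (self.lock_id, time.time()),
            )
            self.conn.commit()
            return True
        except sqlite3.IntegrityError:
            # A row with this lock_id already exists: another task holds the lock.
            self.conn.rollback()
            return False

    def is_held(self):
        """Return True if any task currently holds the lock."""
        row = self.conn.execute(
            "SELECT 1 FROM maintenance_lock WHERE lock_id = ?", (self.lock_id,)
        ).fetchone()
        return row is not None

    def unlock(self):
        """Release the lock by deleting its row."""
        self.conn.execute(
            "DELETE FROM maintenance_lock WHERE lock_id = ?", (self.lock_id,)
        )
        self.conn.commit()
```

With two maintenance tasks sharing the same lock_id, the second try_lock()
returns False until the first task calls unlock(), which is the behavior
needed to keep a manifest rewrite from starting during a data file rewrite.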

On Wed, Aug 7, 2024 at 1:54 AM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> Hi Anton, nice to hear from you!
> Thanks Ryan for your continued interest!
>
> You can find my answers below:
>
> > Am I right to say the proposal has the following high-level goals:
> > - Perform cheap maintenance actions periodically after commits using the
> same cluster (suitable for things like rewriting manifests, compacting tiny
> data files).
>
> Yes
>
> > - Offer an ability to run a Flink monitor service that would listen to
> table changes and trigger appropriate actions (suitable for more expensive
> operations like merging equality deletes into data files, removing orphan
> files)
>
> I would rephrase this a bit differently: Offer an ability to create a
> Flink Table Maintenance service which will listen to the table changes and
> trigger and execute the appropriate actions. I think the main difference
> here is that the whole monitoring/scheduling/executing is coupled into a
> single job.
>
> > - Support a way to prevent running duplicated and conflicting
> maintenance actions.
>
> This is part of both of the previous goals. For example:
> - Even cheap maintenance actions could conflict, as rewriting manifests
> and compacting tiny data files are conflicting
> - Big maintenance actions could take a long time, and they could easily
> end up in a situation when the next action is scheduled, but would conflict
> with the already running one
>
> > If my understanding is correct, can we elaborate on the exact use cases
> when we will schedule duplicated and conflicting maintenance actions? Are
> we talking about conflicts between data operations and maintenance or
> between different maintenance actions?
>
> Here, we do not talk about conflicts between data operations and
> maintenance tasks. The goal here is to prevent different, conflicting
> maintenance tasks running concurrently.
> One example:
> - Cheap maintenance is enabled on the Flink Sink
> - Compacting tiny files is started
> - Rewriting manifest files is scheduled - but should not start until the
> tiny file compaction is finished. If the tiny file compaction is finished
> first, then the manifest rewrite will not be able to commit, as the current
> list of the manifest files has changed.
>
> Another example:
> - Big maintenance job is running
> - Delete Orphan is started
> - Expire Snapshot is scheduled - but should not start until the Delete
> Orphan files task is finished.  If we start the Expire Snapshots then we
> might remove metadata files which are still read by the Delete Orphan files.
>
> > > If nobody else has a better idea, then I will add a default JDBC based
> locking implementation to the PR
> > Do you mean an implementation of `LockManager` in core or something
> specific to this Flink application?
>
> This would be a Flink-only interface: TriggerLockFactory and
> TriggerLockFactory.Lock [1]. See the proposed JDBC implementation in the
> PR [2]. If other engines want to use it then we could move it to
> iceberg-core, but I don't see the need for it ATM.
>
> Thanks,
> Peter
>
> [1] -
> https://github.com/apache/iceberg/pull/10484/files#diff-06332fe8ce41f0708e6ab59484ea88dfb86ff5eaa35c9a4e76110e9fb1a290ca
> [2] -
> https://github.com/apache/iceberg/pull/10484/files#diff-5aa5681994bb389833582619220b5f339fa416b60b4535ce7492c5fb32fc417d
>
> Ryan Blue <b...@databricks.com.invalid> wrote (on Tue, Aug 6, 2024,
> 22:37):
>
>> > If nobody else has a better idea, then I will add a default JDBC based
>> locking implementation to the PR
>>
>> Do you mean an implementation of `LockManager` in core or something
>> specific to this Flink application?
>>
>> On Tue, Aug 6, 2024 at 2:28 AM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> > > We can make sure that the Tasks can tolerate concurrent runs, but as
>>> mentioned in the doc, in most cases having concurrent runs is a waste of
>>> resources, because of the commit conflicts.
>>> >
>>> > Is the problem that users may configure multiple jobs that are all
>>> trying to run maintenance procedures? If so, isn't this a user-level
>>> problem? If you don't want maintenance job conflicts, only run one
>>> maintenance job.
>>>
>>> There are conflicts even when a single streaming job schedules the tasks.
>>> One example:
>>> - Streaming job writes rows to an append only table, and the Table
>>> Maintenance is scheduled in the PostCommitTopology
>>> - DataFile rewrite is scheduled on X commits - to concatenate small
>>> files
>>> - ManifestFile rewrite is scheduled on Y commits - to decrease the
>>> number of manifest files
>>>
>>> DataFile rewrite will create a new manifest file. This means if a
>>> DataFile rewrite task is finished and committed, and there is a concurrent
>>> ManifestFile rewrite then the ManifestFile rewrite will fail. I have played
>>> around with serializing the Maintenance Tasks (resulted in a very ugly/hard
>>> to maintain, but working code). This serialization mitigated some of the
>>> issues, but there was still one remaining problem. If there was a delay in
>>> executing the rewrites (job restart/resource deprivation/longer than
>>> expected Maintenance Task run), sometimes a consecutively scheduled
>>> DataFile/ManifestFile rewrite had already started while the previous
>>> DataFile/ManifestFile rewrite was still running. This again resulted in
>>> failures.
>>>
>>> If we accept that locking is a requirement, then we can have:
>>> - Simpler code for the flow - easy to understand flow
>>> - Simpler code for the tasks - no need to separate messages for the
>>> different runs
>>> - No failures in the logs - failures are problematic when swallowed, and
>>> make it hard to identify real issues when logged
>>>
>>> > I don't think that an Iceberg solution is a good choice here.
>>> Coordination in a system that does work on Iceberg tables does not need to
>>> rely on locking in the Iceberg tables. It should have a way to coordinate
>>> externally.
>>> [..]
>>> > I agree with Ryan that an Iceberg solution is not a good choice here.
>>>
>>> I agree that we don't need to *rely* on locking in the Iceberg tables.
>>> The `TriggerLockFactory.Lock` interface is specifically designed to allow
>>> the user to choose the preferred type of locking. I was trying to come up
>>> with a solution where the Apache Iceberg users don't need to rely on one
>>> more external system for the Flink Table Maintenance to work.
>>>
>>> I understand that this discussion is very similar to the HadoopCatalog
>>> situation, where we have a hacky "solution" which is working in some cases,
>>> but suboptimal. And I understand if we don't want to support it.
>>>
>>> If nobody else has a better idea, then I will add a default JDBC based
>>> locking implementation to the PR [1].
>>>
>>> Thanks,
>>> Peter
>>>
>>> [1] https://github.com/apache/iceberg/pull/10484
>>>
>>> Manu Zhang <owenzhang1...@gmail.com> wrote (on Tue, Aug 6, 2024,
>>> 6:25):
>>>
>>>> Hi Peter,
>>>>
>>>> We rely on Airflow to schedule and coordinate maintenance Spark jobs. I
>>>> agree with Ryan that an Iceberg solution is not a good choice here.
>>>>
>>>> Thanks,
>>>> Manu
>>>>
>>>> On Tue, Aug 6, 2024 at 1:07 AM Ryan Blue <b...@databricks.com.invalid>
>>>> wrote:
>>>>
>>>>> > We can make sure that the Tasks can tolerate concurrent runs, but as
>>>>> mentioned in the doc, in most cases having concurrent runs is a waste of
>>>>> resources, because of the commit conflicts.
>>>>>
>>>>> Is the problem that users may configure multiple jobs that are all
>>>>> trying to run maintenance procedures? If so, isn't this a user-level
>>>>> problem? If you don't want maintenance job conflicts, only run one
>>>>> maintenance job.
>>>>>
>>>>> > If Spark/Kafka/Flink users all need some kind of locking it might be
>>>>> worth considering coming up with an Iceberg solution for it.
>>>>>
>>>>> I don't think that an Iceberg solution is a good choice here.
>>>>> Coordination in a system that does work on Iceberg tables does not need to
>>>>> rely on locking in the Iceberg tables. It should have a way to coordinate
>>>>> externally.
>>>>>
>>>>> On Sun, Aug 4, 2024 at 10:29 PM Péter Váry <
>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>
>>>>>> Thanks everyone for your answers! I really appreciate it, especially
>>>>>> since these came in during the weekend, on your own time.
>>>>>>
>>>>>> @Manu, during our initial discussion, you have mentioned that you had
>>>>>> similar issues with Spark compactions. You needed locking there. Is it
>>>>>> still an issue?
>>>>>>
>>>>>> If Spark/Kafka/Flink users all need some kind of locking it might be
>>>>>> worth considering coming up with an Iceberg solution for it.
>>>>>> I see the following possibilities:
>>>>>> 1. Create a locking feature using the current Catalog API: We could
>>>>>> create a 'lock' instead of 'tag', and we could use the Catalog's atomic
>>>>>> change requirement to make locking atomic. My main concern with this
>>>>>> approach is that it relies on the linear history of the table and
>>>>>> produces more contention on the write side. I was OK with this in the
>>>>>> very specific use-case with Flink Table Maintenance (few changes,
>>>>>> controlled times), but as a general solution, I would look somewhere
>>>>>> else.
>>>>>> 2. Add locking to the Catalog API as a requirement: We could widen
>>>>>> the Catalog API with a lock/unlock optional feature. I'm not sure that we
>>>>>> see enough use cases to merit such a big change. OTOH this could be a
>>>>>> good additional feature to the REST Catalog as well, and maybe something
>>>>>> we would like to add to our Catalog sooner or later.
>>>>>>
>>>>>> About the specific case: if we want to integrate Flink Table
>>>>>> Maintenance to the streaming Flink sinks - and I consider that one of the
>>>>>> main use cases - we can't rely on external schedulers to prevent
>>>>>> concurrent writes. We can make sure that the Tasks can tolerate
>>>>>> concurrent runs, but as mentioned in the doc, in most cases having
>>>>>> concurrent runs is a waste of resources, because of the commit conflicts.
>>>>>>
>>>>>> If we decide to pursue the Flink only solution, I would implement a
>>>>>> JDBC based locking implementation for the LockFactory interface based on
>>>>>> the feedback. This seems like the most natural and available external
>>>>>> storage for locks. Later we could add more implementations based on the
>>>>>> user needs.
>>>>>>
>>>>>> Thanks for the feedback and discussion,
>>>>>> Peter
>>>>>>
>>>>>> On Mon, Aug 5, 2024, 05:18 Manu Zhang <owenzhang1...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Not familiar with Flink, I'm wondering how Flink resolves
>>>>>>> concurrency issues in common Flink use cases. For example, how does
>>>>>>> Flink prevent two jobs from writing to the same file?
>>>>>>>
>>>>>>> On the other hand, an Iceberg tag is ultimately an atomic change to
>>>>>>> a file. It's the same as using a file lock. I don't think we can handle
>>>>>>> such cases without relying on external dependencies. We may define
>>>>>>> interfaces in Iceberg and let users choose whatever services they can
>>>>>>> use to implement it. My $0.01.
>>>>>>>
>>>>>>> Manu
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Aug 5, 2024 at 8:38 AM Steven Wu <stevenz...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I also don't feel it is the best fit to use tags to implement locks
>>>>>>>> for passing control messages. This is the main sticking point for me
>>>>>>>> from the design doc. However, we haven't been able to come up with a
>>>>>>>> better solution yet. Maybe we need to go back to the drawing board
>>>>>>>> again.
>>>>>>>>
>>>>>>>> I am also not sure using Kafka topics to send control messages is a
>>>>>>>> good fit either. It would introduce a dependency on Kafka to run Flink
>>>>>>>> maintenance jobs. It works for the Kafka Connect sink, because that
>>>>>>>> runs in a Kafka environment anyway.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sun, Aug 4, 2024 at 1:12 PM Ryan Blue
>>>>>>>> <b...@databricks.com.invalid> wrote:
>>>>>>>>
>>>>>>>>>  Hi Péter, thanks for bringing this up.
>>>>>>>>>
>>>>>>>>> I don't think using a tag to "lock" a table is a good idea. The
>>>>>>>>> doc calls out that this is necessary "Since Flink doesn’t provide an
>>>>>>>>> out of the box solution for downstream operators sending feedback to
>>>>>>>>> upstream operators" so this feels like using Iceberg metadata as a
>>>>>>>>> side-channel within a Flink application. That doesn't seem like a good
>>>>>>>>> idea to me.
>>>>>>>>>
>>>>>>>>> Why not use a separate Kafka topic to send control messages for
>>>>>>>>> this purpose, like what is done in the Kafka Connect sink? I think
>>>>>>>>> that's a cleaner way to solve the problem if there is not going to be
>>>>>>>>> a way to fix it in Flink.
>>>>>>>>>
>>>>>>>>> Ryan
>>>>>>>>>
>>>>>>>>> On Wed, Jul 31, 2024 at 7:45 AM Péter Váry <
>>>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Team,
>>>>>>>>>>
>>>>>>>>>> During the discussion around the Flink Table Maintenance [1],
>>>>>>>>>> [2], I have highlighted that one of the main decision points is the
>>>>>>>>>> way we prevent Maintenance Tasks from running concurrently.
>>>>>>>>>>
>>>>>>>>>> At that time we did not find a better solution than providing an
>>>>>>>>>> interface for locking, and providing a basic implementation which is
>>>>>>>>>> based on Iceberg tags [3]:
>>>>>>>>>>
>>>>>>>>>> *"For convenience an Iceberg tag based solution is provided, so
>>>>>>>>>> no external dependencies are needed. On lock creation a tag named
>>>>>>>>>> '__flink_maitenance' is created for the current snapshot, and on lock
>>>>>>>>>> removal this tag is removed. This solution is not ideal, as it
>>>>>>>>>> creates multiple new versions of the table metadata, and mixes
>>>>>>>>>> infrastructural metadata with user metadata. If this is not
>>>>>>>>>> acceptable then other implementations could be created which could
>>>>>>>>>> use external components like JDBC/Kafka/ZooKeeper."*
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> We are in the implementation phase, and we agreed with Steven to
>>>>>>>>>> take one final round with the community, to see if anyone has a
>>>>>>>>>> better suggestion, or we could proceed with the originally agreed one.
>>>>>>>>>>
>>>>>>>>>> So if you have a better idea, please share.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Peter
>>>>>>>>>>
>>>>>>>>>> [1] -
>>>>>>>>>> https://lists.apache.org/thread/qjf83v2xj6lxb9sr8z0v9p0979f8wsmf
>>>>>>>>>> [2] -
>>>>>>>>>> https://lists.apache.org/thread/vjf8m5wg840o58yz4y3q35k2mfhbm49l
>>>>>>>>>> [3] -
>>>>>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit#heading=h.lt9eaimi6zyz
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Ryan Blue
>>>>>>>>> Databricks
>>>>>>>>>
>>>>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Databricks
>>>>>
>>>>
>>
>> --
>> Ryan Blue
>> Databricks
>>
>

-- 
Ryan Blue
Databricks
