Took a look at the doc as well as this thread.

Am I right to say the proposal has the following high-level goals:

- Perform cheap maintenance actions periodically after commits using the
same cluster (suitable for things like rewriting manifests, compacting tiny
data files).
- Offer an ability to run a Flink monitor service that would listen to
table changes and trigger appropriate actions (suitable for more expensive
operations like merging equality deletes into data files, removing orphan
files)
- Support a way to prevent running duplicated and conflicting maintenance
actions.

If my understanding is correct, can we elaborate on the exact use cases
when we will schedule duplicated and conflicting maintenance actions? Are
we talking about conflicts between data operations and maintenance or
between different maintenance actions?

- Anton

вт, 6 серп. 2024 р. о 08:47 Péter Váry <peter.vary.apa...@gmail.com> пише:

> > If all the maintenance tasks are created from a single Flink job, is it
> possible to simply skip new maintenance task if there’s already running
> task? The running maintenance tasks could be recorded in the JM?
>
> The operator scheduling the tasks doesn't know when the actual tasks are
> finished. We need an information loop which is not something which is
> supported by Flink itself. We need external tools, like Iceberg or some
> other external dependency to push this information upstream (back to the
> scheduler operator)
>
> Xianjin YE <xian...@apache.org> ezt írta (időpont: 2024. aug. 6., K,
> 15:16):
>
>> > DataFile rewrite will create a new manifest file. This means if a
>> DataFile rewrite task is finished and committed, and there is a concurrent
>> ManifestFile rewrite then the ManifestFile rewrite will fail. I have played
>> around with serializing the Maintenance Tasks (resulted in a very ugly/hard
>> to maintain, but working code). This serialization mitigated some of the
>> issues, but there was still one remaining problem. If there was a delay in
>> executing the rewrites (job restart/resource deprivation/longer than
>> expected Maintenance Task run), sometimes a consecutively scheduled
>> DataFile/ManifestFile rewrite is already started when the previous
>> DataFile/ManifestFile rewrite was still working. This again resulted in
>> failures.
>>
>> If all the maintenance tasks are created from a single Flink job, is it
>> possible to simply skip new maintenance task if there’s already running
>> task? The running maintenance tasks could be recorded in the JM?
>>
>> On Aug 6, 2024, at 17:27, Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>
>> > > We can make sure that the Tasks can tolerate concurrent runs, but as
>> mentioned in the doc, in most cases having concurrent runs are a waste of
>> resources, because of the commit conflicts.
>> >
>> > Is the problem that users may configure multiple jobs that are all
>> trying to run maintenance procedures? If so, isn't this a user-level
>> problem? If you don't want maintenance job conflicts, only run one
>> maintenance job.
>>
>> There are conflicts even when a single streaming job schedules the tasks.
>> One example:
>> - Streaming job writes rows to an append only table, and the Table
>> Maintenance is scheduled in the PostCommitTopology
>> - DataFile rewrite is scheduled on X commits - to concatenate small files
>> - ManifestFile rewrite is scheduled on Y commits - to decrease the number
>> of manifest files
>>
>> DataFile rewrite will create a new manifest file. This means if a
>> DataFile rewrite task is finished and committed, and there is a concurrent
>> ManifestFile rewrite then the ManifestFile rewrite will fail. I have played
>> around with serializing the Maintenance Tasks (resulted in a very ugly/hard
>> to maintain, but working code). This serialization mitigated some of the
>> issues, but there was still one remaining problem. If there was a delay in
>> executing the rewrites (job restart/resource deprivation/longer than
>> expected Maintenance Task run), sometimes a consecutively scheduled
>> DataFile/ManifestFile rewrite is already started when the previous
>> DataFile/ManifestFile rewrite was still working. This again resulted in
>> failures.
>>
>> If we accept that locking is a requirement, then we can have:
>> - Simpler code for the flow - easy to understand flow
>> - Simpler code for the tasks - no need to separate messages for the
>> different runs
>> - No failures in the logs - problematic when swallowed, making hard to
>> identify real issues when logged
>>
>> > I don't think that an Iceberg solution is a good choice here.
>> Coordination in a system that does work on Iceberg tables does not need to
>> rely on locking in the Iceberg tables. It should have a way to coordinate
>> externally.
>> [..]
>> > I agree with Ryan that an Iceberg solution is not a good choice here.
>>
>> I agree that we don't need to *rely* on locking in the Iceberg tables.
>> The `TriggerLockFactory.Lock` interface is specifically designed to allow
>> the user to choose the prefered type of locking. I was trying to come up
>> with a solution where the Apache Iceberg users don't need to rely on one
>> more external system for the Flink Table Maintenance to work.
>>
>> I understand that this discussion is very similar to the HadoopCatalog
>> situation, where we have a hacky "solution" which is working in some cases,
>> but suboptimal. And I understand if we don't want to support it.
>>
>> If nobody else has a better idea, then I will add a default JDBC based
>> locking implementation to the PR [1].
>>
>> Thanks,
>> Peter
>>
>> [1] https://github.com/apache/iceberg/pull/10484
>>
>> Manu Zhang <owenzhang1...@gmail.com> ezt írta (időpont: 2024. aug. 6.,
>> K, 6:25):
>>
>>> Hi Peter,
>>>
>>> We rely on Airflow to schedule and coordinate maintenance Spark jobs. I
>>> agree with Ryan that an Iceberg solution is not a good choice here.
>>>
>>> Thanks,
>>> Manu
>>>
>>> On Tue, Aug 6, 2024 at 1:07 AM Ryan Blue <b...@databricks.com.invalid>
>>> wrote:
>>>
>>>> > We can make sure that the Tasks can tolerate concurrent runs, but as
>>>> mentioned in the doc, in most cases having concurrent runs are a waste of
>>>> resources, because of the commit conflicts.
>>>>
>>>> Is the problem that users may configure multiple jobs that are all
>>>> trying to run maintenance procedures? If so, isn't this a user-level
>>>> problem? If you don't want maintenance job conflicts, only run one
>>>> maintenance job.
>>>>
>>>> > If Spark/Kafka/Flink users all need some kind of locking it might
>>>> worth considering coming up with an Iceberg solution for it.
>>>>
>>>> I don't think that an Iceberg solution is a good choice here.
>>>> Coordination in a system that does work on Iceberg tables does not need to
>>>> rely on locking in the Iceberg tables. It should have a way to coordinate
>>>> externally.
>>>>
>>>> On Sun, Aug 4, 2024 at 10:29 PM Péter Váry <peter.vary.apa...@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks everyone for your answers! I really appreciate it, especially
>>>>> since these come into during the weekend, using your own time.
>>>>>
>>>>> @Manu, during our initial discussion, you have mentioned that you had
>>>>> similar issues with Spark compactions. You needed locking there. Is it
>>>>> still an issue?
>>>>>
>>>>> If Spark/Kafka/Flink users all need some kind of locking it might
>>>>> worth considering coming up with an Iceberg solution for it.
>>>>> I see the following possibilities:
>>>>> 1. Create a locking feature using the current Catalog API: We could
>>>>> create a 'lock' instead of 'tag', and we could use the Catalogs atomic
>>>>> change requirement to make locking atomic. My main concern with this
>>>>> approach is that it relies on the linear history of the table and produces
>>>>> more contention in write side. I was OK with this in the very specific
>>>>> use-case with Flink Table Maintenance (few changes, controlled times), but
>>>>> as a general solution, I would look somewhere else.
>>>>> 2. Add locking to the Catalog API as a requirement: We could widen the
>>>>> Catalog API with a lock/unlock optional feature. I'm not sure that we see
>>>>> enough use cases to merit such a big change. OTOH this could be a good
>>>>> additional feature to the REST Catalog as well, and maybe something we
>>>>> would like to add to our Catalog sooner or later.
>>>>>
>>>>> About the specific case: if we want to integrate Flink Table
>>>>> Maintenance to the streaming Flink sinks - and I consider that one of the
>>>>> main use cases - we can't rely on external schedulers to prevent 
>>>>> concurrent
>>>>> writes. We can make sure that the Tasks can tolerate concurrent runs, but
>>>>> as mentioned in the doc, in most cases having concurrent runs are a waste
>>>>> of resources, because of the commit conflicts.
>>>>>
>>>>> If we decide to pursue the Flink only solution, I would implement a
>>>>> JDBC based locking implementation for the LockFactory interface based on
>>>>> the feedback. This seems like the most natural and available external
>>>>> storage for locks. Later we could add more implementations based on the
>>>>> user needs.
>>>>>
>>>>> Thanks for the feedback and discussion,
>>>>> Peter
>>>>>
>>>>> On Mon, Aug 5, 2024, 05:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>>>
>>>>>> Not familiar with Flink, I'm wondering how Flink resolves concurrency
>>>>>> issues in common Flink use cases. For example, how does Flink prevent two
>>>>>> jobs from writing to the same file?
>>>>>>
>>>>>> On the other hand, an Iceberg tag is eventually an atomic change to a
>>>>>> file. It's the same as using a file lock. I don't think we can handle 
>>>>>> such
>>>>>> cases without relying on external dependencies. We may define interfaces 
>>>>>> in
>>>>>> Iceberg and let users choose whatever services they can use to implement
>>>>>> it. My $0.01.
>>>>>>
>>>>>> Manu
>>>>>>
>>>>>>
>>>>>> On Mon, Aug 5, 2024 at 8:38 AM Steven Wu <stevenz...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I also don't feel it is the best fit to use tags to implement locks
>>>>>>> for passing control messages. This is the main sticking point for me 
>>>>>>> from
>>>>>>> the design doc. However, we haven't been able to come up with a better
>>>>>>> solution yet. Maybe we need to go back to the drawing board again.
>>>>>>>
>>>>>>> I am also not sure using Kafka topics to send control messages is a
>>>>>>> good fit either. It would introduce a dependency on Kafka to run Flink
>>>>>>> maintenance jobs. It works for Kafka connect sink, because that is for
>>>>>>> Kafka env anyway.
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Aug 4, 2024 at 1:12 PM Ryan Blue <b...@databricks.com.invalid>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>  Hi Péter, thanks for bringing this up.
>>>>>>>>
>>>>>>>> I don't think using a tag to "lock" a table is a good idea. The doc
>>>>>>>> calls out that this is necessary "Since Flink doesn’t provide an out 
>>>>>>>> of the
>>>>>>>> box solution for downstream operators sending feedback to upstream
>>>>>>>> operators" so this feels like using Iceberg metadata as a side-channel
>>>>>>>> within a Flink application. That doesn't seem like a good idea to me.
>>>>>>>>
>>>>>>>> Why not use a separate Kafka topic to send control messages for
>>>>>>>> this purpose, like what is done in the Kafka Connect sink? I think 
>>>>>>>> that's a
>>>>>>>> cleaner way to solve the problem if there is not going to be a way to 
>>>>>>>> fix
>>>>>>>> it in Flink.
>>>>>>>>
>>>>>>>> Ryan
>>>>>>>>
>>>>>>>> On Wed, Jul 31, 2024 at 7:45 AM Péter Váry <
>>>>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Team,
>>>>>>>>>
>>>>>>>>> During the discussion around the Flink Table Maintenance [1], [2],
>>>>>>>>> I have highlighted that one of the main decision points is the way we
>>>>>>>>> prevent concurrent Maintenance Tasks from happening concurrently.
>>>>>>>>>
>>>>>>>>> At that time we did not find better solution than providing an
>>>>>>>>> interface for locking, and provide a basic implementation which is 
>>>>>>>>> based on
>>>>>>>>> Iceberg tags [3]:
>>>>>>>>>
>>>>>>>>> *"For convenience an Iceberg tag based solution is provided, so no
>>>>>>>>> external dependencies are needed. On lock creation a tag named
>>>>>>>>> '__flink_maitenance' is created for the current snapshot, and on lock
>>>>>>>>> removal this tag is removed. This solution is not ideal, as it creates
>>>>>>>>> multiple new versions of the table metadata, and mixes infrastructural
>>>>>>>>> metadata with user metadata. If this is not acceptable then other
>>>>>>>>> implementations could be created which could use external components 
>>>>>>>>> like
>>>>>>>>> JDBC/Kafka/ZooKeeper."*
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> We are in the implementation phase, and we agreed with Steven to
>>>>>>>>> take one final round with the community, to see if anyone has a better
>>>>>>>>> suggestion, or we could proceed with the originally agreed one.
>>>>>>>>>
>>>>>>>>> So if you have a better idea, please share.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Peter
>>>>>>>>>
>>>>>>>>> [1] -
>>>>>>>>> https://lists.apache.org/thread/qjf83v2xj6lxb9sr8z0v9p0979f8wsmf
>>>>>>>>> [2] -
>>>>>>>>> https://lists.apache.org/thread/vjf8m5wg840o58yz4y3q35k2mfhbm49l
>>>>>>>>> [3] -
>>>>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit#heading=h.lt9eaimi6zyz
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Ryan Blue
>>>>>>>> Databricks
>>>>>>>>
>>>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Databricks
>>>>
>>>
>>

Reply via email to