Hi Peter,

We rely on Airflow to schedule and coordinate maintenance Spark jobs. I
agree with Ryan that an Iceberg solution is not a good choice here.

Thanks,
Manu

On Tue, Aug 6, 2024 at 1:07 AM Ryan Blue <b...@databricks.com.invalid>
wrote:

> > We can make sure that the Tasks can tolerate concurrent runs, but as
> mentioned in the doc, in most cases having concurrent runs are a waste of
> resources, because of the commit conflicts.
>
> Is the problem that users may configure multiple jobs that are all trying
> to run maintenance procedures? If so, isn't this a user-level problem? If
> you don't want maintenance job conflicts, only run one maintenance job.
>
> > If Spark/Kafka/Flink users all need some kind of locking it might worth
> considering coming up with an Iceberg solution for it.
>
> I don't think that an Iceberg solution is a good choice here. Coordination
> in a system that does work on Iceberg tables does not need to rely on
> locking in the Iceberg tables. It should have a way to coordinate
> externally.
>
> On Sun, Aug 4, 2024 at 10:29 PM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Thanks everyone for your answers! I really appreciate it, especially
>> since these come into during the weekend, using your own time.
>>
>> @Manu, during our initial discussion, you have mentioned that you had
>> similar issues with Spark compactions. You needed locking there. Is it
>> still an issue?
>>
>> If Spark/Kafka/Flink users all need some kind of locking it might worth
>> considering coming up with an Iceberg solution for it.
>> I see the following possibilities:
>> 1. Create a locking feature using the current Catalog API: We could
>> create a 'lock' instead of 'tag', and we could use the Catalogs atomic
>> change requirement to make locking atomic. My main concern with this
>> approach is that it relies on the linear history of the table and produces
>> more contention in write side. I was OK with this in the very specific
>> use-case with Flink Table Maintenance (few changes, controlled times), but
>> as a general solution, I would look somewhere else.
>> 2. Add locking to the Catalog API as a requirement: We could widen the
>> Catalog API with a lock/unlock optional feature. I'm not sure that we see
>> enough use cases to merit such a big change. OTOH this could be a good
>> additional feature to the REST Catalog as well, and maybe something we
>> would like to add to our Catalog sooner or later.
>>
>> About the specific case: if we want to integrate Flink Table Maintenance
>> to the streaming Flink sinks - and I consider that one of the main use
>> cases - we can't rely on external schedulers to prevent concurrent writes.
>> We can make sure that the Tasks can tolerate concurrent runs, but as
>> mentioned in the doc, in most cases having concurrent runs are a waste of
>> resources, because of the commit conflicts.
>>
>> If we decide to pursue the Flink only solution, I would implement a JDBC
>> based locking implementation for the LockFactory interface based on the
>> feedback. This seems like the most natural and available external storage
>> for locks. Later we could add more implementations based on the user needs.
>>
>> Thanks for the feedback and discussion,
>> Peter
>>
>> On Mon, Aug 5, 2024, 05:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>
>>> Not familiar with Flink, I'm wondering how Flink resolves concurrency
>>> issues in common Flink use cases. For example, how does Flink prevent two
>>> jobs from writing to the same file?
>>>
>>> On the other hand, an Iceberg tag is eventually an atomic change to a
>>> file. It's the same as using a file lock. I don't think we can handle such
>>> cases without relying on external dependencies. We may define interfaces in
>>> Iceberg and let users choose whatever services they can use to implement
>>> it. My $0.01.
>>>
>>> Manu
>>>
>>>
>>> On Mon, Aug 5, 2024 at 8:38 AM Steven Wu <stevenz...@gmail.com> wrote:
>>>
>>>> I also don't feel it is the best fit to use tags to implement locks for
>>>> passing control messages. This is the main sticking point for me from the
>>>> design doc. However, we haven't been able to come up with a better solution
>>>> yet. Maybe we need to go back to the drawing board again.
>>>>
>>>> I am also not sure using Kafka topics to send control messages is a
>>>> good fit either. It would introduce a dependency on Kafka to run Flink
>>>> maintenance jobs. It works for Kafka connect sink, because that is for
>>>> Kafka env anyway.
>>>>
>>>>
>>>> On Sun, Aug 4, 2024 at 1:12 PM Ryan Blue <b...@databricks.com.invalid>
>>>> wrote:
>>>>
>>>>>  Hi Péter, thanks for bringing this up.
>>>>>
>>>>> I don't think using a tag to "lock" a table is a good idea. The doc
>>>>> calls out that this is necessary "Since Flink doesn’t provide an out of 
>>>>> the
>>>>> box solution for downstream operators sending feedback to upstream
>>>>> operators" so this feels like using Iceberg metadata as a side-channel
>>>>> within a Flink application. That doesn't seem like a good idea to me.
>>>>>
>>>>> Why not use a separate Kafka topic to send control messages for this
>>>>> purpose, like what is done in the Kafka Connect sink? I think that's a
>>>>> cleaner way to solve the problem if there is not going to be a way to fix
>>>>> it in Flink.
>>>>>
>>>>> Ryan
>>>>>
>>>>> On Wed, Jul 31, 2024 at 7:45 AM Péter Váry <
>>>>> peter.vary.apa...@gmail.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> During the discussion around the Flink Table Maintenance [1], [2], I
>>>>>> have highlighted that one of the main decision points is the way we 
>>>>>> prevent
>>>>>> concurrent Maintenance Tasks from happening concurrently.
>>>>>>
>>>>>> At that time we did not find better solution than providing an
>>>>>> interface for locking, and provide a basic implementation which is based 
>>>>>> on
>>>>>> Iceberg tags [3]:
>>>>>>
>>>>>> *"For convenience an Iceberg tag based solution is provided, so no
>>>>>> external dependencies are needed. On lock creation a tag named
>>>>>> '__flink_maitenance' is created for the current snapshot, and on lock
>>>>>> removal this tag is removed. This solution is not ideal, as it creates
>>>>>> multiple new versions of the table metadata, and mixes infrastructural
>>>>>> metadata with user metadata. If this is not acceptable then other
>>>>>> implementations could be created which could use external components like
>>>>>> JDBC/Kafka/ZooKeeper."*
>>>>>>
>>>>>>
>>>>>> We are in the implementation phase, and we agreed with Steven to take
>>>>>> one final round with the community, to see if anyone has a better
>>>>>> suggestion, or we could proceed with the originally agreed one.
>>>>>>
>>>>>> So if you have a better idea, please share.
>>>>>>
>>>>>> Thanks,
>>>>>> Peter
>>>>>>
>>>>>> [1] -
>>>>>> https://lists.apache.org/thread/qjf83v2xj6lxb9sr8z0v9p0979f8wsmf
>>>>>> [2] -
>>>>>> https://lists.apache.org/thread/vjf8m5wg840o58yz4y3q35k2mfhbm49l
>>>>>> [3] -
>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit#heading=h.lt9eaimi6zyz
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Databricks
>>>>>
>>>>
>
> --
> Ryan Blue
> Databricks
>

Reply via email to