> It's a long long read @Jarek Potiuk <ja...@potiuk.com> ...

You did invite people to comment :). Those were all the areas that came to
my mind :)

> In summary, moving to pathlib.Path enhances flexibility and makes future
integrations, like AIP-63, more manageable while not forcing any immediate
changes to current configurations or codebases.

Yep. I like the VersionedFS abstraction on top of fsspec - even if it is
not directly supported by fsspec and we have to build it ourselves. I will
leave it to the AIP-63 authors to comment on that - if they see that it
benefits them, then yes, with that explanation I am a little less torn on
it and more in favour of it. Also, the "pull" model where the task only
reads the files it needs might be interesting (though it would be great to
verify that this is really the case - it is mostly useful as a git-sync
replacement, as other solutions based on shared volumes already basically
do this under the hood). The VersionedFS layer will have to be rather
carefully designed though if AIP-63 is based on it.

The good thing is that fsspec is generally very well adopted and used by
pretty much every other player in the data space, so chances are a lot of
the syncing issues have already been solved there and will continue to be.

J.




On Mon, May 27, 2024 at 12:17 PM Bolke de Bruin <bdbr...@gmail.com> wrote:

> It's a long long read @Jarek Potiuk <ja...@potiuk.com> ...
>
> In general, you are right. The intention is to eliminate the need for an
> external sync, which is beneficial for Kubernetes (k8s) workers. They would
> only need to load one DAG and its resources, rather than syncing the entire
> DAG folder. This can be done selectively if the task doesn't require all
> the resources in the DAG.
>
> Moreover, moving to pathlib.Path provides flexibility without any
> downsides - there is no forced upgrade required. It does not necessitate
> getting rid of external sync mechanisms. You can still use external sync,
> and it will function as it does now. We can transition to using
> pathlib.Path without the "ephemeral storage" component. Adopting
> pathlib.Path would also improve the fundamentals to enable DAG loading on
> MS Windows and offer several optimizations, such as reducing os.stat
> usage and enabling early loop exits for filters.
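>
> As a hypothetical illustration (the function and paths are made up, not
> AIP-71 code): a scan written against pathlib.Path works identically for a
> local folder and, via UPath, for remote storage, and can exit a loop early
> without stat-ing the whole tree:
>
>     from pathlib import Path
>
>     from upath import UPath  # universal_pathlib, which backs ObjectStoragePath
>
>     def iter_dag_files(dag_folder: Path, limit: int | None = None):
>         """Yield candidate DAG files, stopping early once `limit` is hit."""
>         count = 0
>         for candidate in dag_folder.rglob("*.py"):
>             yield candidate
>             count += 1
>             if limit is not None and count >= limit:
>                 return  # early exit - no further listing or os.stat calls
>
>     # The same code path serves both backends:
>     local_dags = list(iter_dag_files(Path("/opt/airflow/dags")))
>     remote_dags = list(iter_dag_files(UPath("s3://my-bucket/dags/")))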
>
> Using pathlib.Path with local storage does not require changes to existing
> DAG code. If you move to the ephemeral model, it is advisable to use
> the pathlib.Path abstraction in your DAGs, which is the preferred method
> for file access in Python. However, there is no forced upgrade path; you
> can choose if and when to adopt ephemeral storage. Having DAG processing
> work with pathlib.Path will make implementing AIP-63 much easier.
>
> Integration of AIP-63 - Layered Architecture
>
> ---------------------
> | DAG Processing    |
> ---------------------
> | pathlib.Path      |
> ---------------------
> | UPath / ObjStPath |
> ---------------------
> | VersionedFS*      |
> ---------------------
> | S3/Git/GCS/Local  |
> ---------------------
> | FSSpec            |
> ---------------------
>
> This architecture suggests extending or using FSSpec to define a
> VersionedFS, similar to ZipFS, by leveraging the versioning capabilities of
> underlying storage systems like Git, S3, etc. Using pathlib.Path as an
> abstraction is essential to make this possible. Relying on "os" would
> require building all versioning logic into the DAG processing or
> externalizing it to a syncer. This architecture is simpler and easier to
> maintain due to lighter coupling.
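>
> To make the VersionedFS* idea concrete, here is a rough, hypothetical
> sketch (no such class exists in fsspec today; all names are illustrative)
> of a wrapper that pins an underlying fsspec filesystem to one snapshot ref:
>
>     import fsspec
>
>     class VersionedFileSystem(fsspec.AbstractFileSystem):
>         """Illustrative only: exposes a single snapshot of a backing FS."""
>
>         protocol = "versioned"
>
>         def __init__(self, target_protocol, ref, **target_options):
>             super().__init__()
>             self.ref = ref
>             # For git the ref maps directly; for S3/GCS a "snapshot" would
>             # have to be emulated (e.g. a manifest of per-object version
>             # ids or a zip bundle) - exactly the design question here.
>             self.fs = fsspec.filesystem(target_protocol, **target_options)
>
>         def _open(self, path, mode="rb", **kwargs):
>             return self.fs._open(self._resolve(path), mode=mode, **kwargs)
>
>         def ls(self, path, detail=True, **kwargs):
>             return self.fs.ls(self._resolve(path), detail=detail, **kwargs)
>
>         def _resolve(self, path):
>             # Map a logical path to its location within snapshot self.ref;
>             # backend-specific, deliberately left abstract in this sketch.
>             raise NotImplementedError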
>
> Strategies for Loading Additional Resources with Ephemeral Storage:
>
>     1. Resource loading through a custom Resource Loader (preferred, best
> practice)
>     2. Zip file containing all resources for a DAG
>     3. Using pathlib.Path / ObjectStoragePath directly (sketched below)
>     4. Downloading resources by the custom module loader by convention
> (e.g., package_name/resources/)
>     5. Reading a manifest by the custom module loader and loading from
> there
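>
> As a hypothetical sketch of strategy 3 (the bucket and file name are made
> up), a DAG can read a companion resource through ObjectStoragePath instead
> of constructing paths from __file__:
>
>     from airflow.io.path import ObjectStoragePath
>
>     # Resolve the SQL file against the DAG's storage location rather than
>     # the local filesystem, so it keeps working with ephemeral storage.
>     sql_path = ObjectStoragePath("s3://my-bucket/dags/resources/report.sql")
>     query = sql_path.read_text()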
>
> If using local storage with pathlib.Path, you can rely on the old
> strategies like direct loading.
>
> My previous reply to Elad seems to have been lost due to Apache's
> mailer-daemon, but to reiterate: GitHub (remote) and GitFS (local) are
> already supported, offering extensive versioning capabilities. Other
> storage solutions like GCS, S3, and ADLS have limited versioning support -
> as you mentioned - and may require bundling methods such as sparse bundles
> or zip files. Having this abstracted away in VersionedFS gives us a lot of
> flexibility.
>
> In summary, moving to pathlib.Path enhances flexibility and makes future
> integrations, like AIP-63, more manageable while not forcing any immediate
> changes to current configurations or codebases.
>
> Cheers
> Bolke
>
>
>
>
>
>
>
>
> On Sun, 26 May 2024 at 14:56, Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> It is an interesting read indeed, and might be a good direction. But I
>> agree with Ash that just Python loading is not quite enough (though it
>> might be somewhat solvable, in a possibly breaking way, with
>> ResourceLoader).
>>
>> > A sync component will still be required to sync from git to S3/GCS/Other
>> storage
>> and this AIP solves only the part that Airflow machines will be able to
>> fetch the files from storage. Is that correct?
>>
>> Looking at Elad's comment - first - let me rephrase how I understand the
>> proposal :).
>>
>> As I understand it (Bolke, please confirm) - this change would allow us to
>> get rid of the sync component (or shared volumes) altogether. In the case
>> you describe, Elad, there is no need to sync Git to S3/GCS/Other -
>> GitFileSystem could be used directly (very similarly to deployments where
>> git-sync is used currently).
>>
>> If I understand the idea - we would have to have a custom source loader
>> https://docs.python.org/3/library/importlib.html#importlib.abc.SourceLoader
>> that would allow the DAGFileProcessor to import the files using one of the
>> selected fsspec backends. Also, following what Ash wrote - we could
>> implement a custom resource loader:
>> https://docs.python.org/3/library/importlib.html#importlib.abc.ResourceLoader
>> which might partially solve the problem of sql/yaml files. But it might
>> require some fixes in our code for templated fields/files, and custom code
>> will likely be broken if the files are read directly from the filesystem
>> rather than via the Python resource API - I think most people do not use
>> the resource API to read files; they fall back to direct file reading by
>> constructing the path themselves, so this will be rather heavily breaking
>> for those cases (unfortunately people traditionally use `__file__` to find
>> and read files, and __file__ is a string, not a `Path`).
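>>
>> For illustration, a minimal, hypothetical sketch of such a loader (not the
>> AIP-71 implementation; class names and the dag-root layout are made up)
>> that resolves module source through an fsspec filesystem:
>>
>>     import importlib.abc
>>     import importlib.util
>>     import sys
>>
>>     import fsspec
>>
>>     class FsspecSourceLoader(importlib.abc.SourceLoader):
>>         def __init__(self, path: str, fs: fsspec.AbstractFileSystem):
>>             self._path = path
>>             self._fs = fs
>>
>>         def get_filename(self, fullname: str) -> str:
>>             # Becomes the module's __file__ - notably a plain string.
>>             return self._path
>>
>>         def get_data(self, path: str) -> bytes:
>>             # SourceLoader compiles whatever bytes we return here.
>>             with self._fs.open(path, "rb") as f:
>>                 return f.read()
>>
>>     class FsspecFinder(importlib.abc.MetaPathFinder):
>>         def __init__(self, fs: fsspec.AbstractFileSystem, root: str):
>>             self._fs = fs
>>             self._root = root
>>
>>         def find_spec(self, fullname, path=None, target=None):
>>             candidate = f"{self._root}/{fullname.replace('.', '/')}.py"
>>             if not self._fs.exists(candidate):
>>                 return None
>>             return importlib.util.spec_from_loader(
>>                 fullname, FsspecSourceLoader(candidate, self._fs)
>>             )
>>
>>     # e.g. serve imports straight from a git ref (paths illustrative):
>>     fs = fsspec.filesystem("git", path="/path/to/repo", ref="main")
>>     sys.meta_path.insert(0, FsspecFinder(fs, "dags"))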
>>
>> This means that basically the synchronization component in the deployment
>> is not needed at all. Currently the responsibility to sync the files is
>> delegated out to the deployment (git-sync, s3fs, shared filesystem, etc.).
>> If we switch to fsspec, Airflow (the "DAG file processor", basically) will
>> take over the responsibility (and the processing/io/etc. needed) to
>> synchronize and load the files from such remote storage (via the fsspec
>> implementation of choice). Currently Airflow always reads files from a
>> local "filesystem" without worrying about how they have been synced;
>> Airflow assumes someone else synced the files.
>>
>> This is all done in user space (so we cannot utilize any kernel-space
>> optimizations that shared volumes provide). This also means that
>> credentials to the remote file system must be stored in Airflow
>> Connections (whereas currently the whole syncing process, including
>> authentication, is done externally). And we can fall back to the current
>> behaviour with the Local File System fsspec implementation (and keep
>> shared volumes/git-sync as something people could still continue to use) -
>> so this only "adds" features to Airflow syncing rather than replacing it
>> (possibly the Local Filesystem fsspec will be backwards-compatible).
>>
>> So, for example, when GitFS is used and the SourceLoader is invoked, it
>> will transparently pull the right version from git. It would be
>> interesting to see the maturity and characteristics of such a solution,
>> and its cache effectiveness.
>>
>> Do I understand it correctly, Bolke?
>>
>> BTW, I would not be surprised if someone has already implemented such
>> loaders; this seems like a generic need people might have.
>>
>> Comments:
>>
>> In the current solution we already use caching coming from the OS -
>> basically, files are stored in a local filesystem and cached by any of the
>> current syncing solutions. None of them will really go all the way to
>> download all the files if they have not changed - at most they will check
>> whether they changed. A POSIX OS will cache the files - they are
>> memory-mapped, and reading multiple times from the same file is really
>> reading it from memory. So caching as an advantage is possibly not giving
>> us much. Especially since fsspec does
>> https://filesystem-spec.readthedocs.io/en/latest/features.html#caching-files-locally
>> -> since we are going to read whole Python files, not parts of files,
>> essentially "filecache" needs to be used rather than a block cache, so we
>> effectively get the same effect as the current "git-sync", "s3fs",
>> "gcs-fuse", or "shared filesystem" - it is just incorporated into the
>> particular fsspec library embedded in our code, rather than being done by
>> an external tool. All of them have various levels of caching files
>> locally.
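>>
>> For reference, a sketch of what "filecache" usage looks like (the bucket
>> and cache paths are illustrative) - the same whole-file local caching the
>> external tools do today, just in-process:
>>
>>     import fsspec
>>
>>     # Whole files are downloaded once into cache_storage and reused
>>     # until the remote copy changes - much like git-sync or a FUSE mount.
>>     fs = fsspec.filesystem(
>>         "filecache",
>>         target_protocol="s3",
>>         cache_storage="/tmp/airflow-dag-cache",
>>     )
>>     with fs.open("my-bucket/dags/my_dag.py", "rb") as f:
>>         source = f.read()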
>>
>> There might be other performance benefits of using fsspec though - for
>> example, it might speed up the git-sync case. If I understand correctly,
>> GitFS will only fetch individual files when asked for them, so task
>> execution might be faster, as it will only pull the one or few files
>> needed when running on a not "warmed up" instance - for example with the
>> K8S executor. But it needs to be verified whether that really brings
>> improvements.
>>
>> The versioning option is indeed interesting. For AIP-63 we will have to be
>> able to pass some "snapshot version" identification to tasks. And it has to
>> be done "internally" in Airflow - so relying on an external sync (which
>> Airflow is decoupled from) is not going to work. Basically, Airflow will
>> have to have a way to "get" a snapshot of all the relevant Python files to
>> execute - and when syncing is done externally, an Airflow "task" will have
>> to know how to retrieve the right snapshot of the files.
>>
>> In this context, passing a <ref> to
>> https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.implementations.git.GitFileSystem
>> will do the job transparently. But this only works for Git. For s3fs, for
>> example, https://s3fs.readthedocs.io/en/latest/#bucket-version-awareness -
>> versioning works on the "file" level, not on the "snapshot" level (that is
>> how S3 bucket versioning works). The GCS fsspec implementation does not
>> support versioning at all. Other filesystems might also support
>> snapshotting - but if AIP-63 relies on fsspec providing versioning, it
>> will only work with those filesystems that support "snapshots" of multiple
>> files. If we would like to support S3 or GCS, we would anyhow have to
>> implement some additional snapshot "bundling" mechanism - in which case
>> the fsspec versioning capability is not useful. I believe we **must** have
>> "DAG folder snapshot" versioning support for AIP-63 - there is no way we
>> can base it on per-file versioning - but I will leave it to the AIP-63
>> authors to discuss.
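>>
>> To make the contrast concrete, a sketch (the repo path, bucket and version
>> id are made up):
>>
>>     import fsspec
>>
>>     # Git: a single ref pins a consistent snapshot of the whole tree.
>>     git_fs = fsspec.filesystem("git", path="/path/to/repo", ref="v1.2.3")
>>     dag_source = git_fs.cat("dags/my_dag.py")
>>
>>     # S3: versioning is per object - each file has its own version id,
>>     # so there is no single handle for a "DAG folder snapshot".
>>     s3_fs = fsspec.filesystem("s3", version_aware=True)
>>     with s3_fs.open("my-bucket/dags/my_dag.py", version_id="some-id") as f:
>>         dag_source = f.read()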
>>
>> General feeling:
>>
>> To be honest I am a bit torn on that one.
>>
>> Unlike fsspec in DAG authoring, this one only really changes the
>> deployment option (trading external syncing for internal fsspec-library
>> syncing) while not providing us much. I think caching is not an advantage;
>> there might be other performance benefits - and I agree that in the AIP-63
>> context it **might** help quite a bit for Git, and maybe a few other
>> filesystems that support snapshotting - if Jed and the other authors of
>> AIP-63 also see it as useful. And its versioning support is very limited
>> and only for some filesystems (but maybe this is what we should limit
>> AIP-63 to anyhow?).
>>
>> The drawback I see is that we are giving Airflow more responsibilities
>> than it currently has - which, yes, might simplify the deployment, but
>> also increases internal complexity (and makes us more vulnerable to bugs
>> and synchronization issues that normally would be handled by external
>> components and their deployment managers).
>>
>> It goes a bit against "let's think about what we should remove from
>> Airflow and delegate, rather than add to it", but might be worth it if we
>> get other benefits for free (versioning and performance are IMHO worth
>> looking at).
>>
>> J.
>>
>> On Sun, May 26, 2024 at 1:33 PM Ash Berlin-Taylor <a...@apache.org> wrote:
>>
>> > > Non-DAG, non-module assets in the DAG folder are out of scope. So
>> > > say, for example, that for some reason you include a GIF. This will
>> > > not automatically be available without changes to your code.
>> >
>> > What about SQL files a task uses, either as a template or via something
>> > else such as dbt? How about YAML based dag generators?
>> >
>> > (This might be mentioned in the wiki page, but it's not loading for me
>> > right now)
>> >
>> > -ash
>> >
>> > On 26 May 2024 08:55:11 BST, Bolke de Bruin <bdbr...@gmail.com> wrote:
>> > >Hi All,
>> > >
>> > >I would like to discuss a new AIP aimed at enhancing the DAG loading
>> > >mechanism to support reading DAGs from ephemeral storage solutions.
>> > >This proposal is intended to supersede AIP-5 Remote DAG Fetcher,
>> > >provide a more flexible and scalable approach, and prepare for AIP-63.
>> > >
>> > >
>> > >
>> > >https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-71+Generalizing+DAG+Loader+and+Processor+for+Ephemeral+Storage
>> > >
>> > >*Abstract*
>> > >This proposal aims to generalize the DAG loader and processor to use
>> > >pathlib.Path for file operations instead of assuming direct OS
>> > >filesystem access. It includes implementing a custom module loader that
>> > >supports loading from ObjectStoragePath locations and other Path-like
>> > >abstractions, with caching capabilities provided by fsspec.
>> > >Furthermore, while this AIP does not directly implement DAG versioning,
>> > >it creates a foundational layer that can be extended to support DAG
>> > >versioning as outlined in AIP-63.
>> > >
>> > >A work in progress PR can be found here:
>> > >https://github.com/apache/airflow/pull/39647
>> > >
>> > >*Key points for discussion*
>> > >
>> > >Previous proposals, like AIP-5, suggested using a Fetcher mechanism -
>> > >kind of like an in-process git-sync. This proposal is about making that
>> > >redundant by fully supporting object storage locations, leveraging
>> > >ObjectStoragePath and fsspec caching mechanisms.
>> > >
>> > >Earlier feedback on AIP-5 was that we thought having an additional
>> > >Fetcher process was out of scope for the project. With the transparent
>> > >integration of pathlib.Path and ObjectStoragePath I think this argument
>> > >does not hold anymore, and the demand is there. In addition, the added
>> > >flexibility allows AIP-63 to be implemented more easily (what that
>> > >looks like remains to be seen).
>> > >
>> > >Airflow scans DAGs often. This very likely requires a caching
>> > >mechanism for both the DAGs and their modules. Fsspec does implement
>> > >caching, and it is planned to leverage this.
>> > >
>> > >Non-DAG, non-module assets in the DAG folder are out of scope. So
>> > >say, for example, that for some reason you include a GIF. This will
>> > >not automatically be available without changes to your code.
>> > >
>> > >I kindly request your thoughts :-).
>> > >
>> > >Bolke
>> > >
>> > >--
>> > >
>> > >--
>> > >Bolke de Bruin
>> > >bdbr...@gmail.com
>> >
>>
>
>
> --
>
> --
> Bolke de Bruin
> bdbr...@gmail.com
>
