It is an interesting read indeed, and it might be a good direction. But I
agree with Ash that Python loading alone is not quite enough (though that
might be somewhat solvable, in a possibly breaking way, with
ResourceLoader).

> A sync component will still be required to sync from git to S3/GCS/Other
> storage and this AIP solves only the part that Airflow machines will be
> able to fetch the files from storage. Is that correct?

Looking at Elad's comment - first - let me rephrase how I understand the
proposal :).

As I understand it (Bolke to confirm), this change would allow us to get
rid of the sync component (or shared volumes) altogether. In the case you
describe, Elad, there is no need to sync Git to S3/GCS/Other -
GitFileSystem could be used directly (very similarly to deployments where
git-sync is currently used).

If I understand the idea, we would have to have a custom source loader
https://docs.python.org/3/library/importlib.html#importlib.abc.SourceLoader
that would allow the DAGFileProcessor to import the files using one of the
selected fsspec backends. Also, regarding what Ash wrote - we could
implement a custom ResourceLoader:
https://docs.python.org/3/library/importlib.html#importlib.abc.ResourceLoader
which might partially solve the problem of SQL/YAML files. But it might
require some fixes in our code for templated fields/files, and custom code
will likely break if the files are read directly from the filesystem
rather than via the Python resource API. I think most people will not use
the resource API to read files; they will fall back to direct file reading
by constructing the path, so this will be rather heavily breaking for
those cases (unfortunately, people traditionally use `__file__` to find
and read files, and __file__ is a string, not a `Path`).
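
To make it concrete - here is a minimal sketch of my own (not code from
the PR; the class and helper names are made up) of what such a loader
could look like, using only importlib and fsspec APIs:

    import importlib.abc
    import importlib.util

    import fsspec


    class FsspecSourceLoader(importlib.abc.SourceLoader):
        """Load a Python module's source through an fsspec filesystem."""

        def __init__(self, fs: fsspec.AbstractFileSystem, path: str):
            self.fs = fs
            self.path = path

        def get_filename(self, fullname: str) -> str:
            # Note: this "filename" is a remote path, so user code joining
            # os.path.dirname(__file__) with relative paths will break.
            return self.path

        def get_data(self, path: str) -> bytes:
            # get_data() is also the ResourceLoader API, so SQL/YAML files
            # could in principle be served via the same mechanism.
            with self.fs.open(path, "rb") as f:
                return f.read()


    def load_module(fs: fsspec.AbstractFileSystem, path: str, name: str):
        loader = FsspecSourceLoader(fs, path)
        spec = importlib.util.spec_from_loader(name, loader, origin=path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module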

This means that basically the synchronization component in the deployment
would not be needed at all. Currently, the responsibility to sync the
files is delegated to the deployment (git-sync, s3fs, shared filesystem,
etc.). If we switch to fsspec, Airflow (the "DAG file processor",
basically) will take over the responsibility (and the processing/IO/etc.
needed) to synchronize and load the files from such remote storage (via
the fsspec library of choice). Currently, Airflow always reads files from
a local filesystem without worrying about how they were synced - Airflow
assumes someone else synced the files.

This is all done in user space (so we cannot utilize any kernel-space
optimizations that shared volumes provide). This also means that
credentials to the remote filesystem must be stored in Airflow Connections
(whereas currently the whole syncing process, including authentication, is
done externally). And we can fall back to the current behaviour with the
Local Filesystem fsspec implementation (and keep shared volumes/git-sync
as something people could continue to use) - so this only "adds" features
to Airflow syncing rather than replacing it (possibly the Local Filesystem
fsspec will be backwards-compatible).
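
The fallback could be as simple as this sketch (the DAGs path is just an
example):

    import fsspec

    # LocalFileSystem reads straight from disk, so existing deployments
    # (shared volumes, external git-sync) keep working unchanged.
    fs = fsspec.filesystem("file")
    with fs.open("/opt/airflow/dags/example_dag.py", "rb") as f:
        source = f.read()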

So, for example, when Git FS is used and the SourceLoader is invoked, it
will transparently pull the right version from git. It would be
interesting to see the maturity and characteristics of such a solution,
and its cache effectiveness.
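
Something along these lines (again a sketch - the repo path and ref are
made up, and `load_module` refers to the illustrative helper above):

    import fsspec

    # GitFileSystem serves files from a given ref (branch/tag/sha) of a
    # local clone - no separate sync step needed.
    fs = fsspec.filesystem("git", path="/path/to/dag-repo", ref="main")
    module = load_module(fs, "dags/example_dag.py", "example_dag")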

Do I understand it correctly, Bolke?

BTW, I would not be surprised if someone has already implemented such
loaders - this seems like a generic need people might have.

Comments:

In the current solution we already use caching coming from the OS -
basically, the files are stored in a local filesystem and cached by any of
the current syncing solutions. None of them will really go all the way to
download all the files if they have not changed (at most, they will check
whether they changed). A POSIX OS will cache the files - they are
memory-mapped, and reading the same file multiple times really means
reading it from memory. So caching as an advantage is possibly not giving
us much. Especially since fsspec does
https://filesystem-spec.readthedocs.io/en/latest/features.html#caching-files-locally
- since we are going to read whole Python files, not parts of files,
essentially "filecache" needs to be used rather than block cache, so we
effectively get the same effect as the current "git-sync", "s3fs",
"gcs-fuse", or "shared filesystem" - it is just incorporated into the
particular fsspec library embedded in our code rather than done by an
external tool. All of them have various levels of caching files locally.
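
Roughly, filecache usage looks like this (a sketch - the bucket name and
cache directory are made-up examples):

    import fsspec

    # "filecache" downloads whole files on first access and serves
    # subsequent reads from the local cache directory.
    fs = fsspec.filesystem(
        "filecache",
        target_protocol="s3",
        target_options={"anon": False},
        cache_storage="/tmp/airflow-dag-cache",
    )
    with fs.open("my-bucket/dags/example_dag.py", "rb") as f:
        source = f.read()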

There might be other performance benefits of using fsspec, though - for
example, it might speed up the git-sync case. If I understand correctly,
GitFS will only fetch individual files when asked for them, so, for
example, task execution might be faster, as it will pull only the one or
few files needed when running on a non-"warmed-up" instance - for example
with the K8S executor. But it needs to be verified whether that really
brings improvements.

The versioning option is indeed interesting. For AIP-63 we will have to
be able to pass some "snapshot version" identification to tasks. And it
has to be done "internally" in Airflow - so relying on external sync
(which Airflow is decoupled from) is not going to work. Basically, Airflow
will have to have a way to "get" a snapshot of all relevant Python files
to execute - and when syncing is done externally, the Airflow task will
have to know how to retrieve the right snapshot of the files.

In this context, passing <ref> to
https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.implementations.git.GitFileSystem
will do the job transparently. But this only works for Git. For S3FS, for
example (https://s3fs.readthedocs.io/en/latest/#bucket-version-awareness),
versioning works at the "file" level, not the "snapshot" level (that is
how S3 bucket versioning works). The GCS fsspec implementation does not
support versioning at all. Other filesystems might also support
snapshotting - but then, if AIP-63 relies on fsspec to provide versioning,
it will only work with those filesystems that support "snapshots" of
multiple files. If we would like to support S3 or GCS, for example, we
would anyhow have to implement some additional snapshot "bundling"
mechanism - in which case the fsspec versioning capability is not useful.
I believe we **must** have "DAG folder snapshot" versioning support for
AIP-63 - there is no way we can base it on file versioning - but I will
leave it to the AIP-63 authors to discuss.
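
To illustrate the difference (a sketch - refs, bucket names, and version
ids are made up):

    import fsspec
    import s3fs

    # Git: a single ref identifies a consistent snapshot of the whole tree.
    git_fs = fsspec.filesystem("git", path="/path/to/dag-repo", ref="v1.2.3")

    # S3: versioning is per object - each file has its own version id, so
    # there is no built-in notion of a multi-file snapshot.
    s3 = s3fs.S3FileSystem(version_aware=True)
    with s3.open("my-bucket/dags/example_dag.py", version_id="abc123") as f:
        source = f.read()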

General feeling:

To be honest I am a bit torn on that one.

Unlike fsspec in DAG authoring, this one only really changes the
deployment option (trading off external syncing for internal
fsspec-library syncing) while not providing us much. I think caching is
not an advantage; there might be other performance benefits - and I agree
that, used in the AIP-63 context, it **might** help quite a bit for Git
and maybe a few other filesystems that support snapshotting - if Jed and
the other authors of AIP-63 also see it as useful. But its versioning
support is very limited and only covers some filesystems (though maybe
that is what we should limit AIP-63 to anyhow?).

The drawback I see is that we are giving Airflow more responsibilities
than it currently has - which, yes, might simplify the deployment, but
also increases internal complexity (and makes us more vulnerable to bugs
and synchronization issues that would normally be handled by external
components and their deployment managers).

It goes a bit against "let's think about what we should remove from
Airflow and delegate, rather than add to it", but it might be worth it if
we get other benefits for free (versioning and performance are IMHO worth
looking at).

J.

On Sun, May 26, 2024 at 1:33 PM Ash Berlin-Taylor <a...@apache.org> wrote:

> > Non DAG, Non module assets as part of the DAG folder are out of scope. So
> say for example for some reason you include a GIF. This will not
> automatically be available without changes to your code.
>
> What about SQL files a task uses, either as a template or via something
> else such as dbt? How about YAML based dag generators?
>
> (This might be mentioned in the wiki page, but it's not loading for me
> right now)
>
> -ash
>
> On 26 May 2024 08:55:11 BST, Bolke de Bruin <bdbr...@gmail.com> wrote:
> >Hi All,
> >
> >I would like to discuss a new AIP aimed at enhancing the DAG loading
> >mechanism to support reading DAGs from ephemeral storage solutions. This
> >proposal is intended to supersede AIP-5 Remote DAG Fetcher and provide a
> >more flexible and scalable approach and to prepare for AIP-63.
> >
> >
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-71+Generalizing+DAG+Loader+and+Processor+for+Ephemeral+Storage
> >
> >*Abstract*
> >This proposal aims to generalize the DAG loader and processor to use
> >pathlib.Path for file operations instead of assuming direct OS filesystem
> >access. It includes implementing a custom module loader that supports
> >loading from ObjectStoragePath locations and other Path-like abstractions,
> >with caching capabilities provided by fsspec. Furthermore, while this AIP
> >does not directly implement DAG versioning, it creates a foundational
> layer
> >that can be extended to support DAG versioning as outlined in AIP-63.
> >
> >A work in progress PR can be found here:
> >https://github.com/apache/airflow/pull/39647
> >
> >*Key points for discussion*
> >
> >Previous proposals, like AIP-5, suggested using a Fetcher mechanism. Kind
> >of like an in-process git-sync. This proposal is about making that
> >redundant by fully supporting object storage locations by leveraging
> >ObjectStoragePath and fsspec caching mechanisms.
> >
> >Earlier feedback on AIP-5 was that we thought that having an additional
> >Fetcher process was out of scope of the project. With the transient
> >integration of pathlib.Path and ObjectStoragePath I think this argument
> >does not hold anymore and the demand is there. In addition the added
> >flexibility allows AIP-63 to be implemented easier (what that looks like
> >remains to be seen).
> >
> >Airflow scans DAGs often. This very likely requires a caching mechanism
> for
> >both the DAGs and their modules. Fsspec does implement caching, and it is
> >planned to leverage this.
> >
> >Non DAG, Non module assets as part of the DAG folder are out of scope. So
> >say for example for some reason you include a GIF. This will not
> >automatically be available without changes to your code.
> >
> >I kindly request your thoughts :-).
> >
> >Bolke
> >
> >--
> >
> >--
> >Bolke de Bruin
> >bdbr...@gmail.com
>
