Quick comment (as I'm still mostly on paternity leave):

Storing wheels in the db sounds like a bad idea to me, especially if we need to
store deps in there too (and if we don't store deps, then they are incomplete)
- they could get very large. I've stored blobs of ~10 MB in Postgres before and
I don't recommend it: it "works", but operating it is tricky.



> the API could simply accept "Wheel file + the Dag id"

This sounds like a huge security risk.


My main concern with this idea is that it seems like a lot of complexity we are
putting on users. Doubly so if they are already using docker, where there
already exists an ideal packaging and distribution mechanism that could contain
dag + needed code.

(Sorry for the brevity)

-ash


On 2 August 2020 08:47:39 BST, Jarek Potiuk <jarek.pot...@polidea.com> wrote:
>A few points from my side (and a proposal!):
>
>1) Agree with Max -  with a rather strong NO for pickles (however,
>indeed cloudpickle solves some of the problems). Pickles came up in
>our discussion in Polidea recently and the overall message was "no". I
>agree with Max here - if we can ship python code, turning that into
>pickle for transit makes little sense to me and brings a plethora of
>problems.
>
>2) I think indeed the versioning solution should treat the "DagRun"
>structure atomically. While I see why we would like to go with the
>UI/Scheduler only first rather than implementing them in the workers,
>adding the "mixed version" is where it breaks down IMHO. Reasoning
>about such "mixed version" dag is next to impossible. The current
>behavior is not well defined and non-deterministic (depends on
>scheduler delays, syncing, type of deployment, restarts of the workers
>etc.), and we are moving it up to the UI (thus users) rather than
>solving the problem. So I am not a big fan of this and would rather
>solve it "well" with atomicity.
>
>3) I see the point of Dan as well - we had many discussions and many
>times the idea about "submitting" the DAG for execution via the API
>came up - and it makes sense IMHO.
>
>Proposal: Implement full versioning with code shipping via DB wheels
>BLOB (akin to serialized DAGs).
>
>I understand that the big issue is how to actually "ship" the code to
>the worker. And - maybe a wild idea - we can kill several birds with
>the same stone.
>
>There were plenty of discussions on how we could do that but one was
>never truly explored - using wheel packages.
>
>For those who do not know them, there is the PEP:
>https://www.python.org/dev/peps/pep-0427/
>
>Wheels allow us to "package" python code in a standard way. They are
>portable ("purelib" + contain .py rather than .pyc code), they have
>metadata, versioning information, they can be signed for security,
>and they can contain other packages or python code. Why don't we let
>the scheduler pack the fingerprinted version of the DAG in a .whl and
>store it as a blob in a DB next to the serialized form?
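
A minimal sketch of the packing step described above, under stated
assumptions. Wheels are ZIP-format archives (PEP 427), so this uses a plain
zip as a stand-in; it does not write the .dist-info metadata that a valid
wheel needs. The helper names (pack_dag_sources, blob_fingerprint,
unpack_dag_sources) are hypothetical and not Airflow APIs. Entries are
written in sorted order with a fixed timestamp so that identical sources
give identical bytes, which is what makes the blob usable as a version key.

```python
import hashlib
import io
import zipfile

# Fixed timestamp so the archive bytes do not depend on when it was built.
FIXED_DATE = (1980, 1, 1, 0, 0, 0)


def pack_dag_sources(files):
    """Pack {archive_path: source_text} into zip bytes (hypothetical helper).

    This is NOT a valid wheel: no .dist-info/METADATA, WHEEL or RECORD
    files are written. It only illustrates the "code as a blob" idea.
    """
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        for name in sorted(files):
            info = zipfile.ZipInfo(name, date_time=FIXED_DATE)
            info.compress_type = zipfile.ZIP_DEFLATED
            zf.writestr(info, files[name])
    return buf.getvalue()


def blob_fingerprint(blob):
    """SHA-256 hex digest of the packed bytes (hypothetical version key)."""
    return hashlib.sha256(blob).hexdigest()


def unpack_dag_sources(blob):
    """Read the sources back, as a worker might before running a task."""
    with zipfile.ZipFile(io.BytesIO(blob)) as zf:
        return {name: zf.read(name).decode("utf-8") for name in zf.namelist()}


sources = {
    "dags/example_dag.py": "print('dag')\n",
    "common/my_fantastic_library.py": "VALUE = 1\n",
}
blob = pack_dag_sources(sources)
```

The blob and its digest could then be stored next to the serialized DAG row;
how that table would look is left open here.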
>
>There were concerns about the size of the code to keep in the DB - but
>we already use the DB for serialized DAGs and it works fine (I believe
>we only need to add compression of the JSON serialized form - as we've
>learned from AirBnb during their talk at the Airflow Summit - wheels
>are already compressed). Also - each task will only need the
>particular "version" of one DAG so even if we keep many of them in the
>DB, the old version will pretty soon go "cold" and will never be
>retrieved (and most DBs will handle it well with caching/indexes).
>
>And if we want to add "callables" from other files - there is nothing
>to stop the person who defines the DAG from adding a list of files that
>should be packaged together with the main DAG file
>(additional_python_files = ["common/my_fantastic_library.py"] in the DAG
>constructor). Or we could auto-add all files after the DAG gets
>imported (i.e. automatically package all files that are imported for
>that particular DAG from the "dags" folder). That should be rather easy.
>
>This way we could ship the code to workers for the exact version that
>the DagRun uses. And they can be cached and unpacked/installed to a
>virtualenv for the execution of that single task. That should be super
>quick. Such virtualenv can be wiped out after execution.
>
>Then we get what Max wants (atomicity of DagRuns) and what Dan wants
>(the API could simply accept "Wheel file + the Dag id"). We have the
>isolation between tasks running on the same worker (based on
>virtualenv) so that each process in the same worker can run a
>different version of the same Dag. We have much less confusion for the
>UI.
>
>Extra bonus 1: we can expand it to package different dependencies in
>the wheels as well - so that if an operator requires a different
>(newer) version of a python library, it could be packaged together
>with the DAG in the same .whl file. This is also a highly requested
>feature.
>Extra bonus 2: workers will stop depending on the DAG file mount (!)
>which was our long term goal and indeed as Dan mentioned - a great
>step towards multi-tenancy.
>
>J.
>
>
>
>
>
>
>On Fri, Jul 31, 2020 at 6:41 AM Maxime Beauchemin
><maximebeauche...@gmail.com> wrote:
>>
>> Having tried it early on, I'd advocate pretty strongly against pickles
>> and would rather not get too deep into the why here. Short story is
>> they can pull the entire memory space or much more than you want, and
>> it's impossible to reason about where they end. For that reason and
>> other reasons, they're a security issue. Oh and some objects are not
>> picklable (Jinja templates! to name a problematic one...). I've also
>> seen secret-related classes that raise when pickled (thank god!).
>>
>> About callback and other things like that, it's quite a puzzle in
>> python. One solution would be to point to a python namespace
>> callback="preset.airflow_utils.slack_callback" and assume the function
>> has to exist in the remote interpreter. Personally I like the
>> DagFetcher idea (it could be great to get a pointer to that mailing
>> list thread here), specifically the GitDagFetcher. I don't know how
>> [un]reasonable it is, but I hate pickles so much that shipping source
>> code around seems much more reasonable to me. I think out there
>> there's a talk from Mike Star about Dataswarm at FB and he may mention
>> how their workers may git shallow clone the pipeline repo. Or maybe
>> they use that "beautifully ugly" hack to use a gitfs fuse [file system
>> in user space] on the worker [could get deeper into that, not sure how
>> reasonable that is either].
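
A minimal sketch of the "point to a python namespace" idea above: the
callback travels as a dotted string and is resolved by import on the
receiving side, so nothing is pickled. resolve_callable is a hypothetical
helper, not an Airflow API, and it assumes the module is importable in the
remote interpreter. A standard-library function is used as the target only
so the example runs anywhere.

```python
import importlib


def resolve_callable(dotted_path):
    """Import "package.module.attr" and return the attribute (hypothetical)."""
    module_path, _, attr_name = dotted_path.rpartition(".")
    if not module_path:
        raise ValueError(f"expected 'module.attr', got {dotted_path!r}")
    module = importlib.import_module(module_path)
    func = getattr(module, attr_name)
    if not callable(func):
        raise TypeError(f"{dotted_path!r} is not callable")
    return func


# Stand-in target; in the thread's example the string would be
# "preset.airflow_utils.slack_callback".
join = resolve_callable("os.path.join")
```

The trade-off is the one noted above: the function has to exist, at a
compatible version, wherever the string is resolved.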
>>
>> About fingerprints, a simple `start_date = datetime.now() -
>> timedelta(1)` may lead to a never-repeating fingerprint. From memory
>> the spec doesn't list out the properties considered to build the hash.
>> It would be helpful to specify and review that list.
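
A small sketch of the fingerprint problem described above, assuming the
fingerprint is a hash over a JSON dump of the serialized DAG. The
fingerprint and parse_dag functions and the dict layout are hypothetical,
not the AIP's or Airflow's actual scheme. The parse time is passed in
explicitly (instead of calling datetime.now()) so the example is
repeatable.

```python
import hashlib
import json
from datetime import datetime, timedelta


def fingerprint(serialized_dag):
    """SHA-256 over a canonical JSON dump (hypothetical scheme)."""
    canonical = json.dumps(serialized_dag, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def parse_dag(now):
    """Stand-in for parsing a DAG file whose start_date is set at parse time."""
    return {
        "dag_id": "example",
        "start_date": now - timedelta(1),
        "tasks": ["a", "b", "c"],
    }


first = parse_dag(datetime(2020, 7, 31, 6, 0, 0))
second = parse_dag(datetime(2020, 7, 31, 6, 0, 5))  # re-parsed 5 seconds later

# Same DAG file, different fingerprints, because start_date moved.
changed = fingerprint(first) != fingerprint(second)

# One possible choice (an assumption, not something the spec states):
# leave the volatile field out of the hashed properties.
stable_first = {k: v for k, v in first.items() if k != "start_date"}
stable_second = {k: v for k, v in second.items() if k != "start_date"}
same = fingerprint(stable_first) == fingerprint(stable_second)
```

Which properties belong in the hash is exactly the list that would need to
be specified and reviewed.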
>>
>> Max
>>
>> On Wed, Jul 29, 2020 at 5:20 AM Kaxil Naik <kaxiln...@gmail.com>
>wrote:
>>
>> > Thanks, both Max and Dan for your comments, please check my reply
>below:
>> >
>> >
>> > >  Personally I vote for a DAG version to be pinned and consistent
>for the
>> > > duration of the DAG run. Some of the reasons why:
>> > > - it's easier to reason about, and therefore visualize and
>troubleshoot
>> > > - it prevents some cases where dependencies are never met
>> > > - it prevents the explosion of artifact/metadata (one
>serialization per
>> > > dagrun as opposed to one per scheduler cycle) in the case of a
>dynamic
>> > DAG
>> > > whose fingerprint is never the same.
>> >
>> >
>> > In this AIP, we were only looking to fix the current "Viewing
>behaviour"
>> > and
>> > we were intentionally not changing the execution behaviour.
>> > The change you are suggesting means we need to introduce DAG Versioning
>> > for the workers too. This will need more work as we can't use the
>> > Serialised Representation to run the task, since users could use custom
>> > modules in a different part of the code; for example, the PythonOperator
>> > has python_callable, which allows running arbitrary code.
>> > A similar case is with the *on_*_callbacks* defined on DAG.
>> >
>> > Based on the current scope of the AIP, we still plan to use the
>actual DAG
>> > files for the
>> > execution and not use Serialized DAGs for the workers.
>> >
>> > To account for all the custom modules we will have to start looking
>at
>> > pickle (cloudpickle).
>> >
>> > I'm certain that there are lots of
>> > > those DAGs out there, and that it will overwhelm the metadata
>database,
>> > and
>> > > confuse the users. For an hourly DAG it would mean 24 artifacts per day
>> > > instead of 1000+
>> >
>> >
>> > What kind of dynamic DAGs are we talking about here? I would think the
>> > DAG signature won't change, but I might be wrong. Can you give an
>> > example, please?
>> >
>> > If backwards compatibility in behavior is a concern, I'd recommend
>adding a
>> > > flag to the DAG class and/or config and make sure we're doing the
>right
>> > > thing by default. People who want backward compatibility would
>have to
>> > > change that default. But again, that's a lot of extra and
>confusing
>> > > complexity that will likely be the source of bugs and user
>confusion.
>> > > Having a clear, easy to reason about execution model is super
>important.
>> >
>> > Think about visualizing a DAG that shapeshifted 5 times during its
>> > > execution, how does anyone make sense of that?
>> >
>> >
>> > Wouldn't that be an edge case? How often would someone change the DAG
>> > structure in the middle of a DAG execution? And if they do change it,
>> > the Graph View should show all the tasks that were run; if it just shows
>> > the latest version, the behaviour would be the same as now.
>> >
>> > --------
>> >
>> > Strongly agree with Max's points, also I feel the right way to go
>about
>> > > this is instead of Airflow schedulers/webservers/workers reading
>DAG
>> > Python
>> > > files, they would instead read from serialized representations of
>the
>> > DAGs
>> > > (e.g. json representation in the Airflow DB). Instead of DAG
>owners
>> > pushing
>> > > their DAG files to the Airflow components via varying mechanisms
>(e.g.
>> > > git), they would instead call an Airflow CLI to push the
>serialized DAG
>> > > representations to the DB, and for things like dynamic DAGs you
>could
>> > > populate them from a DAG or another service.
>> >
>> >
>> > Airflow Webserver and the Scheduler will definitely read from the
>> > Serialized representation as
>> > they don't need all the code from the DAG files.
>> >
>> > The workers, however, definitely need access to the DAG files, as the
>> > tasks/operators would be using code from custom modules and classes
>> > which are required to run the tasks.
>> >
>> > If we do want to go down that route we will have to use something like
>> > cloudpickle that serializes the entire DAG file and its dependencies,
>> > and also ensure that no one is able to change the pickled source when
>> > it is sent from the executor to the worker, as that poses a big
>> > security risk.
>> >
>> > - Kaxil
>> >
>> > On Wed, Jul 29, 2020 at 12:43 PM Jacob Ward <jw...@brandwatch.com>
>wrote:
>> >
>> > > I came here to say what Max has said, only less eloquently.
>> > >
>> > > I do have one concern with locking the version for a single run.
>> > Currently
>> > > it is possible for a user to create a dag which intentionally
>changes as
>> > a
>> > > dag executes, i.e. dynamically creating a task for the dag during
>a run
>> > by
>> > > modifying external data, but this change would prevent that. I'm
>of the
>> > > opinion that this situation is bad practice anyway so it doesn't
>matter
>> > if
>> > > we make it impossible to do, but others may disagree.
>> > >
>> > > On Tue, 28 Jul 2020 at 17:08, Dan Davydov
><ddavy...@twitter.com.invalid>
>> > > wrote:
>> > >
>> > > > Strongly agree with Max's points, also I feel the right way to
>go about
>> > > > this is instead of Airflow schedulers/webservers/workers
>reading DAG
>> > > Python
>> > > > files, they would instead read from serialized representations
>of the
>> > > DAGs
>> > > > (e.g. json representation in the Airflow DB). Instead of DAG
>owners
>> > > pushing
>> > > > their DAG files to the Airflow components via varying
>mechanisms (e.g.
>> > > > git), they would instead call an Airflow CLI to push the
>serialized DAG
>> > > > representations to the DB, and for things like dynamic DAGs you
>could
>> > > > populate them from a DAG or another service.
>> > > >
>> > > > This would also enable other features like stronger
>> > > security/multi-tenancy.
>> > > >
>> > > > On Tue, Jul 28, 2020 at 6:44 PM Maxime Beauchemin <
>> > > > maximebeauche...@gmail.com> wrote:
>> > > >
>> > > > > > "mixed version"
>> > > > >
>> > > > > Personally I vote for a DAG version to be pinned and
>consistent for
>> > the
>> > > > > duration of the DAG run. Some of the reasons why:
>> > > > > - it's easier to reason about, and therefore visualize and
>> > troubleshoot
>> > > > > - it prevents some cases where dependencies are never met
>> > > > > - it prevents the explosion of artifact/metadata (one
>serialization
>> > per
>> > > > > dagrun as opposed to one per scheduler cycle) in the case of
>a
>> > dynamic
>> > > > DAG
>> > > > > whose fingerprint is never the same. I'm certain that there
>are lots
>> > of
>> > > > > those DAGs out there, and that it will overwhelm the metadata
>> > database,
>> > > > and
>> > > > > confuse the users. For an hourly DAG it would mean 24 artifacts per day
>> > > > > instead of 1000+
>> > > > >
>> > > > > If backwards compatibility in behavior is a concern, I'd
>recommend
>> > > > adding a
>> > > > > flag to the DAG class and/or config and make sure we're doing
>the
>> > right
>> > > > > thing by default. People who want backward compatibility
>would have
>> > to
>> > > > > change that default. But again, that's a lot of extra and
>confusing
>> > > > > complexity that will likely be the source of bugs and user
>confusion.
>> > > > > Having a clear, easy to reason about execution model is super
>> > > important.
>> > > > >
>> > > > > Think about visualizing a DAG that shapeshifted 5 times
>during its
>> > > > > execution, how does anyone make sense of that?
>> > > > >
>> > > > > Max
>> > > > >
>> > > > > On Tue, Jul 28, 2020 at 3:14 AM Kaxil Naik
><kaxiln...@gmail.com>
>> > > wrote:
>> > > > >
>> > > > > > Thanks Max for your comments.
>> > > > > >
>> > > > > >
>> > > > > > *DAG Fingerprinting: *this can be tricky, especially in
>regards to
>> > > > > dynamic
>> > > > > > > DAGs, where in some cases each parsing of the DAG can
>result in a
>> > > > > > different
>> > > > > > > fingerprint. I think DAG and tasks attributes are left
>out from
>> > the
>> > > > > > > proposal that should be considered as part of the
>fingerprint,
>> > like
>> > > > > > trigger
>> > > > > > > rules or task start/end datetime. We should do a full
>pass of all
>> > > DAG
>> > > > > > > arguments and make sure we're not forgetting anything
>that can
>> > > change
>> > > > > > > scheduling logic. Also, let's be careful that something
>as simple
>> > > as
>> > > > a
>> > > > > > > dynamic start or end date on a task could lead to a
>different
>> > > version
>> > > > > > each
>> > > > > > > time you parse.
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > The short version of Dag Fingerprinting would be
>> > > > > > just a hash of the Serialized DAG.
>> > > > > >
>> > > > > > *Example DAG*: https://imgur.com/TVuoN3p
>> > > > > > *Example Serialized DAG*: https://imgur.com/LmA2Bpr
>> > > > > >
>> > > > > > It contains all the task & DAG parameters. When they
>change,
>> > > Scheduler
>> > > > > > writes
>> > > > > > a new version of Serialized DAGs to the DB. The Webserver
>then
>> > reads
>> > > > the
>> > > > > > DAGs from the DB.
>> > > > > >
>> > > > > > I'd recommend limiting serialization/storage of one version
>> > > > > > > per DAG Run, as opposed to potentially everytime the DAG
>is
>> > parsed
>> > > -
>> > > > > once
>> > > > > > > the version for a DAG run is pinned, fingerprinting is
>not
>> > > > re-evaluated
>> > > > > > > until the next DAG run is ready to get created.
>> > > > > >
>> > > > > >
>> > > > > > This is to handle Scenario 3 where a DAG structure is
>changed
>> > > mid-way.
>> > > > > > Since we don't intend to
>> > > > > > change the execution behaviour, if we limit Storage of 1
>version
>> > per
>> > > > DAG,
>> > > > > > it won't actually show what
>> > > > > > was run.
>> > > > > >
>> > > > > > Example Dag v1: Task A -> Task B -> Task C
>> > > > > > The worker has completed the execution of Task A and is just about
>> > > > > > to complete the execution of Task B.
>> > > > > >
>> > > > > > The 2nd version of DAG is deployed: Task A -> Task D
>> > > > > > Now Scheduler queued Task D and it will run to completion.
>(Task C
>> > > > won't
>> > > > > > run)
>> > > > > >
>> > > > > > In this case, "the actual representation of the DAG" that ran is
>> > > > > > neither v1 nor v2 but a "mixed version" (Task A -> Task B -> Task D).
>> > > > > > The plan is that the Scheduler will create this "mixed version" based
>> > > > > > on what ran and the Graph View would show this "mixed version".
>> > > > > >
>> > > > > > There would also be a toggle button on the Graph View to
>select v1
>> > or
>> > > > v2
>> > > > > > where the tasks will be highlighted to show
>> > > > > > that a particular task was in v1 or v2 as shown in
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>https://cwiki.apache.org/confluence/download/attachments/158868919/Picture%201.png?version=2&modificationDate=1595612863000&api=v2
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > *Visualizing change in the tree view:* I think this is very
>complex
>> > > and
>> > > > > > > many things can make this view impossible to render (task
>> > > dependency
>> > > > > > > reversal, cycles across versions, ...). Maybe a better
>visual
>> > > > approach
>> > > > > > > would be to render independent, individual tree views for
>each
>> > DAG
>> > > > > > version
>> > > > > > > (side by side), and doing best effort aligning the tasks
>across
>> > > > blocks
>> > > > > > and
>> > > > > > > "linking" tasks with lines across blocks when necessary.
>> > > > > >
>> > > > > >
>> > > > > > Agreed, the plan is to do the best effort aligning.
>> > > > > > At this point in time, task additions to the end of the DAG
>are
>> > > > expected
>> > > > > to
>> > > > > > be compatible,
>> > > > > > but changes to task structure within the DAG may cause the
>tree
>> > view
>> > > > not
>> > > > > to
>> > > > > > incorporate “old” and “new” in the same view, hence that
>won't be
>> > > > shown.
>> > > > > >
>> > > > > > Regards,
>> > > > > > Kaxil
>> > > > > >
>> > > > > > On Mon, Jul 27, 2020 at 6:02 PM Maxime Beauchemin <
>> > > > > > maximebeauche...@gmail.com> wrote:
>> > > > > >
>> > > > > > > Some notes and ideas:
>> > > > > > >
>> > > > > > > *DAG Fingerprinting: *this can be tricky, especially in
>regards
>> > to
>> > > > > > dynamic
>> > > > > > > DAGs, where in some cases each parsing of the DAG can
>result in a
>> > > > > > different
>> > > > > > > fingerprint. I think DAG and tasks attributes are left
>out from
>> > the
>> > > > > > > proposal that should be considered as part of the
>fingerprint,
>> > like
>> > > > > > trigger
>> > > > > > > rules or task start/end datetime. We should do a full
>pass of all
>> > > DAG
>> > > > > > > arguments and make sure we're not forgetting anything
>that can
>> > > change
>> > > > > > > scheduling logic. Also, let's be careful that something
>as simple
>> > > as
>> > > > a
>> > > > > > > dynamic start or end date on a task could lead to a
>different
>> > > version
>> > > > > > each
>> > > > > > > time you parse. I'd recommend limiting
>serialization/storage of
>> > one
>> > > > > > version
>> > > > > > > per DAG Run, as opposed to potentially everytime the DAG
>is
>> > parsed
>> > > -
>> > > > > once
>> > > > > > > the version for a DAG run is pinned, fingerprinting is
>not
>> > > > re-evaluated
>> > > > > > > until the next DAG run is ready to get created.
>> > > > > > >
>> > > > > > > *Visualizing change in the tree view:* I think this is
>very
>> > complex
>> > > > and
>> > > > > > > many things can make this view impossible to render (task
>> > > dependency
>> > > > > > > reversal, cycles across versions, ...). Maybe a better
>visual
>> > > > approach
>> > > > > > > would be to render independent, individual tree views for
>each
>> > DAG
>> > > > > > version
>> > > > > > > (side by side), and doing best effort aligning the tasks
>across
>> > > > blocks
>> > > > > > and
>> > > > > > > "linking" tasks with lines across blocks when necessary.
>> > > > > > >
>> > > > > > > On Fri, Jul 24, 2020 at 12:46 PM Vikram Koka <
>> > vik...@astronomer.io
>> > > >
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > > Team,
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > We just created 'AIP-36 DAG Versioning' on Confluence
>and would
>> > > > very
>> > > > > > much
>> > > > > > > > appreciate feedback and suggestions from the community.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-36+DAG+Versioning
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > The DAG Versioning concept has been discussed on
>multiple
>> > > occasions
>> > > > > in
>> > > > > > > the
>> > > > > > > > past and has been a topic highlighted as part of
>Airflow 2.0 as
>> > > > well.
>> > > > > > We
>> > > > > > > at
>> > > > > > > > Astronomer have heard data engineers at several
>enterprises ask
>> > > > about
>> > > > > > > this
>> > > > > > > > feature as well, for easier debugging when changes are
>made to
>> > > DAGs
>> > > > > as
>> > > > > > a
>> > > > > > > > result of evolving business needs.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > As described in the AIP, we have a proposal focused on
>ensuring
>> > > > that
>> > > > > > the
>> > > > > > > > visibility behaviour of Airflow is correct, without
>changing
>> > the
>> > > > > > > execution
>> > > > > > > > behaviour. We considered changing the execution
>behaviour as
>> > > well,
>> > > > > but
>> > > > > > > > decided that the risks in changing execution behavior
>were too
>> > > high
>> > > > > as
>> > > > > > > > compared to the benefits and therefore decided to limit
>the
>> > scope
>> > > > to
>> > > > > > only
>> > > > > > > > making sure that the visibility was correct.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > We would like to attempt this based on our experience
>running
>> > > > Airflow
>> > > > > > as
>> > > > > > > a
>> > > > > > > > service. We believe that this benefits Airflow as a
>project and
>> > > the
>> > > > > > > > development experience of data engineers using Airflow
>across
>> > the
>> > > > > > world.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >  Any feedback, suggestions, and comments would be
>greatly
>> > > > > appreciated.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Best Regards,
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Kaxil Naik, Ryan Hamilton, Ash Berlin-Taylor, and
>Vikram Koka
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> > >
>> > > --
>> > >
>> > > Jacob Ward    |    Graduate Data Infrastructure Engineer
>> > >
>> > > jw...@brandwatch.com
>> > >
>> > >
>> > > NEW YORK   | BOSTON   | BRIGHTON   | LONDON   | BERLIN |  
>STUTTGART |
>> > > PARIS   | SINGAPORE | SYDNEY
>> > >
>> >
>
>
>
>--
>
>Jarek Potiuk
>Polidea | Principal Software Engineer
>
>M: +48 660 796 129
