I have one more general comment that is worth discussing. While
revising AIP-67 Multi-team, I realised that one aspect of "isolation"
is likely not handled here, i.e. DAGFileProcessor parsing. When
converting AIP-67 to use the AIP-72 kind of API instead of AIP-44, I
see that there is (likely) a gap to handle there. Of course, in a
secure environment we cannot allow DAG file parsing to execute
without isolation (same as tasks), as it poses very similar security
problems to running a task with access to the database: the code for
DAG parsing, executed as subprocesses of the Scheduler, can
potentially access the DB (because it has access to the scheduler
configuration), so the whole benefit of having tasks isolated is
gone.
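
To make the concern concrete, here is a minimal sketch (not Airflow
code) of why parsing is as sensitive as task execution: importing a DAG
file runs its top-level code inside the parsing process, so that code
sees whatever configuration the process has. The helper names and the
connection string are made up for illustration; the environment
variable follows Airflow's usual AIRFLOW__{SECTION}__{KEY} convention.

```python
import importlib.util
import os
import tempfile

# Hypothetical user DAG file. Its top-level code runs at parse time,
# before any task is ever scheduled.
DAG_FILE_SOURCE = (
    "import os\n"
    "LEAKED = os.environ.get('AIRFLOW__DATABASE__SQL_ALCHEMY_CONN')\n"
)


def parse_dag_file(path):
    """Load a DAG file the way any parser has to: by executing it."""
    spec = importlib.util.spec_from_file_location("user_dag", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def demo():
    """Return what the DAG file's top-level code could read while being parsed."""
    key = "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"
    previous = os.environ.get(key)
    # Placeholder standing in for the scheduler's real DB credentials.
    os.environ[key] = "postgresql://user:secret@db/airflow"
    try:
        with tempfile.TemporaryDirectory() as tmp:
            path = os.path.join(tmp, "user_dag.py")
            with open(path, "w") as handle:
                handle.write(DAG_FILE_SOURCE)
            return parse_dag_file(path).LEAKED
    finally:
        # Restore the environment so the demo has no lasting side effect.
        if previous is None:
            os.environ.pop(key, None)
        else:
            os.environ[key] = previous
```

Calling demo() returns the placeholder connection string, i.e. the
parsed file read the parser's DB configuration without any task running.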

So, a question: Ash and others, I would love to hear your opinion on
how we should handle this.

1) Should we allow Airflow to run DAGFileProcessor as part of the scheduler?
2) How should we treat a standalone DAG file processor in terms of the
Task API? (Obviously, if we want to keep DB isolation, it can't access
the DB as it does today.)
3) What do we do with the callbacks that are currently run by the
DAGFileProcessor?

I believe (this is my line of thought, at least) that the
DAGFileProcessor parsing a DAG file should be a specialized kind of
task. I think one of Ash's comments in the AIP-72 examples suggests
the same ("but allows for "ParseDAGFile" etc. in future"). I believe
that was the intention. I am a bit worried about the "future" part
though, because I think that without isolating DAGFileProcessor the
whole "Task API" approach is kind of "toothless", leaving a big gaping
security hole. So I think the "future" of making DAGFileProcessor talk
through AIP-72 is "Airflow 3.0".
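
As a rough sketch of what "parsing as a specialized kind of task" could
look like, the snippet below models two kinds of "Activity" (the name
Ash picked in AIP-72) that a worker might receive. Everything else here
is an assumption for illustration only: the class fields, the
describe() helper, and the dispatch style are not taken from AIP-72.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class ExecuteTask:
    """Hypothetical activity: run one try of a task instance."""
    dag_id: str
    task_id: str
    run_id: str
    try_number: int


@dataclass(frozen=True)
class ParseDAGFile:
    """Hypothetical activity: parse one DAG file in an isolated worker."""
    file_path: str


# Both kinds travel over the same channel, so parsing gets the same
# isolation (no direct DB access) as task execution.
Activity = Union[ExecuteTask, ParseDAGFile]


def describe(activity: Activity) -> str:
    """Dispatch on the activity kind, as a worker would before running it."""
    if isinstance(activity, ExecuteTask):
        return (
            f"run task {activity.dag_id}.{activity.task_id} "
            f"(try {activity.try_number})"
        )
    if isinstance(activity, ParseDAGFile):
        return f"parse DAG file {activity.file_path}"
    raise TypeError(f"unknown activity: {activity!r}")
```

The point of the sketch is only that a parse request and a task request
can share one entry point, so neither needs scheduler configuration.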

I would love to hear Ash's and others' thoughts about it. I added a
reference to it in AIP-67
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-67+Multi-team+deployment+of+Airflow+components
leaving a TODO for the result of this discussion. I can also briefly
talk about it tomorrow.

J.

On Tue, Jul 9, 2024 at 5:27 PM Michał Modras
<michalmod...@google.com.invalid> wrote:
>
> Thanks for work on this Ash, great to see how the proposal is getting more
> material. Overall I'm positive about the general direction, I left a few
> comments (and questions) on some of the sections.
>
> On Mon, Jul 8, 2024 at 6:11 PM Jarek Potiuk <ja...@potiuk.com> wrote:
>
> > > I’d love to support websockets/async but having a UI action “instantly”
> > (or even near instantly) be sent to the task on the worker is a very
> > difficult thing to achieve (you need some broadcast or message bus type
> > process) so I don’t want to tie this AIP on to needing that.
> >
> > Fair.
> >
> > On Mon, Jul 8, 2024 at 6:08 PM Ash Berlin-Taylor <a...@apache.org> wrote:
> > >
> > > My view is that Async/websockets or not is (largely) an implementation
> > detail and not something we have to worry about at this stage.
> > >
> > > By that I mean that the messages that flow back and forth between server
> > and client are unchanged (and it’s either JSON/msgspec/gRPC etc). In the
> > specific case of a task termination request I’d say that we can just have
> > the next time the task heartbeats it can get cancelled, as all task status
> > changes are asynchronous in action by definition.
> > >
> > > I’d love to support websockets/async but having a UI action “instantly”
> > (or even near instantly) be sent to the task on the worker is a very
> > difficult thing to achieve (you need some broadcast or message bus type
> > process) so I don’t want to tie this AIP on to needing that.
> > >
> > > Ash
> > >
> > > > On 5 Jul 2024, at 17:31, Jarek Potiuk <ja...@potiuk.com> wrote:
> > > >
> > > > I have a comment there - originated from Jens' question in the
> > > > document - related to some basic setup of the API and specifically
> > > > async vs. sync approach. I have a feeling that the API for tasks would
> > > > benefit a lot from using websockets and async-first approach.
> > > > Previously we've been doing heartbeating on our own, while websockets
> > > > have built-in capability of heartbeating open connections, and by
> > > > the fact that websocket communication is bi-directional, it would
> > > > allow for things like almost instantaneous killing of running tasks
> > > > rather than waiting for heartbeats.
> > > >
> > > > I'd say now is a good time to think about it - and maybe some of us
> > > > have bigger experience with async api / websockets to be able to share
> > > > their experiences, but since we are moving to "Http" interface for
> > > > tasks, async way of communication via websockets is out there for
> > > > quite a while and it has some undeniable advantages, and there are a
> > > > number of frameworks (including FastAPI) that support it and possibly
> > > > it's the best time to consider it.
> > > >
> > > > J.
> > > >
> > > >
> > > > On Wed, Jul 3, 2024 at 3:39 PM Ash Berlin-Taylor <a...@apache.org>
> > wrote:
> > > >>
> > > >> Hi all,
> > > >>
> > > >> I’ve made some small changes to this AIP and I’m now happy with the
> > state of it.
> > > >>
> > > >> First, a general point: I’ve tried to not overly-specify too many of
> > the details on this one — for instance how viewing in-progress log will be
> > handled is a TBD, but we know the constraints and the final details can
> > shake out during implementation.
> > > >>
> > > >> A summary of changes since the previous link:
> > > >>
> > > >> - Added a section on "Extend Executor interface” to tidy up the
> > executor interface and move us away from the “run this command string”
> > approach. I’ve named this new thing “Activity”. (In the past we have thrown
> > around the name “workload”, but that is too close to “workflow” which is
> > analogous to a DAG so I’ve picked a different name)
> > > >> - Add an example of how Celery tasks might get context injected (
> > > >> - Note that triggerers won’t be allowed direct DB access anymore
> > either, they run user code so are all just workers
> > > >> - Add some simple version/feature introspection idea to the API so
> > that it’s easier to build forward/backwards compatibility in to workers if
> > needed.
> > > >>
> > > >>
> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=311626182&originalVersion=16&revisedVersion=20
> > > >>
> > > >>
> > > >> I’d like to start a vote on this soon, but given the 4th July holiday
> > in the US I suspect we will be a bit reduced in presences of people, so
> > I’ll give people until Tuesday 9th July to comment and will start the vote
> > then if there are no major items outstanding.
> > > >>
> > > >> Thanks,
> > > >> Ash
> > > >>
> > > >>
> > > >>> On 14 Jun 2024, at 19:55, Jarek Potiuk <ja...@potiuk.com> wrote:
> > > >>>
> > > >>> First pass done - especially around security aspects of it, Looks
> > great.
> > > >>>
> > > >>> On Fri, Jun 14, 2024 at 2:55 PM Ash Berlin-Taylor <a...@apache.org>
> > wrote:
> > > >>>
> > > >>>> I’ve written up a lot more of the implementation details into an AIP
> > > >>>> https://cwiki.apache.org/confluence/x/xgmTEg
> > > >>>>
> > > >>>> It’s still marked as Draft/Work In Progress for now as there are few
> > > >>>> details we know we need to cover before the doc is complete.
> > > >>>>
> > > >>>> (There was also some discussion in the dev call about a different
> > name for
> > > >>>> this AIP)
> > > >>>>
> > > >>>>> On 7 Jun 2024, at 19:25, Ash Berlin-Taylor <a...@apache.org> wrote:
> > > >>>>>
> > > >>>>>> IMHO - if we do not want to support DB access at all from workers,
> > > >>>>> triggerers and DAG file processors, we should replace the current
> > "DB"
> > > >>>>> bound interface with a new one specifically designed for this
> > > >>>>> bi-directional direct communication Executor <-> Workers,
> > > >>>>>
> > > >>>>> That is exactly what I was thinking too (both that no DB should be
> > the
> > > >>>> only option in v3, and that we need a bidirectional purpose designed
> > > >>>> interface) and am working up the details.
> > > >>>>>
> > > >>>>> One of the key features of this will be giving each task try a
> > "strong
> > > >>>> identity" that the API server can use to identify and trust the
> > requests,
> > > >>>> likely some form of signed JWT.
> > > >>>>>
> > > >>>>> I just need to finish off some other work before I can move over to
> > > >>>> focus Airflow fully
> > > >>>>>
> > > >>>>> -a
> > > >>>>>
> > > >>>>> On 7 June 2024 18:01:56 BST, Jarek Potiuk <ja...@potiuk.com>
> > wrote:
> > > >>>>>> I added some comments here and I think there is one big thing
> > that
> > > >>>> should
> > > >>>>>> be clarified when we get to "task isolation" - mainly dependence
> > of it
> > > >>>> on
> > > >>>>>> AIP-44.
> > > >>>>>>
> > > >>>>>> The Internal gRPC API (AIP-44) was only designed in the way it was
> > > >>>> designed
> > > >>>>>> to allow using the same codebase to be used with/without DB. It's
> > based
> > > >>>> on
> > > >>>>>> the assumption that a limited set of changes will be needed (that
> > was
> > > >>>>>> underestimated) in order to support both DB and GRPC ways of
> > > >>>> communication
> > > >>>>>> between workers/triggerers/DAG file processors at the same time.
> > That
> > > >>>> was a
> > > >>>>>> basic assumption for AIP-44 - that we will want to keep both ways
> > and
> > > >>>>>> maximum backwards compatibility (including "pull" model of worker
> > > >>>> getting
> > > >>>>>> connections, variables, and updating task state in the Airflow
> > DB). We
> > > >>>> are
> > > >>>>>> still using "DB" as a way to communicate between those components
> > and
> > > >>>> this
> > > >>>>>> does not change with AIP-44.
> > > >>>>>>
> > > >>>>>> But for Airflow 3 the whole context is changed. If we go with the
> > > >>>>>> assumption that Airflow 3 will only have isolated tasks and no DB
> > > >>>> "option",
> > > >>>>>> I personally think using AIP-44 for that is a mistake. AIP-44 is
> > merely
> > > >>>> a
> > > >>>>>> wrapper over existing DB calls designed to be kept updated
> > together with
> > > >>>>>> the DB code, and the whole synchronisation of state, heartbeats,
> > > >>>> variables
> > > >>>>>> and connection access still uses the same "DB communication"
> > model and
> > > >>>>>> there is basically no way we can get it more scalable this way.
> > We will
> > > >>>>>> still have the same limitations on the DB - where a number of DB
> > > >>>>>> connections will be replaced with a number of GRPC connections.
> > > >>>> Essentially
> > > >>>>>> - more scalability and performance has never been the goal of
> > AIP-44-
> > > >>>> all
> > > >>>>>> the assumptions are that it only brings isolation but nothing
> > more will
> > > >>>>>> change. So I think it does not address some of the fundamental
> > problems
> > > >>>>>> stated in this "isolation" document.
> > > >>>>>>
> > > >>>>>> Essentially AIP-44 merely exposes a small-ish number of methods
> > (bigger
> > > >>>>>> than initially anticipated) but it only wraps around the existing
> > DB
> > > >>>>>> mechanism. Essentially from the performance and scalability point
> > of
> > > >>>> view -
> > > >>>>>> we do not get much more than currently when using pgbouncer. This
> > one
> > > >>>>>> essentially turns a big number of connections coming from workers
> > into a
> > > >>>>>> smaller number of pooled connections that pgbouncer manages
> > internally and
> > > >>>>>> multiplexes the calls over. With the difference that unlike AIP-44
> > > >>>> Internal
> > > >>>>>> API server, pgbouncer does not limit the operations you can do
> > from the
> > > >>>>>> worker/triggerer/dag file processor - that's the main difference
> > between
> > > >>>>>> using pgbouncer and using our own Internal-API server.
> > > >>>>>>
> > > >>>>>> IMHO - if we do not want to support DB access at all from workers,
> > > >>>>>> triggerers and DAG file processors, we should replace the
> > current "DB"
> > > >>>>>> bound interface with a new one specifically designed for this
> > > >>>>>> bi-directional direct communication Executor <-> Workers, more in
> > line
> > > >>>> with
> > > >>>>>> what Jens described in AIP-69 (and for example WebSocket and
> > > >>>> asynchronous
> > > >>>>>> communication comes immediately to my mind if I did not have to
> > use DB
> > > >>>> for
> > > >>>>>> that communication). This is also why I put the AIP-67 on hold
> > because
> > > >>>> IF
> > > >>>>>> we go that direction that we have "new" interface between worker,
> > > >>>> triggerer
> > > >>>>>> , dag file processor - it might be way easier (and safer) to
> > introduce
> > > >>>>>> multi-team in Airflow 3 rather than 2 (or we can implement it
> > > >>>> differently
> > > >>>>>> in Airflow 2 and differently in Airflow 3).
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Tue, Jun 4, 2024 at 3:58 PM Vikram Koka
> > <vik...@astronomer.io.invalid
> > > >>>>>
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Fellow Airflowers,
> > > >>>>>>>
> > > >>>>>>> I am following up on some of the proposed changes in the Airflow
> > 3
> > > >>>> proposal
> > > >>>>>>> <
> > > >>>>>>>
> > > >>>>
> > https://docs.google.com/document/d/1MTr53101EISZaYidCUKcR6mRKshXGzW6DZFXGzetG3E/
> > > >>>>>>>> ,
> > > >>>>>>> where more information was requested by the community,
> > specifically
> > > >>>> around
> > > >>>>>>> the injection of Task Execution Secrets. This topic has been
> > discussed
> > > >>>> at
> > > >>>>>>> various times with a variety of names, but here is a holistic
> > proposal
> > > >>>>>>> around the whole task context mechanism.
> > > >>>>>>>
> > > >>>>>>> This is not yet a full fledged AIP, but is intended to
> > facilitate a
> > > >>>>>>> structured discussion, which will then be followed up with a
> > formal AIP
> > > >>>>>>> within the next two weeks. I have included most of the text
> > here, but
> > > >>>>>>> please give detailed feedback in the attached document
> > > >>>>>>> <
> > > >>>>>>>
> > > >>>>
> > https://docs.google.com/document/d/1BG8f4X2YdwNgHTtHoAyxA69SC_X0FFnn17PlzD65ljA/
> > > >>>>>>>> ,
> > > >>>>>>> so that we can have a contextual discussion around specific
> > points
> > > >>>> which
> > > >>>>>>> may need more detail.
> > > >>>>>>> ---
> > > >>>>>>> Motivation
> > > >>>>>>>
> > > >>>>>>> Historically, Airflow’s task execution context has been oriented
> > around
> > > >>>>>>> local execution within a relatively trusted networking cluster.
> > > >>>>>>>
> > > >>>>>>> This includes:
> > > >>>>>>>
> > > >>>>>>> -
> > > >>>>>>>
> > > >>>>>>> the interaction between the Executor and the process of
> > launching a
> > > >>>> task
> > > >>>>>>> on Airflow Workers,
> > > >>>>>>> -
> > > >>>>>>>
> > > >>>>>>> the interaction between the Workers and the Airflow
> > meta-database for
> > > >>>>>>> connection and environment information as part of initial task
> > > >>>> startup,
> > > >>>>>>> -
> > > >>>>>>>
> > > >>>>>>> the interaction between the Airflow Workers and the rest of
> > Airflow
> > > >>>> for
> > > >>>>>>> heartbeat information, and so on.
> > > >>>>>>>
> > > >>>>>>> This has been accomplished by colocating all of the Airflow task
> > > >>>> execution
> > > >>>>>>> code with the user task code in the same container and process.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> For Airflow users at scale i.e. supporting multiple data teams,
> > this
> > > >>>> has
> > > >>>>>>> posed many operational challenges:
> > > >>>>>>>
> > > >>>>>>> -
> > > >>>>>>>
> > > >>>>>>> Dependency conflicts for administrators supporting data teams
> > using
> > > >>>>>>> different versions of providers, libraries, or python packages
> > > >>>>>>> -
> > > >>>>>>>
> > > >>>>>>> Security challenge in the running of customer-defined code (task
> > code
> > > >>>>>>> within the DAGs) for multiple customers within the same operating
> > > >>>>>>> environment and service accounts
> > > >>>>>>> -
> > > >>>>>>>
> > > >>>>>>> Scalability of Airflow since one of the core Airflow scalability
> > > >>>>>>> limitations has been the number of concurrent database
> > connections
> > > >>>>>>> supported by the underlying database instance. To alleviate this
> > > >>>>>>> problem,
> > > >>>>>>> we have consistently, as an Airflow community, recommended the
> > use of
> > > >>>>>>> PgBouncer for connection pooling, as part of an Airflow
> > deployment.
> > > >>>>>>> -
> > > >>>>>>>
> > > >>>>>>> Operational issues caused by unintentional reliance on internal
> > > >>>> Airflow
> > > >>>>>>> constructs within the DAG/Task code, which only and unexpectedly
> > show
> > > >>>>>>> up as
> > > >>>>>>> part of Airflow production operations, coincidentally with, but
> > not
> > > >>>>>>> limited
> > > >>>>>>> to upgrades and migrations.
> > > >>>>>>> -
> > > >>>>>>>
> > > >>>>>>> Operational management based on the above for Airflow platform
> > teams
> > > >>>> at
> > > >>>>>>> scale, because different data teams naturally operate at
> > different
> > > >>>>>>> velocities. Attempting to support these different teams with a
> > common
> > > >>>>>>> Airflow environment is unnecessarily challenging.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> The internal API to reduce the need for interaction between the
> > Airflow
> > > >>>>>>> Workers and the metadatabase is a big and necessary step forward.
> > > >>>> However,
> > > >>>>>>> it doesn’t fully address the above challenges. The proposal below
> > > >>>> builds on
> > > >>>>>>> the internal API proposal and goes significantly further to not
> > only
> > > >>>>>>> address these challenges above, but also enable the following
> > key use
> > > >>>>>>> cases:
> > > >>>>>>>
> > > >>>>>>> 1.
> > > >>>>>>>
> > > >>>>>>> Ensure that this interface reduces the interaction between the
> > code
> > > >>>>>>> running within the Task and the rest of Airflow. This is to
> > address
> > > >>>>>>> unintended ripple effects from core Airflow changes which has
> > caused
> > > >>>>>>> numerous Airflow upgrade issues, because Task (i.e. DAG) code
> > relied
> > > >>>> on
> > > >>>>>>> Core Airflow abstractions. This has been a common problem pointed
> > > >>>> out by
> > > >>>>>>> numerous Airflow users including early adopters.
> > > >>>>>>> 2.
> > > >>>>>>>
> > > >>>>>>> Enable quick, performant execution of tasks on local, trusted
> > > >>>> networks,
> > > >>>>>>> without requiring the Airflow workers / tasks to connect to the
> > > >>>> Airflow
> > > >>>>>>> database to obtain all the information required for task startup,
> > > >>>>>>> 3.
> > > >>>>>>>
> > > >>>>>>> Enable remote execution of Airflow tasks across network
> > boundaries,
> > > >>>> by
> > > >>>>>>> establishing a clean interface for Airflow workers on remote
> > networks
> > > >>>>>>> to be
> > > >>>>>>> able to connect back to a central Airflow service to access all
> > > >>>>>>> information
> > > >>>>>>> needed for task execution. This is foundational work for remote
> > > >>>>>>> execution.
> > > >>>>>>> 4.
> > > >>>>>>>
> > > >>>>>>> Enable a clean language agnostic interface for task execution,
> > with
> > > >>>>>>> support for multiple language bindings, so that Airflow tasks
> > can be
> > > >>>>>>> written in languages beyond Python.
> > > >>>>>>>
> > > >>>>>>> Proposal
> > > >>>>>>>
> > > >>>>>>> The proposal here has multiple parts as detailed below.
> > > >>>>>>>
> > > >>>>>>> 1.
> > > >>>>>>>
> > > >>>>>>> Formally split out the Task Execution Interface as the Airflow
> > Task
> > > >>>> SDK
> > > >>>>>>> (possibly name it as the Airflow SDK), which would be the only
> > > >>>>>>> interface to
> > > >>>>>>> and from Airflow Task User code to the Airflow system components
> > > >>>>>>> including
> > > >>>>>>> the meta-database, Airflow Executor, etc.
> > > >>>>>>> 2.
> > > >>>>>>>
> > > >>>>>>> Disable all direct database interaction from the Airflow Workers
> > > >>>>>>> including Tasks being run on those Airflow Workers and the
> > Airflow
> > > >>>>>>> meta-database.
> > > >>>>>>> 3.
> > > >>>>>>>
> > > >>>>>>> The Airflow Task SDK will include interfaces for:
> > > >>>>>>> -
> > > >>>>>>>
> > > >>>>>>>    Access to needed Airflow Connections, Variables, and XCom
> > values
> > > >>>>>>>    -
> > > >>>>>>>
> > > >>>>>>>    Report heartbeat
> > > >>>>>>>    -
> > > >>>>>>>
> > > >>>>>>>    Record logs
> > > >>>>>>>    -
> > > >>>>>>>
> > > >>>>>>>    Report metrics
> > > >>>>>>>    4.
> > > >>>>>>>
> > > >>>>>>> The Airflow Task SDK will support a Push mechanism for speedy
> > local
> > > >>>>>>> execution in trusted environments.
> > > >>>>>>> 5.
> > > >>>>>>>
> > > >>>>>>> The Airflow Task SDK will also support a Pull mechanism for the
> > > >>>> remote
> > > >>>>>>> Task execution environments to access information from an Airflow
> > > >>>>>>> instance
> > > >>>>>>> over network boundaries.
> > > >>>>>>> 6.
> > > >>>>>>>
> > > >>>>>>> The Airflow Task SDK will be designed to support multiple
> > language
> > > >>>>>>> bindings, with the first language binding of course being Python.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> Assumption: The existing AIP for Internal API covers the
> > interaction
> > > >>>>>>> between the Airflow workers and Airflow metadatabase for
> > heartbeat
> > > >>>>>>> information, persisting XComs, and so on.
> > > >>>>>>> --
> > > >>>>>>>
> > > >>>>>>> Best regards,
> > > >>>>>>>
> > > >>>>>>> Vikram Koka, Ash Berlin-Taylor, Kaxil Naik, and Constance
> > Martineau
> > > >>>>>>>
> > > >>>>
> > > >>>>
> > > >>
> > > >
> > > > ---------------------------------------------------------------------
> > > > To unsubscribe, e-mail: dev-unsubscr...@airflow.apache.org
> > > > For additional commands, e-mail: dev-h...@airflow.apache.org
> > > >