FYI, I've updated the PR (https://github.com/apache/airflow/pull/62922)
description, which now contains "concrete" examples (using the Pokemon API,
lol). They still use the old "partition" name, but that can be changed once
we have consensus.
Here is an example with "partitioning" (the name should of course still be
changed; it's just to give a usage example), to clarify things as Jens asked
previously and to avoid any confusion regarding usage:
from airflow.sdk import dag, task
from airflow.providers.http.hooks.http import HttpHook, HttpAsyncHook
from pendulum import datetime


@dag(
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
)
def pokemon_partitioned_iteration():
    @task
    def list_pokemon() -> list[str]:
        # Fetch the first 100 Pokemon and keep only the relative endpoint paths.
        response = HttpHook(
            http_conn_id="pokeapi",
            method="GET",
        ).run(
            endpoint="api/v2/pokemon?limit=100",
        )
        return [
            pokemon["url"].replace("https://pokeapi.co/", "")
            for pokemon in response.json()["results"]
        ]

    @task(
        retries=3,
        task_concurrency=2,
        show_return_value_in_logs=False,
    )
    async def get_pokemon(url: str):
        # Read the body while the async session is still open.
        async with HttpAsyncHook(
            http_conn_id="pokeapi",
            method="GET",
        ).session() as session:
            response = await session.run(endpoint=url)
            return await response.json()

    # partition(...).iterate(...) is the API proposed in the PR; the name is not final.
    get_pokemon.partition(size=2).iterate(
        url=list_pokemon(),
    )


pokemon_partitioned_iteration()
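To make the intended behaviour easier to reason about without Airflow or network access, here is a plain-Python sketch of what get_pokemon.partition(size=2).iterate(url=...) might do. It assumes, without confirmation from the PR, that size is the number of items per partition, that each partition would become one task instance, and that the items inside a partition are awaited concurrently on a single event loop. The names partition, iterate_chunk, run_partitioned and fake_get_pokemon are hypothetical.

```python
import asyncio
from typing import Any, Awaitable, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def partition(items: list[T], size: int) -> list[list[T]]:
    """Split items into consecutive chunks of at most `size` elements (assumed semantics)."""
    if size < 1:
        raise ValueError("size must be >= 1")
    return [items[i:i + size] for i in range(0, len(items), size)]


async def iterate_chunk(func: Callable[[T], Awaitable[R]], chunk: list[T]) -> list[R]:
    """Await func for every item of one chunk concurrently; gather keeps input order."""
    return list(await asyncio.gather(*(func(item) for item in chunk)))


async def fake_get_pokemon(url: str) -> dict[str, Any]:
    # Stand-in for the HttpAsyncHook call: no network access, just echoes the URL.
    await asyncio.sleep(0)
    return {"url": url}


def run_partitioned(
    func: Callable[[T], Awaitable[R]], items: list[T], size: int
) -> list[list[R]]:
    # Each chunk would be one task instance; here each chunk gets its own event loop run.
    return [asyncio.run(iterate_chunk(func, chunk)) for chunk in partition(items, size)]


urls = [f"api/v2/pokemon/{n}/" for n in range(1, 6)]
results = run_partitioned(fake_get_pokemon, urls, size=2)
```

With five URLs and size=2 this yields three partitions of two, two and one item. Under these assumptions a failure only affects the partition it happens in, which is the same durability argument Jarek makes further down the thread.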
Side note: in the meantime, I also changed how XComs are pushed to the API
server so that it happens asynchronously. This is better because it doesn't
block the event loop, and the comm supervisor already supports it on the
client side (thanks for the tip, Amogh). This is unrelated to the dedicated
XCom route I mentioned for fetching multiple XComs via different keys, which
would be a future improvement.
Kr,
David
________________________________
From: Jarek Potiuk <[email protected]>
Sent: Friday, June 05, 2026 00:18
To: [email protected] <[email protected]>
Subject: Re: [DISCUSS] Choose a better name for Dynamic Task Partitioning and
thus the partition primitive
Great minds think alike...?? Maybe your AIP and David's AIP have a lot in
common? We have not seen your AIP yet :)
On Fri, Jun 5, 2026 at 12:15 AM Kaxil Naik <[email protected]> wrote:
> Funny you should say "step": I have an AIP coming along on that front,
> specifically about "Task Steps", so please don't name it step. I should have
> it out soon.
>
> On Thu, 4 Jun 2026 at 23:03, Jarek Potiuk <[email protected]> wrote:
>
> > Hi David and everyone,
> >
> > I agree with Ash; the "Dynamic" part of the current naming is quite
> > misleading. I've been giving this some thought, and I believe we might be
> > trying to name the wrong thing (batch) because we are looking at it from
> > the wrong angle.
> >
> > (And yes, a lot of this came out of chatting with Claude about my gut
> > feelings.)
> >
> > To address concerns about communicating resilience and durability, we
> > should consider this from the perspective of the "durability unit."
> > Currently, Airflow has three such units:
> >
> > 1. Task: Regular tasks.
> > 2. Mapped Task: Individual elements in a task array.
> > 3. Deferred Task: Sub-task parts stored as state in the Triggerer DB.
> >
> > In all three cases, if the infrastructure fails, the entire unit is lost
> > and must restart from the beginning. David's new construct idea doesn't
> > change these execution-layer properties. Whether it’s a single task or a
> > mapped task, the "unit of durability" remains the same, even if the "unit
> > of work" within it changes.
> >
> > Therefore, we should use a different name for that "construct", one that
> > clearly indicates these sequential actions are not tasks and do not carry
> > task-level durability guarantees. After exploring some options, I think
> > "step" is the most effective term. It has strong precedent in CI systems
> > (GitHub Actions, GitLab) and AWS Step Functions, where a job/task is the
> > durable unit and steps are subordinate, sequential, and ephemeral. Other
> > alternatives like "action," "leg," or "segment" are possible, but "step"
> > offers instant comprehension.
> >
> > Using "step" would also allow for a clean syntax that distinguishes it
> > from the @task decorator. For example:
> >
> > @step(retries=2)
> > def load(item):
> >     warehouse.write(item)
> >
> > @task
> > def ingest_orders(data):
> >     load.map(data)
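A minimal plain-Python sketch of the semantics described here, assuming a step is an in-process wrapper that retries its function inside the running task and that map() applies it to each item sequentially. Step, step, load and ingest_orders below are hypothetical stand-ins, not an existing Airflow API, and warehouse.write is replaced by a flaky local function.

```python
from typing import Callable, Generic, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class Step(Generic[T, R]):
    """Hypothetical in-process step: retried inside the running task, never scheduled on its own."""

    def __init__(self, func: Callable[[T], R], retries: int) -> None:
        self.func = func
        self.retries = retries

    def __call__(self, item: T) -> R:
        for attempt in range(self.retries + 1):
            try:
                return self.func(item)
            except Exception:
                if attempt == self.retries:
                    raise  # retries exhausted: the failure propagates to the enclosing task
        raise AssertionError("unreachable")

    def map(self, items: list[T]) -> list[R]:
        # Sequential: item N+1 only starts after item N has succeeded.
        return [self(item) for item in items]


def step(retries: int = 0) -> Callable[[Callable[[T], R]], Step[T, R]]:
    def decorator(func: Callable[[T], R]) -> Step[T, R]:
        return Step(func, retries)
    return decorator


# Usage with a flaky stand-in for warehouse.write (hypothetical).
calls: dict[str, int] = {}
written: list[str] = []


@step(retries=2)
def load(item: str) -> None:
    calls[item] = calls.get(item, 0) + 1
    if calls[item] == 1:  # every item fails once, then succeeds
        raise ConnectionError("transient warehouse error")
    written.append(item)


def ingest_orders(data: list[str]) -> None:
    # Plain function standing in for the @task body.
    load.map(data)


ingest_orders(["a", "b", "c"])
```

Each item fails once and succeeds on its first retry, so the "task" completes without the scheduler ever seeing those failures; that is the "invisible retries" property. If an item kept failing past retries=2, the exception would escape map() and the task as a whole would fail.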
> >
> > Or even do things asynchronously, not sequentially:
> >
> > @step(retries=2)
> > async def load(item):
> >     await warehouse.write(item)
> >
> > @task
> > async def ingest_orders(data):
> >     # map is normally synchronous, but we could make it work with async steps
> >     load.map(data)
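One way the async variant could work, sketched in plain Python: this assumes map() on an async step becomes a coroutine that runs all items concurrently with asyncio.gather and therefore has to be awaited, which differs from the unawaited load.map(data) in the example. AsyncStep, step, load and ingest_orders are hypothetical names.

```python
import asyncio
from typing import Awaitable, Callable, Generic, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class AsyncStep(Generic[T, R]):
    """Hypothetical async step: retried in-process; map() runs all items concurrently."""

    def __init__(self, func: Callable[[T], Awaitable[R]], retries: int) -> None:
        self.func = func
        self.retries = retries

    async def __call__(self, item: T) -> R:
        for attempt in range(self.retries + 1):
            try:
                return await self.func(item)
            except Exception:
                if attempt == self.retries:
                    raise  # retries exhausted: let the enclosing task fail
        raise AssertionError("unreachable")

    async def map(self, items: list[T]) -> list[R]:
        # Concurrent rather than sequential: all items share one event loop.
        return list(await asyncio.gather(*(self(item) for item in items)))


def step(retries: int = 0) -> Callable[[Callable[[T], Awaitable[R]]], AsyncStep[T, R]]:
    def decorator(func: Callable[[T], Awaitable[R]]) -> AsyncStep[T, R]:
        return AsyncStep(func, retries)
    return decorator


attempts: dict[int, int] = {}


@step(retries=2)
async def load(item: int) -> int:
    attempts[item] = attempts.get(item, 0) + 1
    await asyncio.sleep(0)  # yield to the event loop, as a real async write would
    if item % 2 == 0 and attempts[item] == 1:  # even items fail once
        raise ConnectionError("transient warehouse error")
    return item * 10


async def ingest_orders(data: list[int]) -> list[int]:
    # Plain coroutine standing in for the async @task body.
    return await load.map(data)


results = asyncio.run(ingest_orders([1, 2, 3, 4]))
```

The even items each need one retry while the odd ones succeed immediately, and gather still returns the results in input order. A real implementation would probably also want to cap concurrency (for example with an asyncio.Semaphore), which this sketch leaves out.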
> >
> > This approach allows for invisible retries within a task and clearly
> > communicates that if a step fails, the task is the recovery point. It also
> > clarifies the "chunking" concept we were discussing (BTW, "chunk" is
> > probably what we're looking for, but with this concept we don't have to
> > name it as a separate concept in Airflow):
> >
> > @step(retries=2)
> > async def load(item):
> >     await warehouse.write(item)
> >
> > @task
> > async def process_chunk(chunk):
> >     load.map(chunk)
> >
> > # Note! This is just an example of how we can add chunking; we could make
> > # it better without a separate task or materialization, and with some other
> > # syntactic sugar.
> > @task
> > def make_chunks(items, size=100):
> >     return [items[i:i + size] for i in range(0, len(items), size)]
> >
> > process_chunk.expand(chunk=make_chunks(get_items()))
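A rough simulation of why the task, here one chunk, remains the recovery point. All names are hypothetical: run_task_with_retries plays the role of the scheduler retrying a whole task instance, and a RuntimeError stands in for losing the worker, which no step-level retry could catch.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def make_chunks(items: list[T], size: int = 100) -> list[list[T]]:
    # Consecutive chunks of at most `size` items.
    return [items[i:i + size] for i in range(0, len(items), size)]


def run_task_with_retries(body: Callable[[], None], retries: int) -> int:
    """Stand-in for the scheduler: reruns the whole task body from the start.

    Returns the number of attempts that were needed.
    """
    for attempt in range(1, retries + 2):
        try:
            body()
            return attempt
        except RuntimeError:
            if attempt == retries + 1:
                raise
    raise AssertionError("unreachable")


loads: dict[int, int] = {}
worker_lost = False


def load(item: int) -> None:
    # Stand-in for a step; counts how often each item is written.
    global worker_lost
    loads[item] = loads.get(item, 0) + 1
    if item == 4 and not worker_lost:
        worker_lost = True  # simulate losing the worker once, mid-chunk
        raise RuntimeError("worker lost mid-chunk")


def process_chunk(chunk: list[int]) -> None:
    for item in chunk:
        load(item)


chunks = make_chunks(list(range(6)), size=3)  # [[0, 1, 2], [3, 4, 5]]
attempts_per_chunk = [
    run_task_with_retries(lambda c=chunk: process_chunk(c), retries=1)
    for chunk in chunks
]
```

The first chunk finishes on its first attempt. The second chunk loses its worker while loading item 4, so its retry starts again from item 3, which is written twice. Work inside a chunk is only as durable as the task that contains it.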
> >
> > By introducing "step" as a distinct unit, we can more accurately
> > communicate that it lacks the independent durability of a task while
> > providing users with a familiar mental model.
> >
> > What do you think?
> >
> > Best regards,
> > Jarek
> >
>