No, you don't have to use the TaskFlow API (i.e. you can use the classic operator style) to create mapped tasks. But the thing you map over does have to be a *task*, i.e. it can't just be a plain Python function. At least that's how it is now. If I misunderstood your question, perhaps follow up with an example.
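For instance, here is a minimal sketch of the distinction (the DAG id and task names are purely illustrative): the operator you map can be classic style, but the value handed to .expand() must come from a task (constant literal lists also work, per the mapping docs), because it is resolved at run time rather than at parse time.

    from datetime import datetime

    from airflow import DAG
    from airflow.decorators import task
    from airflow.operators.bash import BashOperator

    with DAG(dag_id="mapping_sketch", start_date=datetime(2022, 6, 1), schedule_interval=None):

        @task
        def make_commands():
            # Runs as its own task at execution time, so its return value
            # can be mapped over.
            return ["echo one", "echo two", "echo three"]

        # The mapped operator itself is classic style; only the expand()
        # input needs to come from a task (or be a literal list).
        BashOperator.partial(task_id="run_each").expand(
            bash_command=make_commands()
        )

A plain, undecorated function would run at parse time instead, so it would have no access to runtime context such as ds, which is the failure mode discussed below.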
On Sun, Jun 19, 2022 at 2:52 PM Joe Auty <[email protected]> wrote:

> I've gotten this working now, thanks very much!
>
> My problem was that my DAG wasn't using the TaskFlow API. My understanding is
> that the context object is provided as a freebie via the @task decorator?
>
> Would it be accurate to say that the TaskFlow API is a virtual requirement
> for dynamic task mapping? I say "virtual" because any sort of workflow
> that requires multi-tenancy/concurrency is going to need some sort of
> unique ID/DAG run to tap into, and without reinventing the wheel this is
> best provided through the Airflow context?
>
> Just clarifying this for my documentation PR...
>
>
> Daniel Standish wrote on 2022-06-15 6:09 PM:
>
> It doesn't look like you've made schema_dump_input_cmds a task... maybe
> provide the full DAG.
>
> Accessing context variables in the TaskFlow API is documented here:
> https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html#accessing-context-variables-in-decorated-tasks
>
> On Wed, Jun 15, 2022 at 5:07 PM Joe Auty <[email protected]> wrote:
>
>> It looks like the context is not passed through:
>>
>> def schema_dump_input_cmds(**context):
>>     print(context)
>>
>> schema_dump = KubernetesPodOperator.partial(
>>     task_id="schema-dump-input",
>>     namespace=NAMESPACE,
>>     image=REGISTRY_URL + "/postgres-client:12-" + AGENT_VERSION,
>>     .. snipped
>>     max_active_tis_per_dag=1,
>>     dag=dag
>> ).expand(
>>     cmds=schema_dump_input_cmds(),
>>     env_vars=schema_dump_input_env_vars()
>> )
>>
>> I'm seeing an empty object in my logs. Any idea why, or any suggestions
>> here?
>>
>>
>> Daniel Standish wrote on 2022-06-15 12:57 PM:
>>
>> Turns out this issue is actually documented here:
>> https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#how-do-templated-fields-and-mapped-arguments-interact
>>
>> On Wed, Jun 15, 2022 at 12:33 PM Daniel Standish <[email protected]> wrote:
>>
>>> Yeah, it does seem that templating does not work with expanded params at
>>> the moment. No promises, but I would bet this will change at some point.
>>> It seems reasonable and I can't think of a technical problem.
>>>
>>> Tal's approach looks good. I came up with basically the same thing when
>>> testing this out:
>>>
>>> @dag.task
>>> def gen_vars(ds=None):
>>>     return [{"MY_VAR": ds} for x in range(3)]
>>>
>>> op2 = BashOperator.partial(
>>>     task_id='task_mapped',
>>>     bash_command="echo $MY_VAR",
>>> ).expand(env=gen_vars())
>>>
>>> One thing I noticed in your example: it appears you may be mapping from
>>> two upstream tasks, for two mapped arguments. You may have noticed that
>>> this results in a cartesian product (maybe this is desirable for you?
>>> see the toy sketch at the end of this thread). This is an area we are
>>> actively working on, so that you can provide multiple kwargs to map
>>> from a single task.
>>>
>>> On Wed, Jun 15, 2022 at 3:36 AM Tal Nagar <[email protected]> wrote:
>>>
>>>> Hi Joe,
>>>>
>>>> Not sure if it's the recommended way to do it, but what worked for me
>>>> is rendering the templates in a Python task rather than directly in
>>>> the KubernetesPodOperator.
>>>> For example:
>>>>
>>>> @task()
>>>> def create_cmds(**context):
>>>>     run_params = context["params"]
>>>>     size = int(run_params['size'])
>>>>     arr = []
>>>>     for s in range(size):
>>>>         arr.append([f'echo hi {s}'])
>>>>     return arr
>>>>
>>>> say_hi = KubernetesPodOperator.partial(
>>>>     image="alpine",
>>>>     cmds=["/bin/sh", "-c"],
>>>>     task_id='test',
>>>>     name='test'
>>>> ).expand(arguments=create_cmds())
>>>>
>>>> From: Joe Auty <[email protected]>
>>>> Sent: Tuesday, June 14, 2022 10:03 AM
>>>> To: [email protected]
>>>> Subject: Templated fields and dynamic task mapping
>>>>
>>>> Hello,
>>>>
>>>> I'm trying to understand the docs here:
>>>> https://airflow.apache.org/docs/apache-airflow/2.3.2/concepts/dynamic-task-mapping.html#how-do-templated-fields-and-mapped-arguments-interact
>>>> specifically this section:
>>>>
>>>>     If you want to interpolate values either call task.render_template
>>>>     yourself, or use interpolation:
>>>>
>>>> In the example in the previous section of what doesn't work, we have:
>>>>
>>>>     printer.expand(val=make_list())
>>>>
>>>> What should the corrected version of this line be? IOW, how would I
>>>> call make_list passing in the context so that I can send templated
>>>> fields to my mapping function? Here is a more specific use case:
>>>>
>>>> KubernetesPodOperator.partial(
>>>>     task_id="schema-dump-input",
>>>>     namespace=NAMESPACE,
>>>>     image=REGISTRY_URL + "/postgres-client:12",
>>>>     name="pg-schemadump",
>>>>     in_cluster=True,
>>>>     hostnetwork=False,
>>>>     max_active_tis_per_dag=1,
>>>>     dag=dag
>>>> ).expand(
>>>>     cmds=schema_dump_input_cmds(ds),
>>>>     env_vars=schema_dump_input_env_vars(ds)
>>>> )
>>>>
>>>> In this example, ds has no value because it is not defined anywhere,
>>>> and of course {{ ds }} doesn't work either (it doesn't get interpolated
>>>> and is registered as literally "{{ ds }}"). How can I pass in a
>>>> templated field, such as {{ ds }}?
>>>>
>>>> Thanks very much in advance!
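
Pulling the thread's resolution together, here is a minimal sketch of the pattern that ended up working, assuming Airflow 2.3+. The names follow Joe's example (dag, NAMESPACE, and REGISTRY_URL are assumed to be defined earlier in the real file), and the returned command is just a placeholder; the key point is that a @task-decorated function receives context variables such as ds as parameters, so "templated" values get rendered inside the task rather than via {{ ds }} in the mapped operator's arguments.

    from airflow.decorators import task
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
        KubernetesPodOperator,
    )

    # dag, NAMESPACE, and REGISTRY_URL are assumed to be defined earlier in the file.
    with dag:

        @task
        def schema_dump_input_cmds(ds=None):
            # Because this is a @task, Airflow injects the context variable ds.
            # The value is rendered here, inside the task, rather than through
            # "{{ ds }}" in the operator's mapped arguments (which would not
            # be interpolated).
            return [["/bin/sh", "-c", f"echo schema dump for {ds}"]]  # placeholder

        KubernetesPodOperator.partial(
            task_id="schema-dump-input",
            namespace=NAMESPACE,
            image=REGISTRY_URL + "/postgres-client:12",
            name="pg-schemadump",
            in_cluster=True,
            hostnetwork=False,
            max_active_tis_per_dag=1,
        ).expand(
            cmds=schema_dump_input_cmds(),  # no args needed; ds comes from the context
        )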

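And the cartesian-product behavior Daniel describes above, as a toy sketch (the DAG id and task id are illustrative): when you expand over two keyword arguments, Airflow maps over every combination of the two inputs.

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(dag_id="cross_product_demo", start_date=datetime(2022, 6, 1), schedule_interval=None):
        # Two mapped keyword arguments => cartesian product:
        # 2 commands x 3 env mappings = 6 mapped task instances.
        BashOperator.partial(task_id="say_var").expand(
            bash_command=["echo $MY_VAR", "echo var is: $MY_VAR"],
            env=[{"MY_VAR": "a"}, {"MY_VAR": "b"}, {"MY_VAR": "c"}],
        )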