No, you don't have to use the TaskFlow API (i.e. you can use the classic
operator style) to create mapped tasks.  But the thing you expand from does
have to be a *task*, i.e. it can't just be a plain Python function.  At least
that's how it is now.  If I misunderstood your question, perhaps follow up
with an example.
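
A minimal sketch of what I mean by the classic style (untested; names are
made up):

    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator

    # assumes this all runs inside a `with DAG(...):` block
    def _make_commands(**context):
        # a plain callable wrapped in a PythonOperator; its return value
        # is pushed to XCom as the task's output
        return [f"echo {context['ds']} part {i}" for i in range(3)]

    make_commands = PythonOperator(
        task_id="make_commands",
        python_callable=_make_commands,
    )

    # .output is an XComArg; each element of the returned list becomes
    # one mapped task instance
    BashOperator.partial(task_id="run_command").expand(
        bash_command=make_commands.output,
    )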

On Sun, Jun 19, 2022 at 2:52 PM Joe Auty <[email protected]> wrote:

> I've gotten this working now, thanks very much!
>
> My problem was my DAG wasn't using the TaskFlow API. My understanding is
> that the context object is provided as a freebie via the @task decorator?
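>
> For example (a sketch; names hypothetical):
>
>     from airflow.decorators import task
>
>     @task
>     def schema_dump_input_cmds(ds=None, run_id=None):
>         # ds and run_id are injected from the Airflow context at runtime
>         return [[f"echo dump for {ds} / {run_id}"]]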
>
> Would it be accurate to say that the TaskFlow API is a virtual requirement
> for dynamic task mapping? I say "virtual" because any sort of workflow
> that requires multi-tenancy/concurrency is going to need some sort of
> unique ID/DAG run to tap into, and without reinventing the wheel this is
> best provided through the Airflow context?
>
> Just clarifying this for my documentation PR...
>
>
>
> Daniel Standish wrote on 2022-06-15 6:09 PM:
>
> it doesn't look like you've made schema_dump_input_cmds a task... maybe
> provide the full dag
>
> Accessing context variables in the TaskFlow API is documented here:
> https://airflow.apache.org/docs/apache-airflow/stable/tutorial_taskflow_api.html#accessing-context-variables-in-decorated-tasks
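>
> E.g. (a minimal sketch of the documented pattern):
>
>     from airflow.decorators import task
>
>     @task
>     def my_task(**context):
>         # context entries such as ds and run_id arrive as keyword args
>         print(context["ds"])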
>
> On Wed, Jun 15, 2022 at 5:07 PM Joe Auty <[email protected]> wrote:
>
>> It looks like the context is not passed through:
>>
>> def schema_dump_input_cmds(**context):
>>     print(context)
>>
>> schema_dump = KubernetesPodOperator.partial(
>>     task_id="schema-dump-input",
>>     namespace=NAMESPACE,
>>     image=REGISTRY_URL + "/postgres-client:12-" + AGENT_VERSION,
>>     .. snipped
>>     max_active_tis_per_dag=1,
>>     dag=dag,
>> ).expand(
>>     cmds=schema_dump_input_cmds(),
>>     env_vars=schema_dump_input_env_vars(),
>> )
>>
>>
>> I'm seeing an empty object in my logs. Any idea why, or any suggestions
>> here?
>>
>>
>>
>>
>> Daniel Standish wrote on 2022-06-15 12:57 PM:
>>
>> Turns out this issue is actually documented here:
>> https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html#how-do-templated-fields-and-mapped-arguments-interact
>>
>> On Wed, Jun 15, 2022 at 12:33 PM Daniel Standish <
>> [email protected]> wrote:
>>
>>> Yeah it does seem that templating does not work with expanded params at
>>> the moment.  No promises but I would bet this will change at some point.
>>> Seems reasonable and I can't think of a technical problem.
>>>
>>> Tal's approach looks good.  I came up with basically the same thing when
>>> testing this out:
>>>
>>> @dag.task
>>> def gen_vars(ds=None):
>>>     return [{"MY_VAR": ds} for x in range(3)]
>>>
>>> op2 = BashOperator.partial(
>>>     task_id='task_mapped',
>>>     bash_command="echo $MY_VAR",
>>> ).expand(env=gen_vars())
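>>>
>>> (Each of the three mapped instances then runs with MY_VAR set to the
>>> run's ds.)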
>>>
>>> One thing I noticed in your example: it appears you may be mapping from
>>> two upstream tasks, for two mapped arguments.  You may have noticed that
>>> this will result in a cartesian product (maybe this is desirable for
>>> you?).  This is an area we are actively working on, so that you can
>>> provide multiple kwargs to map from a single task.
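>>>
>>> For example (sketch):
>>>
>>>     BashOperator.partial(task_id="t").expand(
>>>         bash_command=["echo a", "echo b"],  # 2 values
>>>         env=[{"X": "1"}, {"X": "2"}],       # 2 values
>>>     )
>>>     # yields 2 x 2 = 4 mapped task instances (cartesian product)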
>>>
>>>
>>>
>>> On Wed, Jun 15, 2022 at 3:36 AM Tal Nagar <[email protected]> wrote:
>>>
>>>> Hi Joe
>>>>
>>>> Not sure if it's the recommended way to do it, but what worked for me
>>>> is rendering the templates in a Python task rather than directly in the
>>>> KubernetesPodOperator.
>>>>
>>>> For example:
>>>>
>>>>
>>>>
>>>>     @task()
>>>>     def create_cmds(**context):
>>>>         run_params = context["params"]
>>>>         size = int(run_params['size'])
>>>>         arr = []
>>>>         for s in range(size):
>>>>             arr.append([f'echo hi {s}'])
>>>>         return arr
>>>>
>>>>     say_hi = KubernetesPodOperator.partial(
>>>>         image="alpine",
>>>>         cmds=["/bin/sh", "-c"],
>>>>         task_id='test',
>>>>         name='test',
>>>>     ).expand(arguments=create_cmds())
>>>>
>>>>
>>>>
>>>> *From:* Joe Auty <[email protected]>
>>>> *Sent:* Tuesday, June 14, 2022 10:03 AM
>>>> *To:* [email protected]
>>>> *Subject:* Templated fields and dynamic task mapping
>>>>
>>>>
>>>>
>>>> Hello,
>>>>
>>>> I'm trying to understand the docs here:
>>>> https://airflow.apache.org/docs/apache-airflow/2.3.2/concepts/dynamic-task-mapping.html#how-do-templated-fields-and-mapped-arguments-interact
>>>> , specifically this section:
>>>>
>>>>
>>>> If you want to interpolate values, either call task.render_template
>>>> yourself, or use interpolation:
>>>>
>>>>
>>>> In the previous section's example of what doesn't work, we have:
>>>>
>>>> printer.expand(val=make_list())
>>>>
>>>>
>>>> What should the corrected version of this line be? IOW, how would I
>>>> call make_list, passing in the context, so that I can send templated
>>>> fields to my mapping function? Here is a more specific use case:
>>>>
>>>>
>>>> KubernetesPodOperator.partial(
>>>>     task_id="schema-dump-input",
>>>>     namespace=NAMESPACE,
>>>>     image=REGISTRY_URL + "/postgres-client:12",
>>>>     name="pg-schemadump",
>>>>     in_cluster=True,
>>>>     hostnetwork=False,
>>>>     max_active_tis_per_dag=1,
>>>>     dag=dag,
>>>> ).expand(
>>>>     cmds=schema_dump_input_cmds(ds),
>>>>     env_vars=schema_dump_input_env_vars(ds),
>>>> )
>>>>
>>>> In this example, ds has no value because it is not defined anywhere,
>>>> and {{ ds }} doesn't work either (it doesn't get interpolated and comes
>>>> through literally as "{{ ds }}"). How can I pass in a templated field
>>>> such as {{ ds }}?
>>>>
>>>> Thanks very much in advance!
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
