Re: [I] Bug: Unable to use expand on a TaskGroup containing a @task.kubernetes_cmd task [airflow]
potiuk closed issue #56459: Bug: Unable to use expand on a TaskGroup containing a @task.kubernetes_cmd task URL: https://github.com/apache/airflow/issues/56459 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [I] Bug: Unable to use expand on a TaskGroup containing a @task.kubernetes_cmd task [airflow]
akkik04 commented on issue #56459:
URL: https://github.com/apache/airflow/issues/56459#issuecomment-3640392618
Put up a PR for this, looking forward to getting some eyes on it. cc: @ephraimbuddy @kaxil
Re: [I] Bug: Unable to use expand on a TaskGroup containing a @task.kubernetes_cmd task [airflow]
akkik04 commented on issue #56459:
URL: https://github.com/apache/airflow/issues/56459#issuecomment-3621352820
I want to take a shot at this. Can one of the maintainers assign it to me, please?
Re: [I] Bug: Unable to use expand on a TaskGroup containing a @task.kubernetes_cmd task [airflow]
tmaurin commented on issue #56459:
URL: https://github.com/apache/airflow/issues/56459#issuecomment-3469713714
Small update:
I'm not 100% sure it is the same thing, but I found out you can reproduce the
issue by using an f-string instead of referencing another variable.
In case it is useful, starting from the above example for the TaskGroup:
```
from datetime import datetime

from airflow.sdk import BaseOperator, Context, dag, task, task_group


class CustomOperator(BaseOperator):
    template_fields = (
        "input_file",
    )

    def __init__(self, input_file, **kwargs):
        # This line causes the error with MappedArgument inside of taskGroup
        # in Airflow 3.x
        self.input_file = f"toto/{input_file}"
        super().__init__(**kwargs)

    def execute(self, context: Context):
        print(f"file: {self.input_file}")
        return self.input_file


@task()
def transmit_param(val: str) -> str:
    return val


@dag(
    dag_id="test_mapped_argument_split_issue3",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
)
def test_mapped_argument_split():
    """Reproduce the MappedArgument split error"""
    input_files = [
        "s3://bucket/path/to/file1.csv",
        "s3://bucket/path/to/file2.csv",
        "s3://bucket/path/to/file3.csv",
    ]

    @task_group()
    def test_group(
        input_file: str,
    ) -> CustomOperator:
        return CustomOperator(input_file=f"{input_file}",
                              task_id="custom_operator")

    test_group.expand(input_file=input_files)


# Instantiate the DAG so it is registered when the file is parsed
test_dag = test_mapped_argument_split()
```
Will print:
```
file: toto/MappedArgument(_input=DictOfListsExpandInput(value={'input_file':
['s3://bucket/path/to/file1.csv', 's3://bucket/path/to/file2.csv',
's3://bucket/path/to/file3.csv']}), _key='input_file')
```
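The same effect can be shown without Airflow. This is only a minimal sketch, assuming that a mapped task-group argument is a placeholder object at parse time and is swapped for the real value later. `Placeholder`, `resolve` and `resolve_field` are hypothetical stand-ins for illustration, not Airflow's `MappedArgument` API. Formatting the placeholder into a string captures its repr immediately, so there is nothing left to resolve afterwards.

```python
class Placeholder:
    """Hypothetical stand-in for a value that is only known at run time."""

    def __init__(self, key):
        self.key = key

    def __repr__(self):
        return f"Placeholder(key={self.key!r})"

    def resolve(self, values):
        # Look up the concrete value once it is available.
        return values[self.key]


def resolve_field(field, values):
    """Resolve a field only if it is still a placeholder object."""
    if isinstance(field, Placeholder):
        return field.resolve(values)
    return field  # already a plain string: nothing left to resolve


arg = Placeholder("input_file")
runtime_values = {"input_file": "s3://bucket/path/to/file1.csv"}

# Stored as-is: the placeholder survives until run time and can be resolved.
kept = resolve_field(arg, runtime_values)

# Formatted eagerly: the f-string converts the placeholder to text right away.
baked = resolve_field(f"toto/{arg}", runtime_values)

print(kept)
print(baked)
```

If this reading is right, it would explain why the log line above contains the repr of the whole `MappedArgument` rather than a single file path.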
Re: [I] Bug: Unable to use expand on a TaskGroup containing a @task.kubernetes_cmd task [airflow]
ashb commented on issue #56459:
URL: https://github.com/apache/airflow/issues/56459#issuecomment-3468663776
I haven't made any start on this issue, I've only done some preliminary triage.
Re: [I] Bug: Unable to use expand on a TaskGroup containing a @task.kubernetes_cmd task [airflow]
tmaurin commented on issue #56459:
URL: https://github.com/apache/airflow/issues/56459#issuecomment-3467702312
Hello @ashb,
I don't know where you are with it, but in case it helps, I think
this issue might actually be linked to a TaskGroup combined with a template
field that references another template field, rather than to TaskGroup +
kubernetes_cmd directly.
I think this DAG showcases the issue:
```python
from datetime import datetime

from airflow.sdk import BaseOperator, dag, task_group, task


class CustomOperator(BaseOperator):
    template_fields = (
        "raw_input_file",
        "input_file",
    )

    def __init__(self, input_file, **kwargs):
        self.raw_input_file = input_file
        # This line causes the error with MappedArgument inside of taskGroup
        # in Airflow 3.x
        self.input_file = "{{ task.raw_input_file }}"
        super().__init__(**kwargs)

    def execute(self, context):
        # Always print the raw file name
        print(f"Raw file: {self.raw_input_file}")
        # Print the raw file name if task is mapped directly
        # But print
        # MappedArgument(_input=DictOfListsExpandInput(value={'input_file':
        # ['s3://bucket/path/to/file1.csv', 's3://bucket/path/to/file2.csv',
        # 's3://bucket/path/to/file3.csv']}), _key='input_file')
        # if task is mapped inside of taskGroup
        print(f"Derived file: {self.input_file}")
        return self.raw_input_file


@dag(
    dag_id="test_mapped_argument_split_issue",
    start_date=datetime(2025, 1, 1),
    schedule=None,
    catchup=False,
)
def test_mapped_argument_split():
    """Reproduce the MappedArgument split error"""
    input_files = [
        "s3://bucket/path/to/file1.csv",
        "s3://bucket/path/to/file2.csv",
        "s3://bucket/path/to/file3.csv",
    ]

    @task_group()
    def test_group(
        input_file: str,
    ) -> CustomOperator:
        return CustomOperator(input_file=input_file,
                              task_id="custom_operator")

    test_group.expand(input_file=input_files)
    CustomOperator.partial(task_id="custom_operator").expand(input_file=input_files)


test_dag = test_mapped_argument_split()
```
This is the resulting graph:
https://github.com/user-attachments/assets/484d7af1-9ab5-4abe-b21e-96066979bcbf
The task mapped through test_group has a log along these lines:
```
Raw file: s3://bucket/path/to/file3.csv
Derived file:
MappedArgument(_input=DictOfListsExpandInput(value={'input_file':
['s3://bucket/path/to/file1.csv', 's3://bucket/path/to/file2.csv',
's3://bucket/path/to/file3.csv']}), _key='input_file')
```
While the task mapped directly has a log along these lines:
```
Raw file: s3://bucket/path/to/file3.csv
Derived file: s3://bucket/path/to/file3.csv
```
So mapping the task directly works as expected (as in 2.x), while mapping
through a TaskGroup does not.
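One possible reading of these logs, not a confirmed diagnosis, is that the derived template is rendered against a task object whose `raw_input_file` still holds the unresolved mapped argument. The sketch below imitates that with a tiny regex-based renderer. `render` and `Unresolved` are hypothetical helpers written for this illustration and do not reflect how Airflow's Jinja rendering is implemented.

```python
import re
from types import SimpleNamespace

# Matches templates of the form "{{ task.<attr> }}".
_TEMPLATE = re.compile(r"\{\{\s*task\.(\w+)\s*\}\}")


def render(template, task):
    """Replace each '{{ task.<attr> }}' with str(getattr(task, attr))."""
    return _TEMPLATE.sub(lambda m: str(getattr(task, m.group(1))), template)


class Unresolved:
    """Hypothetical placeholder for a mapped argument."""

    def __repr__(self):
        return "Unresolved('input_file')"


template = "{{ task.raw_input_file }}"

# Case 1: the task object the template reads from already holds the real value.
resolved_task = SimpleNamespace(raw_input_file="s3://bucket/path/to/file3.csv")
direct = render(template, resolved_task)

# Case 2: the task object still holds the placeholder when the template runs.
unresolved_task = SimpleNamespace(raw_input_file=Unresolved())
grouped = render(template, unresolved_task)

print(direct)
print(grouped)
```

The first case mirrors the log of the directly mapped task, the second the log of the task mapped through the group.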
Re: [I] Bug: Unable to use expand on a TaskGroup containing a @task.kubernetes_cmd task [airflow]
SoinSoin commented on issue #56459:
URL: https://github.com/apache/airflow/issues/56459#issuecomment-3441205841
> Can someone please include the full error?
`ERROR - Task failed with exception: source="task"
TypeError: Expected echo_cmd to return a list of strings, but got ['echo', MappedArgument(_input=DictOfListsExpandInput(value={'name': XComArg()}), _key='name')]
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 920 in run
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1215 in _execute_task
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 397 in wrapper
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py", line 72 in execute
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/decorators/kubernetes_cmd.py", line 92 in _generate_cmds`
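The shape of this error can be illustrated with a small type check. This is a sketch only: `check_cmds` and `Unresolved` are hypothetical names, and the code is not the provider's actual `_generate_cmds` implementation. It assumes the decorated function's return value is validated as a list of strings, so any element that is still an unresolved placeholder fails the check.

```python
def check_cmds(func_name, cmds):
    """Return cmds unchanged if it is a list of strings, else raise TypeError."""
    if not isinstance(cmds, list) or not all(isinstance(c, str) for c in cmds):
        raise TypeError(
            f"Expected {func_name} to return a list of strings, but got {cmds!r}"
        )
    return cmds


class Unresolved:
    """Hypothetical placeholder for a mapped argument that was never resolved."""

    def __repr__(self):
        return "Unresolved('name')"


# A fully resolved command passes the check.
ok = check_cmds("echo_cmd", ["echo", "world"])

# A command that still contains a placeholder object is rejected.
try:
    check_cmds("echo_cmd", ["echo", Unresolved()])
except TypeError as exc:
    message = str(exc)

print(message)
```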
Re: [I] Bug: Unable to use expand on a TaskGroup containing a @task.kubernetes_cmd task [airflow]
ashb commented on issue #56459:
URL: https://github.com/apache/airflow/issues/56459#issuecomment-3425710049
Can someone please include the full error?
Re: [I] Bug: Unable to use expand on a TaskGroup containing a @task.kubernetes_cmd task [airflow]
Nonolabiscotte commented on issue #56459:
URL: https://github.com/apache/airflow/issues/56459#issuecomment-3380038441
+1 I'm experiencing the same issue on Airflow
Re: [I] Bug: Unable to use expand on a TaskGroup containing a @task.kubernetes_cmd task [airflow]
boring-cyborg[bot] commented on issue #56459:
URL: https://github.com/apache/airflow/issues/56459#issuecomment-3376951971
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
Re: [I] Bug: Unable to use expand on a TaskGroup containing a @task.kubernetes_cmd task [airflow]
Mazen94 commented on issue #56459:
URL: https://github.com/apache/airflow/issues/56459#issuecomment-3380020406
+1 I'm experiencing the same issue on Airflow
