Re: [PR] Create TaskMap rows when pushing XCom values from the Task Exec Interface [airflow]

2025-01-31 Thread via GitHub


ashb merged PR #46319:
URL: https://github.com/apache/airflow/pull/46319


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Create TaskMap rows when pushing XCom values from the Task Exec Interface [airflow]

2025-01-31 Thread via GitHub


ashb commented on code in PR #46319:
URL: https://github.com/apache/airflow/pull/46319#discussion_r1937512117


##
task_sdk/src/airflow/sdk/definitions/mappedoperator.py:
##
@@ -136,6 +137,22 @@ def ensure_xcomarg_return_value(arg: Any) -> None:
 ensure_xcomarg_return_value(v)
 
 
+def is_mappable_value(value: Any) -> TypeGuard[Collection]:
+    """
+    Whether a value can be used for task mapping.
+
+    We only allow collections with guaranteed ordering, but exclude character
+    sequences since that's usually not what users would expect to be mappable.
+
+    :meta private:
+    """
+    if not isinstance(value, (Sequence, dict)):
+        return False
+    if isinstance(value, (bytearray, bytes, str)):
+        return False
+    return True

Review Comment:
   We didn't before. I just copied this (in my next massive PR the other one will be deleted).
   
   Let me update the source (somewhere in TI.py, I think) to use this version.




Re: [PR] Create TaskMap rows when pushing XCom values from the Task Exec Interface [airflow]

2025-01-31 Thread via GitHub


jedcunningham commented on code in PR #46319:
URL: https://github.com/apache/airflow/pull/46319#discussion_r1937505479


##
task_sdk/src/airflow/sdk/definitions/mappedoperator.py:
##
@@ -136,6 +137,22 @@ def ensure_xcomarg_return_value(arg: Any) -> None:
 ensure_xcomarg_return_value(v)
 
 
+def is_mappable_value(value: Any) -> TypeGuard[Collection]:
+    """
+    Whether a value can be used for task mapping.
+
+    We only allow collections with guaranteed ordering, but exclude character
+    sequences since that's usually not what users would expect to be mappable.
+
+    :meta private:
+    """
+    if not isinstance(value, (Sequence, dict)):
+        return False
+    if isinstance(value, (bytearray, bytes, str)):
+        return False
+    return True

Review Comment:
   What about a set? That should be false too?




Re: [PR] Create TaskMap rows when pushing XCom values from the Task Exec Interface [airflow]

2025-01-31 Thread via GitHub


ashb commented on code in PR #46319:
URL: https://github.com/apache/airflow/pull/46319#discussion_r1937506824


##
airflow/api_fastapi/execution_api/routes/xcoms.py:
##
@@ -152,14 +170,30 @@ def set_xcom(
             },
         )
 
-    if not has_xcom_access(key, token):
-        raise HTTPException(
-            status_code=status.HTTP_403_FORBIDDEN,
-            detail={
-                "reason": "access_denied",
-                "message": f"Task does not have access to set XCom key '{key}'",
-            },
+    if mapped_length is not None:
+        task_map = TaskMap(
+            dag_id=dag_id,
+            task_id=task_id,
+            run_id=run_id,
+            map_index=map_index,
+            length=mapped_length,
+            keys=None,
         )
+        max_map_length = conf.getint("core", "max_map_length", fallback=1024)
+        if task_map.length > max_map_length:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail={
+                    "reason": "unmappable_return_value_length",
+                    "message": "pushed value is too large to map as a downstream's dependency",
+                },
+            )
+        session.add(task_map)
+
+    # else:
+    # TODO: Can/should we check if a client _hasn't_ provided this for an upstream of a mapped task? That
+    # means loading the serialized dag and that seems like a relatively costly operation for minimal benefit
+    # (the mapped task would fail in a moment as it can't be expanded anyway.)

Review Comment:
   Yeah exactly.
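A minimal sketch of the length check in the `set_xcom` hunk, pulled out of FastAPI and SQLAlchemy so it runs on its own. `TaskMapRow`, `UnmappableLengthError`, `client_mapped_length` and `build_task_map` are hypothetical names. In particular, how the client actually computes `mapped_length` is not shown in this thread, so `client_mapped_length` is only an assumption that reuses the `is_mappable_value` rules.

```python
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Any, List, Optional

# The diff reads the limit with conf.getint("core", "max_map_length", fallback=1024);
# here it is just a keyword argument defaulting to the same fallback value.
DEFAULT_MAX_MAP_LENGTH = 1024


class UnmappableLengthError(ValueError):
    """Hypothetical stand-in for the HTTP 400 "unmappable_return_value_length" response."""


@dataclass
class TaskMapRow:
    """Hypothetical plain-Python stand-in for the TaskMap ORM model."""

    dag_id: str
    task_id: str
    run_id: str
    map_index: int
    length: int
    keys: Optional[List[str]] = None


def client_mapped_length(value: Any) -> Optional[int]:
    """Hypothetical client-side helper: a length only for values that could be mapped over."""
    if isinstance(value, (str, bytes, bytearray)):
        return None
    if not isinstance(value, (Sequence, dict)):
        return None
    return len(value)


def build_task_map(
    dag_id: str,
    task_id: str,
    run_id: str,
    map_index: int,
    mapped_length: Optional[int],
    max_map_length: int = DEFAULT_MAX_MAP_LENGTH,
) -> Optional[TaskMapRow]:
    """Mirror the server-side branch: only record a row when the client sent a length."""
    if mapped_length is None:
        # No length pushed: nothing is recorded and nothing is checked (see the TODO).
        return None
    task_map = TaskMapRow(dag_id, task_id, run_id, map_index, length=mapped_length, keys=None)
    if task_map.length > max_map_length:
        raise UnmappableLengthError("pushed value is too large to map as a downstream's dependency")
    return task_map  # the route in the diff would session.add() this row
```

Because the check runs before the row is added, an over-long value is rejected at push time instead of being recorded.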




Re: [PR] Create TaskMap rows when pushing XCom values from the Task Exec Interface [airflow]

2025-01-31 Thread via GitHub


ashb commented on code in PR #46319:
URL: https://github.com/apache/airflow/pull/46319#discussion_r1937507968


##
airflow/api_fastapi/execution_api/routes/xcoms.py:
##
@@ -152,14 +170,30 @@ def set_xcom(
             },
         )
 
-    if not has_xcom_access(key, token):
-        raise HTTPException(
-            status_code=status.HTTP_403_FORBIDDEN,
-            detail={
-                "reason": "access_denied",
-                "message": f"Task does not have access to set XCom key '{key}'",
-            },
+    if mapped_length is not None:
+        task_map = TaskMap(
+            dag_id=dag_id,
+            task_id=task_id,
+            run_id=run_id,
+            map_index=map_index,
+            length=mapped_length,
+            keys=None,
         )
+        max_map_length = conf.getint("core", "max_map_length", fallback=1024)
+        if task_map.length > max_map_length:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail={
+                    "reason": "unmappable_return_value_length",
+                    "message": "pushed value is too large to map as a downstream's dependency",
+                },
+            )
+        session.add(task_map)
+
+    # else:
+    # TODO: Can/should we check if a client _hasn't_ provided this for an upstream of a mapped task? That
+    # means loading the serialized dag and that seems like a relatively costly operation for minimal benefit
+    # (the mapped task would fail in a moment as it can't be expanded anyway.)

Review Comment:
   One thing we might be able to do is to restructure the serdag format so that we can efficiently find this info via JSON queries _in_ the field in the DB, meaning we could query for certain facets directly without having to load the full DAG into Python memory.
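A rough sketch of querying a facet of the serialized DAG inside the database instead of loading it into Python. It uses SQLite's JSON functions (`json_each`, built in by default from SQLite 3.38) purely as a stand-in for whichever database JSON operators would be used. The `serialized_dag` table layout and the `mapped_upstream_task_ids` key are hypothetical and are not Airflow's actual serialization format.

```python
import json
import sqlite3


def create_store() -> sqlite3.Connection:
    """In-memory stand-in for a table holding serialized DAGs (hypothetical layout)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE serialized_dag (dag_id TEXT PRIMARY KEY, data TEXT NOT NULL)")
    return conn


def store_dag(conn: sqlite3.Connection, dag_id: str, data: dict) -> None:
    conn.execute(
        "INSERT INTO serialized_dag (dag_id, data) VALUES (?, ?)",
        (dag_id, json.dumps(data)),
    )


def feeds_mapped_task(conn: sqlite3.Connection, dag_id: str, task_id: str) -> bool:
    """Ask the database whether task_id is listed as an upstream of a mapped task.

    json_each() expands the JSON array inside the stored document, so at most one
    matching row comes back and the full DAG is never loaded into Python.
    """
    row = conn.execute(
        """
        SELECT 1
        FROM serialized_dag AS sd, json_each(sd.data, '$.mapped_upstream_task_ids') AS j
        WHERE sd.dag_id = ? AND j.value = ?
        LIMIT 1
        """,
        (dag_id, task_id),
    ).fetchone()
    return row is not None


if __name__ == "__main__":
    conn = create_store()
    store_dag(
        conn,
        "example_dag",
        {
            "tasks": [{"task_id": "extract"}, {"task_id": "transform", "is_mapped": True}],
            # Hypothetical precomputed facet: tasks whose output feeds a mapped task.
            "mapped_upstream_task_ids": ["extract"],
        },
    )
    print(feeds_mapped_task(conn, "example_dag", "extract"))
    print(feeds_mapped_task(conn, "example_dag", "transform"))
```

The design choice being illustrated is that the facet is precomputed when the DAG is serialized, so the API only needs an indexed lookup plus a small JSON scan rather than a full deserialization.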




Re: [PR] Create TaskMap rows when pushing XCom values from the Task Exec Interface [airflow]

2025-01-31 Thread via GitHub


amoghrajesh commented on code in PR #46319:
URL: https://github.com/apache/airflow/pull/46319#discussion_r1937488045


##
airflow/api_fastapi/execution_api/routes/xcoms.py:
##
@@ -104,6 +105,8 @@ def get_xcom(
 return XComResponse(key=key, value=xcom_value)
 
 
+# TODO:once we have JWT tokens, then remove the dag/run/task ids from the URL and just use the info in the

Review Comment:
   ```suggestion
   # TODO: once we have JWT tokens, then remove the dag_id/run_id/task_id from the URL and just use the info in the
   ```



##
airflow/api_fastapi/execution_api/routes/xcoms.py:
##
@@ -184,13 +218,16 @@ def set_xcom(
 return {"message": "XCom successfully set"}
 
 
-def has_xcom_access(xcom_key: str, token: TIToken) -> bool:
+def has_xcom_access(
+    dag_id: str, run_id: str, task_id: str, xcom_key: str, token: TIToken, write: bool = False

Review Comment:
   Can we make it `write_access: bool = False` to be in line with variables? Or vice versa is fine too.



##
airflow/api_fastapi/execution_api/routes/xcoms.py:
##
@@ -152,14 +170,30 @@ def set_xcom(
             },
         )
 
-    if not has_xcom_access(key, token):
-        raise HTTPException(
-            status_code=status.HTTP_403_FORBIDDEN,
-            detail={
-                "reason": "access_denied",
-                "message": f"Task does not have access to set XCom key '{key}'",
-            },
+    if mapped_length is not None:
+        task_map = TaskMap(
+            dag_id=dag_id,
+            task_id=task_id,
+            run_id=run_id,
+            map_index=map_index,
+            length=mapped_length,
+            keys=None,
         )
+        max_map_length = conf.getint("core", "max_map_length", fallback=1024)
+        if task_map.length > max_map_length:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail={
+                    "reason": "unmappable_return_value_length",
+                    "message": "pushed value is too large to map as a downstream's dependency",
+                },
+            )
+        session.add(task_map)
+
+    # else:
+    # TODO: Can/should we check if a client _hasn't_ provided this for an upstream of a mapped task? That
+    # means loading the serialized dag and that seems like a relatively costly operation for minimal benefit
+    # (the mapped task would fail in a moment as it can't be expanded anyway.)

Review Comment:
   Loading the serdag is a route we should avoid if we can. It's super expensive, and doing it in an API is just going to slow down the response significantly.



##
airflow/api_fastapi/execution_api/routes/xcoms.py:
##
@@ -104,6 +105,8 @@ def get_xcom(
 return XComResponse(key=key, value=xcom_value)
 
 
+# TODO:once we have JWT tokens, then remove the dag/run/task ids from the URL and just use the info in the

Review Comment:
   Nit



##
airflow/api_fastapi/execution_api/routes/xcoms.py:
##
@@ -184,13 +218,16 @@ def set_xcom(
 return {"message": "XCom successfully set"}
 
 
-def has_xcom_access(xcom_key: str, token: TIToken) -> bool:
+def has_xcom_access(
+    dag_id: str, run_id: str, task_id: str, xcom_key: str, token: TIToken, write: bool = False

Review Comment:
   Cool!



##
airflow/api_fastapi/execution_api/routes/xcoms.py:
##
@@ -152,14 +170,30 @@ def set_xcom(
             },
         )
 
-    if not has_xcom_access(key, token):
-        raise HTTPException(
-            status_code=status.HTTP_403_FORBIDDEN,
-            detail={
-                "reason": "access_denied",
-                "message": f"Task does not have access to set XCom key '{key}'",
-            },
+    if mapped_length is not None:
+        task_map = TaskMap(
+            dag_id=dag_id,
+            task_id=task_id,
+            run_id=run_id,
+            map_index=map_index,
+            length=mapped_length,
+            keys=None,
         )
+        max_map_length = conf.getint("core", "max_map_length", fallback=1024)
+        if task_map.length > max_map_length:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail={
+                    "reason": "unmappable_return_value_length",
+                    "message": "pushed value is too large to map as a downstream's dependency",
+                },
+            )
+        session.add(task_map)
+
+    # else:
+    # TODO: Can/should we check if a client _hasn't_ provided this for an upstream of a mapped task? That
+    # means loading the serialized dag and that seems like a relatively costly operation for minimal benefit
+    # (the mapped task would fail in a moment as it can't be expanded anyway.)

Review Comment:
   Might also affect tasks that have `execution_timeout` set.




Re: [PR] Create TaskMap rows when pushing XCom values from the Task Exec Interface [airflow]

2025-01-31 Thread via GitHub


ashb commented on code in PR #46319:
URL: https://github.com/apache/airflow/pull/46319#discussion_r1937414799


##
airflow/api_fastapi/execution_api/routes/xcoms.py:
##
@@ -184,13 +218,16 @@ def set_xcom(
 return {"message": "XCom successfully set"}
 
 
-def has_xcom_access(xcom_key: str, token: TIToken) -> bool:
+def has_xcom_access(
+    dag_id: str, run_id: str, task_id: str, xcom_key: str, token: TIToken, write: bool = False

Review Comment:
   Nothing using it yet, but I did add the idea of read vs write access here.
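A sketch of what the read/write distinction could eventually enforce. Everything here is hypothetical: `TaskIdentity` stands in for whatever `TIToken` ends up carrying, the policy (any task in the same DAG run may read, only the producing task may write) is an assumption for illustration, and the flag is named `write_access` as suggested in review rather than `write` as in the diff.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskIdentity:
    """Hypothetical stand-in for the task identity a TIToken/JWT might carry."""

    dag_id: str
    run_id: str
    task_id: str


def has_xcom_access(
    dag_id: str,
    run_id: str,
    task_id: str,
    xcom_key: str,
    token: TaskIdentity,
    write_access: bool = False,
) -> bool:
    """Hypothetical policy: reads within the same DAG run, writes only to the task's own XComs."""
    # xcom_key is accepted so key-level rules could be added later; this sketch ignores it.
    same_run = token.dag_id == dag_id and token.run_id == run_id
    if not write_access:
        return same_run
    return same_run and token.task_id == task_id
```

Keeping the identity checks in one function with a boolean flag means the read and write routes can share it, which is the shape the diff's new signature suggests.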


