[GitHub] [airflow] uranusjr commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

2022-04-11 Thread GitBox


uranusjr commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847717386


##
tests/models/test_dagrun.py:
##
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
 mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
 
 dr = dag_maker.create_dagrun()
-indices = (
-session.query(TI.map_index)
+query = (
+session.query(TI.map_index, TI.state)
 .filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
 .order_by(TI.map_index)
-.all()
 )
 
-assert indices == [(-1,)]
+assert query.all() == [(-1, None)]
+
+# Verify_integrity shouldn't change the result now that the TIs exist
+dr.verify_integrity()
+assert query.all() == [(-1, None)]

Review Comment:
   Passing in `session` should be enough then



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




uranusjr commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847713698


##
tests/models/test_dagrun.py:
##
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
 mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
 
 dr = dag_maker.create_dagrun()
-indices = (
-session.query(TI.map_index)
+query = (
+session.query(TI.map_index, TI.state)
 .filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
 .order_by(TI.map_index)
-.all()
 )
 
-assert indices == [(-1,)]
+assert query.all() == [(-1, None)]
+
+# Verify_integrity shouldn't change the result now that the TIs exist
+dr.verify_integrity()
+assert query.all() == [(-1, None)]

Review Comment:
   We probably need to do this?
   
   ```suggestion
   # Verify_integrity shouldn't change the result now that the TIs exist
   dr.verify_integrity(session=session)
   session.flush()
   assert query.all() == [(-1, None)]
   ```
   
   Otherwise `verify_integrity` would use a new session, and because of transaction isolation the query result wouldn’t change either way.
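
   As a generic illustration of the isolation point above, here is a minimal sketch using the standard-library `sqlite3` module rather than Airflow or SQLAlchemy. The table name `ti` and the helpers `count_rows` and `demo_isolation` are hypothetical and only stand in for the two sessions; the sketch shows that a write made on one connection is not visible to another connection until it is committed.

```python
import os
import sqlite3
import tempfile


def count_rows(conn: sqlite3.Connection) -> int:
    """Return the number of rows currently visible to this connection."""
    return conn.execute("SELECT COUNT(*) FROM ti").fetchall()[0][0]


def demo_isolation() -> tuple:
    """Show that uncommitted writes on one connection are invisible to another."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        # Two independent connections play the role of two separate sessions.
        writer = sqlite3.connect(path)
        reader = sqlite3.connect(path)
        try:
            # Hypothetical table, loosely modelled on the map_index column.
            writer.execute("CREATE TABLE ti (map_index INTEGER)")
            writer.commit()

            # The INSERT implicitly opens a transaction on the writer.
            writer.execute("INSERT INTO ti VALUES (-1)")
            seen_by_writer = count_rows(writer)
            seen_before_commit = count_rows(reader)

            # Only after the commit does the other connection see the row.
            writer.commit()
            seen_after_commit = count_rows(reader)
        finally:
            writer.close()
            reader.close()
    return seen_by_writer, seen_before_commit, seen_after_commit
```

   Passing the same session to both the query and `verify_integrity` corresponds to doing the write and the read on the same connection here, which is why the writer sees its own pending row.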







uranusjr commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847689404


##
airflow/models/mappedoperator.py:
##
@@ -559,7 +562,12 @@ def _get_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
 for task_id, length in xcom_query:
 for mapped_arg_name in mapped_dep_keys[task_id]:
 map_lengths[mapped_arg_name] += length
+return map_lengths
 
+def _resolve_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
+"""Return dict of argument name to map length, or throw if some are not resolvable"""
+expansion_kwargs = self._get_expansion_kwargs()
+map_lengths = self._get_map_lengths(run_id, session=session)

Review Comment:
   We should document `_get_map_lengths` to explain how it’s different from `_resolve_map_lengths`.
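
   A rough sketch of what such docstrings might convey, written as standalone functions rather than the actual `MappedOperator` methods. It assumes the distinction is that one function returns whatever lengths are currently known while the other insists that every expected argument has a length; the diff only shows the second docstring, so the first is a guess. The names `get_map_lengths`, `resolve_map_lengths`, and `UnresolvedMapLength` are hypothetical.

```python
from typing import Dict, Iterable


class UnresolvedMapLength(Exception):
    """Hypothetical error for arguments whose map length is not yet known."""


def get_map_lengths(known: Dict[str, int]) -> Dict[str, int]:
    """Return map lengths for the arguments that can be resolved right now.

    Arguments whose lengths are not available yet are simply absent from
    the result; this function never raises for missing entries.
    """
    return dict(known)


def resolve_map_lengths(expected: Iterable[str], known: Dict[str, int]) -> Dict[str, int]:
    """Return a map length for every expected argument, or raise.

    Unlike get_map_lengths, a missing argument is treated as an error.
    """
    lengths = get_map_lengths(known)
    missing = sorted(set(expected) - lengths.keys())
    if missing:
        raise UnresolvedMapLength(f"cannot resolve: {', '.join(missing)}")
    return lengths
```

   With `known = {"arg1": 3}`, `get_map_lengths(known)` returns the partial result, whereas `resolve_map_lengths(["arg1", "arg2"], known)` raises because `arg2` has no length yet.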







uranusjr commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847650700


##
airflow/serialization/serialized_objects.py:
##
@@ -1081,12 +1081,12 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) -> 'SerializedDAG':
 setattr(task.subdag, 'parent_dag', dag)
 
 if isinstance(task, MappedOperator):
-for d in (task.mapped_kwargs, task.partial_kwargs):
-for k, v in d.items():
-if not isinstance(v, _XComRef):
-continue
+expansion_kwargs = task._get_expansion_kwargs()

Review Comment:
   Because those attributes _shouldn’t_ be accessed by the scheduler anyway? So this likely only caused the task to be rendered incorrectly in the UI.


