[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

2022-04-11 Thread GitBox


ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847719456


##
tests/models/test_dagrun.py:
##
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
     mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
 
     dr = dag_maker.create_dagrun()
-    indices = (
-        session.query(TI.map_index)
+    query = (
+        session.query(TI.map_index, TI.state)
         .filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
         .order_by(TI.map_index)
-        .all()
     )
 
-    assert indices == [(-1,)]
+    assert query.all() == [(-1, None)]
+
+    # Verify_integrity shouldn't change the result now that the TIs exist
+    dr.verify_integrity()
+    assert query.all() == [(-1, None)]

Review Comment:
   Oh yes. Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

2022-04-11 Thread GitBox


ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847715879


##
tests/models/test_dagrun.py:
##
@@ -1036,14 +1036,17 @@ def test_mapped_mixed__literal_not_expanded_at_create(dag_maker, session):
     mapped = MockOperator.partial(task_id='task_2').expand(arg1=literal, arg2=XComArg(task))
 
     dr = dag_maker.create_dagrun()
-    indices = (
-        session.query(TI.map_index)
+    query = (
+        session.query(TI.map_index, TI.state)
         .filter_by(task_id=mapped.task_id, dag_id=mapped.dag_id, run_id=dr.run_id)
         .order_by(TI.map_index)
-        .all()
     )
 
-    assert indices == [(-1,)]
+    assert query.all() == [(-1, None)]
+
+    # Verify_integrity shouldn't change the result now that the TIs exist
+    dr.verify_integrity()
+    assert query.all() == [(-1, None)]

Review Comment:
   It has a flush inside it already 
https://github.com/apache/airflow/blob/9c9272de3a32d30d831fac1272a07244d5fb8e0b/airflow/models/dagrun.py#L903






[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

2022-04-11 Thread GitBox


ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847700088


##
airflow/models/mappedoperator.py:
##
@@ -559,7 +562,12 @@ def _get_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
         for task_id, length in xcom_query:
             for mapped_arg_name in mapped_dep_keys[task_id]:
                 map_lengths[mapped_arg_name] += length
+        return map_lengths
 
+    def _resolve_map_lengths(self, run_id: str, *, session: Session) -> Dict[str, int]:
+        """Return dict of argument name to map length, or throw if some are not resolvable"""
+        expansion_kwargs = self._get_expansion_kwargs()
+        map_lengths = self._get_map_lengths(run_id, session=session)

Review Comment:
   Done:
   
   > """Return dict of argument name to map length.
   >
   > If any arguments are not known right now (upstream task not finished) they will not be present in the dict.
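
The split described by that docstring could be sketched roughly as follows. This is only an illustration under assumptions: `get_map_lengths`, `resolve_map_lengths`, and their parameters are hypothetical names, not the methods in `mappedoperator.py`, and the upstream lengths are passed in directly instead of being queried from XCom.

```python
from typing import Dict, Iterable, List, Optional


def get_map_lengths(
    literals: Dict[str, List],
    upstream_lengths: Dict[str, Optional[int]],
) -> Dict[str, int]:
    """Return argument name -> map length, omitting arguments not known yet.

    Hypothetical helper: ``upstream_lengths`` maps an argument name to the
    length pushed by its upstream task, or None if that task has not finished.
    """
    lengths = {name: len(value) for name, value in literals.items()}
    for name, length in upstream_lengths.items():
        if length is not None:
            lengths[name] = length
    return lengths


def resolve_map_lengths(expected_args: Iterable[str], known: Dict[str, int]) -> Dict[str, int]:
    """Return ``known`` unchanged, or raise if any expected argument is missing."""
    missing = sorted(set(expected_args) - set(known))
    if missing:
        raise RuntimeError(f"Map lengths not yet resolvable for: {missing}")
    return known


# arg1 is a literal list; arg2 depends on an upstream task that has not finished.
partial_lengths = get_map_lengths({"arg1": [1, 2, 3]}, {"arg2": None})
```

The lenient function lets a caller such as an integrity check proceed with partial information, while the strict one is for the point where every length must be known.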






[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

2022-04-11 Thread GitBox


ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847687365


##
airflow/decorators/base.py:
##
@@ -406,6 +406,9 @@ class DecoratedMappedOperator(MappedOperator):
     # in partial_kwargs, and MappedOperator prevents duplication.
     mapped_op_kwargs: Dict[str, "Mappable"]
 
+    def __hash__(self):

Review Comment:
   This was needed for the `@cache` decorator.
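
For context, a minimal sketch of why a cache decorator needs `__hash__`. It assumes the cache behaves like `functools.lru_cache(maxsize=None)`, which builds its lookup key by hashing the call arguments; the classes below are hypothetical stand-ins, not Airflow operators. A class that defines `__eq__` without `__hash__` becomes unhashable, which is one common way to end up in this situation.

```python
import functools


class WithoutHash:
    """Defines __eq__ only, so Python sets __hash__ to None for this class."""

    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        return isinstance(other, WithoutHash) and self.name == other.name


class WithHash(WithoutHash):
    """Restores hashability by defining __hash__ explicitly."""

    def __hash__(self):
        return hash(self.name)


@functools.lru_cache(maxsize=None)
def expensive_lookup(op):
    # The cache key is derived from hash(op), so op must be hashable.
    return op.name.upper()


def try_lookup(op):
    """Call the cached function, reporting unhashable arguments instead of raising."""
    try:
        return expensive_lookup(op)
    except TypeError:
        return "unhashable"
```

Here `try_lookup(WithoutHash("a"))` falls into the `TypeError` branch, while `try_lookup(WithHash("a"))` is cached normally.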






[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

2022-04-11 Thread GitBox


ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847653426


##
airflow/serialization/serialized_objects.py:
##
@@ -1081,12 +1081,12 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) -> 'SerializedDAG':
                 setattr(task.subdag, 'parent_dag', dag)
 
             if isinstance(task, MappedOperator):
-                for d in (task.mapped_kwargs, task.partial_kwargs):
-                    for k, v in d.items():
-                        if not isinstance(v, _XComRef):
-                            continue
+                expansion_kwargs = task._get_expansion_kwargs()

Review Comment:
   Ah, yeah that makes sense. When the LocalTaskJob checks this it has the real DAG.
   
   It's just that my change here now causes it to be accessed via `verify_integrity`.






[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

2022-04-11 Thread GitBox


ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847645333


##
airflow/serialization/serialized_objects.py:
##
@@ -1081,12 +1081,12 @@ def deserialize_dag(cls, encoded_dag: Dict[str, Any]) -> 'SerializedDAG':
                 setattr(task.subdag, 'parent_dag', dag)
 
             if isinstance(task, MappedOperator):
-                for d in (task.mapped_kwargs, task.partial_kwargs):
-                    for k, v in d.items():
-                        if not isinstance(v, _XComRef):
-                            continue
+                expansion_kwargs = task._get_expansion_kwargs()

Review Comment:
   This was a bug, and I don't know how we didn't hit it before. As a result, the serialized task had these objects left as `_XComRef`, which, being a named tuple, has a `__len__` of 2 😱
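
A small sketch of the pitfall, assuming a two-field named tuple: `XComRefSketch` and its field names are hypothetical stand-ins for the serialized reference, and `literal_length` is an illustrative guard, not code from the PR.

```python
from collections import namedtuple

# Hypothetical stand-in for an unresolved XCom reference; field names are assumed.
XComRefSketch = namedtuple("XComRefSketch", ["task_id", "key"])

ref = XComRefSketch(task_id="task_1", key="return_value")

# A named tuple is still a tuple, so len() reports the number of fields,
# not the number of items the referenced task will eventually produce.
field_count = len(ref)


def literal_length(value):
    """Return len(value) for real literals, or None for an unresolved reference."""
    if isinstance(value, XComRefSketch):
        return None  # length is unknown until the upstream task has run
    return len(value)
```

Any code that calls `len()` on a mapped argument without first checking for the reference type would silently treat it as a two-element literal.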






[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

2022-04-11 Thread GitBox


ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847446426


##
airflow/models/dagrun.py:
##
@@ -838,14 +838,24 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                     ti.state = State.REMOVED
                 continue
 
-            if task.is_mapped:
-                task = cast("MappedOperator", task)
-                num_mapped_tis = task.parse_time_mapped_ti_count
-                # Check if the number of mapped literals has changed and we need to mark this TI as removed
-                if not num_mapped_tis or ti.map_index >= num_mapped_tis:
+            if not task.is_mapped:
+                continue
+            task = cast("MappedOperator", task)
+            num_mapped_tis = task.parse_time_mapped_ti_count
+            # Check if the number of mapped literals has changed and we need to mark this TI as removed
+            if num_mapped_tis is not None:
+                if ti.map_index >= num_mapped_tis:
+                    self.log.debug(
+                        "Removing task '%s' as the map_index is longer than the literal list (%s)",
+                        ti,
+                        num_mapped_tis,
+                    )
                     ti.state = State.REMOVED
                 elif ti.map_index < 0:
+                    self.log.debug("Removing the unmapped TI '%s' as the mapping can now be performed", ti)
                     ti.state = State.REMOVED
+            # TODO: What if it is _now_ None, but wasn't before? How do we detect that? And how do we detect

Review Comment:
   Turns out need to fix this -- it's already tested.






[GitHub] [airflow] ashb commented on a diff in pull request #22909: Fix bug where dynamically mapped tasks got set to REMOVED

2022-04-11 Thread GitBox


ashb commented on code in PR #22909:
URL: https://github.com/apache/airflow/pull/22909#discussion_r847420061


##
airflow/models/dagrun.py:
##
@@ -838,14 +838,24 @@ def verify_integrity(self, session: Session = NEW_SESSION):
                     ti.state = State.REMOVED
                 continue
 
-            if task.is_mapped:
-                task = cast("MappedOperator", task)
-                num_mapped_tis = task.parse_time_mapped_ti_count
-                # Check if the number of mapped literals has changed and we need to mark this TI as removed
-                if not num_mapped_tis or ti.map_index >= num_mapped_tis:
+            if not task.is_mapped:
+                continue
+            task = cast("MappedOperator", task)
+            num_mapped_tis = task.parse_time_mapped_ti_count
+            # Check if the number of mapped literals has changed and we need to mark this TI as removed
+            if num_mapped_tis is not None:
+                if ti.map_index >= num_mapped_tis:
+                    self.log.debug(

Review Comment:
   This was already covered from before, I've just reworked the conditional 
https://github.com/apache/airflow/blob/f662b7de8c5e61f640f150d4e68bde21dcdd09b4/tests/models/test_dagrun.py#L953-L986
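
To make the behavioural difference of the reworked conditional easier to see, here is a sketch that reduces both versions to pure functions. The function names are hypothetical, and the logic is read off the diff above, not taken from the final merged code.

```python
from typing import Optional


def old_should_remove(map_index: int, num_mapped_tis: Optional[int]) -> bool:
    """Old check: a count of None (not known at parse time) is treated like 0."""
    if not num_mapped_tis or map_index >= num_mapped_tis:
        return True
    return map_index < 0


def new_should_remove(map_index: int, num_mapped_tis: Optional[int]) -> bool:
    """Reworked check: only act when the parse-time count is actually known."""
    if num_mapped_tis is None:
        return False  # dynamically mapped; leave the TI alone here
    if map_index >= num_mapped_tis:
        return True  # index is beyond the current literal list
    return map_index < 0  # unmapped placeholder TI that can now be expanded


# An unmapped TI (map_index == -1) for a task whose length depends on XCom:
removed_before = old_should_remove(-1, None)
removed_after = new_should_remove(-1, None)
```

Under these assumptions the old form marks the placeholder TI of a dynamically mapped task as removed, while the reworked form leaves it alone; the literal-list cases behave the same in both.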


