uranusjr commented on code in PR #30641:
URL: https://github.com/apache/airflow/pull/30641#discussion_r1169414049

##########
airflow/models/xcom_arg.py:
##########
@@ -309,11 +310,26 @@ def zip(self, *others: XComArg, fillvalue: Any = NOTSET) -> ZipXComArg:
         return super().zip(*others, fillvalue=fillvalue)
 
     def get_task_map_length(self, run_id: str, *, session: Session) -> int | None:
+        from airflow.models.taskinstance import TaskInstance
         from airflow.models.taskmap import TaskMap
         from airflow.models.xcom import XCom
 
         task = self.operator
         if isinstance(task, MappedOperator):
+            unfinished_ti_count_query = session.query(func.count(TaskInstance.map_index)).filter(
+                TaskInstance.dag_id == task.dag_id,
+                TaskInstance.run_id == run_id,
+                TaskInstance.task_id == task.task_id,
+                # Special NULL treatment is needed because 'state' can be NULL.
+                # The "IN" part would produce "NULL NOT IN ..." and eventually
+                # "NULL = NULL", which is a big no-no in SQL.
+                or_(
+                    TaskInstance.state.is_(None),
+                    TaskInstance.state.in_(s.value for s in State.unfinished if s is not None),

Review Comment:
   Do you mean `filter(None, State.unfinished)`? We would still need to extract the `value` part from each state, and `map(lambda s: s.value, filter(...))` is significantly worse IMO.
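
   For illustration, here is a minimal sketch of the two points above: why the query needs the explicit `IS NULL` branch, and how the generator expression compares with the `filter`/`map` spelling. It does not use Airflow or SQLAlchemy. `FakeState`, `UNFINISHED`, and the `ti` table are hypothetical stand-ins for `State.unfinished` and the task instance table, and the sketch assumes the real collection contains `None` alongside enum members, as the diff implies.

```python
import sqlite3
from enum import Enum


class FakeState(str, Enum):
    """Hypothetical stand-in for Airflow's task instance states."""

    RUNNING = "running"
    QUEUED = "queued"
    SUCCESS = "success"


# Hypothetical stand-in for State.unfinished: it also contains None.
UNFINISHED = frozenset([None, FakeState.RUNNING, FakeState.QUEUED])

# Spelling used in the diff: one generator expression filters and extracts .value.
via_genexp = sorted(s.value for s in UNFINISHED if s is not None)

# Alternative from the discussion: filter(None, ...) drops falsy members
# (here only None), then map() extracts .value.
via_filter = sorted(map(lambda s: s.value, filter(None, UNFINISHED)))

# Plain SQL demonstration of why a NULL state needs its own branch.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ti (map_index INTEGER, state TEXT)")
conn.executemany(
    "INSERT INTO ti VALUES (?, ?)",
    [(0, None), (1, "running"), (2, "success")],
)

placeholders = ", ".join("?" for _ in via_genexp)

# IN alone never matches a row whose state is NULL.
in_only = conn.execute(
    f"SELECT COUNT(map_index) FROM ti WHERE state IN ({placeholders})",
    via_genexp,
).fetchone()[0]

# Adding an explicit IS NULL branch also counts the NULL-state row.
with_null = conn.execute(
    f"SELECT COUNT(map_index) FROM ti WHERE state IS NULL OR state IN ({placeholders})",
    via_genexp,
).fetchone()[0]

conn.close()
```

   Both spellings yield the same values here. Note that `filter(None, ...)` removes every falsy member, not only `None`, so the two are equivalent only when no state is falsy.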