potiuk commented on PR #38362: URL: https://github.com/apache/airflow/pull/38362#issuecomment-2015097229
I do not think we ever use the row count after `executemany` in Airflow core, so it should be safe to merge. I am not 100% sure (maybe others who know SQLAlchemy better are), but I believe the only place where SQLAlchemy runs an executemany-style UPDATE would be in our `bulk_*` methods. The only thing that `values_plus_batch` changes is that a bulk update uses `execute_batch`, which does not return the row count properly.

I looked at all of our `bulk_*` methods in core to see if they return any row count or make use of one, but I could not find any. The only place I could find where a bulk method would return anything is in dag.py:

```python
return cls.bulk_write_to_db(dags=dags, session=session)
```

But on closer inspection it always returns None, because that is what `bulk_write_to_db` returns.

I also looked at the uses of `rowcount`, and the only places I could find are:

* in DELETE statements (not relevant): delete_dag.py, dag_run_endpoint.py, dataset_endpoint.py, pool_endpoint.py
* in single UPDATE statements:

  dag_processing/manager.py:

  ```python
  deactivated_dagmodel = session.execute(
      update(DagModel)
      .where(DagModel.dag_id.in_(to_deactivate))
      .values(is_active=False)
      .execution_options(synchronize_session="fetch")
  )
  deactivated = deactivated_dagmodel.rowcount
  ```

  scheduler_job_runner.py:

  ```python
  num_failed = session.execute(
      update(Job)
      .where(
          Job.job_type == "SchedulerJob",
          Job.state == JobState.RUNNING,
          Job.latest_heartbeat < (timezone.utcnow() - timedelta(seconds=timeout)),
      )
      .values(state=JobState.FAILED)
  ).rowcount
  ```

  and

  ```python
  update(TI)
  .where(
      TI.state == TaskInstanceState.DEFERRED,
      TI.trigger_timeout < timezone.utcnow(),
  )
  .values(
      state=TaskInstanceState.SCHEDULED,
      next_method="__fail__",
      next_kwargs={"error": "Trigger/execution timeout"},
      trigger_id=None,
  )
  ```

  models/dagrun.py:

  ```python
  count += session.execute(
      update(TI)
      .where(
          TI.dag_id == self.dag_id,
          TI.run_id == self.run_id,
          tuple_in_condition((TI.task_id, TI.map_index), schedulable_ti_ids_chunk),
      )
      .values(state=TaskInstanceState.SCHEDULED)
      .execution_options(synchronize_session=False)
  ).rowcount
  ```

  and

  ```python
  session.execute(
      update(TI)
      .where(
          TI.dag_id == self.dag_id,
          TI.run_id == self.run_id,
          tuple_in_condition((TI.task_id, TI.map_index), dummy_ti_ids_chunk),
      )
      .values(
          state=TaskInstanceState.SUCCESS,
          start_date=timezone.utcnow(),
          end_date=timezone.utcnow(),
          duration=0,
      )
      .execution_options(
          synchronize_session=False,
      )
  ).rowcount
  ```

Here is the relevant doc:

> `values_plus_batch` - SQLAlchemy's native [insertmanyvalues](https://docs.sqlalchemy.org/en/20/core/connections.html#engine-insertmanyvalues) handler is used for qualifying INSERT statements, assuming [create_engine.use_insertmanyvalues](https://docs.sqlalchemy.org/en/20/core/engines.html#sqlalchemy.create_engine.params.use_insertmanyvalues) is left at its default value of True. Then, psycopg2's execute_batch() handler is used for qualifying UPDATE and DELETE statements when executed with multiple parameter sets. When using this mode, the [CursorResult.rowcount](https://docs.sqlalchemy.org/en/20/core/connections.html#sqlalchemy.engine.CursorResult.rowcount) attribute will not contain a value for executemany-style executions against UPDATE and DELETE statements.

It would be good if others (@ashb @dstandish @kaxil @uranusjr) could take a look as well and see if they can confirm that we should not be affected.