potiuk commented on PR #38362:
URL: https://github.com/apache/airflow/pull/38362#issuecomment-2015097229

   I do not think we ever use the row count after `executemany` in Airflow core, so it should be safe to merge.
   
   I am not 100% sure (maybe others who know SQLAlchemy better can confirm) - but I believe the only place where SQLAlchemy runs `executemany` for UPDATE would be in our `bulk_*` methods.
   
   The only thing that `values_plus_batch` changes is that when it runs a bulk update it uses `execute_batch`, which does not report the row count properly. I looked at all of our `bulk_*` methods in core to see if they return a row count or make use of one, but I could not find any that do.
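   
   To illustrate (a minimal sketch, not code from this PR or from Airflow - the DSN, table, and column names are hypothetical), the mode only matters when an UPDATE is executed with multiple parameter sets:
   
   ```python
   from sqlalchemy import bindparam, column, create_engine, table, update
   
   # executemany_mode is a psycopg2-dialect parameter; "values_plus_batch"
   # routes executemany-style UPDATEs through psycopg2's execute_batch().
   engine = create_engine(
       "postgresql+psycopg2://user:pass@localhost/airflow",  # hypothetical DSN
       executemany_mode="values_plus_batch",
   )
   
   t = table("some_table", column("id"), column("state"))
   
   with engine.begin() as conn:
       # Passing a list of parameter dicts makes SQLAlchemy use executemany.
       result = conn.execute(
           update(t).where(t.c.id == bindparam("pk")).values(state=bindparam("new_state")),
           [{"pk": 1, "new_state": "queued"}, {"pk": 2, "new_state": "queued"}],
       )
       # Per the docs quoted further down, rowcount is not populated for
       # executemany UPDATEs in this mode, so code relying on it would break.
       print(result.rowcount)
   ```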
   
   The only place I could find where a bulk method's return value is used is in dag.py:
   
   ```python
   return cls.bulk_write_to_db(dags=dags, session=session)
   ```
   
   But on closer inspection it always returns None - because that's what `bulk_write_to_db` returns.
   
   
   I also looked at the uses of `rowcount`, and the only places I could find it used are:
   
   * in DELETE statements (not relevant) - delete_dag.py, dag_run_endpoint.py, dataset_endpoint.py, pool_endpoint.py
   * in single UPDATE statements:
     - dag_processing/manager.py:
   
   ```python
               deactivated_dagmodel = session.execute(
                   update(DagModel)
                   .where(DagModel.dag_id.in_(to_deactivate))
                   .values(is_active=False)
                   .execution_options(synchronize_session="fetch")
               )
               deactivated = deactivated_dagmodel.rowcount
   ```
   
     - scheduler_job_runner.py:
   
   ```python
                       num_failed = session.execute(
                           update(Job)
                           .where(
                               Job.job_type == "SchedulerJob",
                               Job.state == JobState.RUNNING,
                                Job.latest_heartbeat < (timezone.utcnow() - timedelta(seconds=timeout)),
                           )
                           .values(state=JobState.FAILED)
                       ).rowcount
   ```
   
   and
   
   ```python
               update(TI)
               .where(
                   TI.state == TaskInstanceState.DEFERRED,
                   TI.trigger_timeout < timezone.utcnow(),
               )
               .values(
                   state=TaskInstanceState.SCHEDULED,
                   next_method="__fail__",
                   next_kwargs={"error": "Trigger/execution timeout"},
                   trigger_id=None,
               )
   ```
     - models/dagrun.py:
   
   ```python
                   count += session.execute(
                       update(TI)
                       .where(
                           TI.dag_id == self.dag_id,
                           TI.run_id == self.run_id,
                            tuple_in_condition((TI.task_id, TI.map_index), schedulable_ti_ids_chunk),
                       )
                       .values(state=TaskInstanceState.SCHEDULED)
                       .execution_options(synchronize_session=False)
                   ).rowcount
   ```
   
   and
   
   ```python
                       update(TI)
                       .where(
                           TI.dag_id == self.dag_id,
                           TI.run_id == self.run_id,
                            tuple_in_condition((TI.task_id, TI.map_index), dummy_ti_ids_chunk),
                       )
                       .values(
                           state=TaskInstanceState.SUCCESS,
                           start_date=timezone.utcnow(),
                           end_date=timezone.utcnow(),
                           duration=0,
                       )
                       .execution_options(
                           synchronize_session=False,
                       )
                   ).rowcount
   ```
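   
   For contrast, all of the `rowcount` uses above are single UPDATE statements executed once (the `IN`/tuple conditions fold many rows into one statement), not executemany-style executions, so their `rowcount` should stay reliable even under `values_plus_batch`. A minimal sketch of that pattern (again with hypothetical names):
   
   ```python
   from sqlalchemy import column, create_engine, table, update
   
   engine = create_engine(
       "postgresql+psycopg2://user:pass@localhost/airflow",  # hypothetical DSN
       executemany_mode="values_plus_batch",
   )
   t = table("task_instance", column("dag_id"), column("state"))
   
   with engine.begin() as conn:
       # A single statement with no parameter list -> not executemany, so
       # execute_batch() is not involved and rowcount is populated as usual.
       result = conn.execute(
           update(t).where(t.c.dag_id.in_(["a", "b"])).values(state="scheduled")
       )
       print(result.rowcount)  # number of rows matched
   ```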
   
   
   Here is the relevant passage from the SQLAlchemy docs:
   
   > 'values_plus_batch' - SQLAlchemy’s native [insertmanyvalues](https://docs.sqlalchemy.org/en/20/core/connections.html#engine-insertmanyvalues) handler is used for qualifying INSERT statements, assuming [create_engine.use_insertmanyvalues](https://docs.sqlalchemy.org/en/20/core/engines.html#sqlalchemy.create_engine.params.use_insertmanyvalues) is left at its default value of True. Then, psycopg2’s execute_batch() handler is used for qualifying UPDATE and DELETE statements when executed with multiple parameter sets. When using this mode, the [CursorResult.rowcount](https://docs.sqlalchemy.org/en/20/core/connections.html#sqlalchemy.engine.CursorResult.rowcount) attribute will not contain a value for executemany-style executions against UPDATE and DELETE statements.
   
   It would be good if others (@ashb @dstandish @kaxil @uranusjr) could take a look as well and see if they can confirm that we should not be affected.
   

