pwadekar-yd opened a new issue, #63178:
URL: https://github.com/apache/airflow/issues/63178

   ### Apache Airflow version
   
   Other Airflow 3 version (please specify below)
   
   ### If "Other Airflow 3 version" selected, which one?
   
   3.1.5
   
   ### What happened?
   
   I am migrating Airflow from 2.8.1 to 3.1.5. While running the `airflow db migrate` command, I am hitting repeated failures in the dag run `conf` bytes-to-JSONB conversion (`0055_3_0_0_remove_pickled_data_from_dagrun_table.py`).
   
   The batch size is only 1000 and I have 1.5M records, so the migration takes forever. I am running it locally against an RDS snapshot, and after a few hours it suddenly fails with the error below.
   
   ```
    could not convert dag run conf for 547 records. batch=50001
    sqlalchemy.exc.PendingRollbackError: Can't reconnect until the invalid transaction is rolled back. Please rollback() fully before proceeding
   ```
   It may be a network error, but could the migration code handle this more gracefully?
   I have already run `airflow db clean` and increased the batch size to 10K, but I still hit the same error.
   Any suggestions are welcome. I would really appreciate the help!
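   For reference, here is a minimal, database-agnostic sketch of the retry behavior I have in mind (the function names are illustrative, not Airflow APIs; in the migration, the callables would wrap the SQLAlchemy connection and the transient errors would be `OperationalError`/`PendingRollbackError`):

   ```python
   import time

   def run_with_retry(fn, rollback, transient_errors, retries=3, delay=0.0):
       """Call fn(); on a transient error, roll back and retry up to `retries` times."""
       for attempt in range(1, retries + 1):
           try:
               return fn()
           except transient_errors:
               rollback()  # clear the invalidated transaction so the connection can reconnect
               if attempt == retries:
                   raise
               time.sleep(delay)
   ```

   Wrapping each batch in something like this would let the migration survive a dropped connection instead of dying mid-run.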
   
   ### What you think should happen instead?
   
   Right now the entire migration fails if any single record fails. I think it should either skip the failing records or reprocess them later.
   The current code also commits a single record at a time in a loop. The function could be changed to handle a larger batch size, and the batch size could be made configurable.
   I wrote the migration script below with bulk commits, and it ran within 10 minutes, whereas the stock migration's estimated time was 3 days.
   Could you please check why the bulk approach was not taken instead?
   
   ```
    # Imports assumed from the surrounding migration file:
    import json
    import pickle
    from textwrap import dedent

    import sqlalchemy as sa
    from alembic import context, op
    from sqlalchemy import text
    from sqlalchemy.dialects import postgresql


    def upgrade():
        """Apply remove pickled data from dagrun table."""
        conn = op.get_bind()
        empty_vals = {
            "mysql": "X'80057D942E'",
            "postgresql": r"'\x80057D942E'",
            "sqlite": "X'80057D942E'",
        }
        dialect = conn.dialect.name
        try:
            empty_val = empty_vals[dialect]
        except KeyError:
            raise RuntimeError(f"Dialect {dialect} not supported.")

        conf_type = sa.JSON().with_variant(postgresql.JSONB, "postgresql")
        op.add_column("dag_run", sa.Column("conf_json", conf_type, nullable=True))

        if context.is_offline_mode():
            print(
                dedent("""
                ------------
                --  WARNING: Unable to migrate the data in the 'conf' column while in offline mode!
                --  The 'conf' column will be set to NULL in offline mode.
                --  Avoid using offline mode if you need to retain 'conf' values.
                ------------
                """)
            )
        else:
            # Optimized: cursor-based pagination (no OFFSET) + bulk UPDATE per batch
            BATCH_SIZE = 50000
            last_id = 0
            batch_num = 0
            total_converted = 0
            total_failed = 0

            while True:
                batch_num += 1
                print(f"converting dag run conf. batch={batch_num}, last_id={last_id}")

                rows = conn.execute(
                    text(
                        "SELECT id, conf "
                        "FROM dag_run "
                        "WHERE id > :last_id "
                        "AND conf IS NOT NULL "
                        f"AND conf != {empty_val} "
                        "ORDER BY id "
                        "LIMIT :batch_size"
                    ),
                    {"last_id": last_id, "batch_size": BATCH_SIZE},
                ).fetchall()

                if not rows:
                    break

                # Collect all successful conversions in one pass
                updates = []
                err_count = 0
                for row_id, pickle_data in rows:
                    try:
                        original_data = pickle.loads(pickle_data)
                        json_data = json.dumps(original_data)
                        updates.append({"json_data": json_data, "id": row_id})
                    except Exception:
                        err_count += 1

                # Single bulk UPDATE for the entire batch instead of N individual UPDATEs
                if updates:
                    sp = conn.begin_nested()
                    try:
                        conn.execute(
                            text("""
                                UPDATE dag_run
                                SET conf_json = :json_data
                                WHERE id = :id
                            """),
                            updates,  # SQLAlchemy executemany: one round trip per batch
                        )
                        sp.commit()
                    except Exception as e:
                        sp.rollback()
                        print(f"bulk update failed for batch={batch_num}, falling back to row-by-row: {e}")
                        for upd in updates:
                            # Open the savepoint before the try block so it is
                            # always defined when the except clause rolls it back.
                            sp2 = conn.begin_nested()
                            try:
                                conn.execute(
                                    text("UPDATE dag_run SET conf_json = :json_data WHERE id = :id"),
                                    upd,
                                )
                                sp2.commit()
                            except Exception:
                                sp2.rollback()
                                err_count += 1

                total_converted += len(updates)
                total_failed += err_count
                last_id = rows[-1][0]

                if err_count:
                    print(f"could not convert {err_count} records in batch={batch_num}")
                print(
                    f"batch={batch_num} done. converted={len(updates)}, failed={err_count}, "
                    f"total_converted={total_converted}, total_failed={total_failed}"
                )

        op.drop_column("dag_run", "conf")
        op.alter_column("dag_run", "conf_json", existing_type=conf_type, new_column_name="conf")
   ```
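   As a sketch of the configurability idea, the batch size could be read from an environment variable. The variable name below is hypothetical, not an existing Airflow setting:

   ```python
   import os

   # Hypothetical setting name for illustration; Airflow does not define this today.
   _BATCH_SIZE_ENV = "AIRFLOW__DATABASE__MIGRATION_BATCH_SIZE"

   def get_batch_size(default=50000):
       """Return the migration batch size from the environment, or `default`."""
       raw = os.environ.get(_BATCH_SIZE_ENV, "")
       try:
           size = int(raw)
       except ValueError:
           return default
       return size if size > 0 else default
   ```

   That way, operators with very large `dag_run` tables could tune the batch size without patching the migration file.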
   
   ### How to reproduce
   
   Add random data with around 50 JSON fields per record to the `conf` column of the `dag_run` table.
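   A quick sketch for generating such payloads (standalone; inserting the rows into `dag_run` is left to whatever client you use):

   ```python
   import pickle
   import random
   import string

   def random_conf(num_fields=50):
       """Build a dict with `num_fields` random string keys and int values, like a large dag_run conf."""
       keys = set()
       while len(keys) < num_fields:  # a set guarantees exactly num_fields distinct keys
           keys.add("".join(random.choices(string.ascii_lowercase, k=12)))
       return {k: random.randint(0, 10**6) for k in keys}

   def pickled_conf_rows(n):
       """Yield n pickled conf payloads to store in dag_run.conf for the repro."""
       for _ in range(n):
           yield pickle.dumps(random_conf())
   ```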
   
   ### Operating System
   
   Mac Apple M4 (Local)
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   ECS
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
