pwadekar-yd opened a new issue, #63178:
URL: https://github.com/apache/airflow/issues/63178
### Apache Airflow version
Other Airflow 3 version (please specify below)
### If "Other Airflow 3 version" selected, which one?
3.1.5
### What happened?
I am migrating Airflow from 2.8.1 to 3.1.5. While running the `airflow db
migrate` command, I am hitting many issues in the dag run `conf` bytes-to-JSONB
conversion (`0055_3_0_0_remove_pickled_data_from_dagrun_table.py`).
The batch size is only 1000, and I have 1.5M records, so it is taking
forever. I am running it locally against an RDS snapshot, and after a few
hours I suddenly get the error below.
```
could not convert dag run conf for 547 records. batch=50001
sqlalchemy.exc.PendingRollbackError: Can't reconnect until the invalid
transaction is rolled back. Please rollback() fully before proceeding
```
It may be a network error, but could the migration code handle this?
I have already run `airflow db clean` and increased the batch size to 10K,
but I am still hitting the same issue.
Any suggestions are welcome. I would really appreciate the help!
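A transient disconnect leaves the SQLAlchemy connection in an invalid transaction, which is exactly what `PendingRollbackError` complains about. A minimal sketch, not the migration's actual code, of how a batch could be retried after an explicit `rollback()`; `run_batch` is a hypothetical callable that performs one batch of work on the connection:

```
import time

def run_with_retry(conn, run_batch, retries=3, delay=5.0):
    """Run one batch, rolling back the failed transaction before retrying."""
    for attempt in range(1, retries + 1):
        try:
            return run_batch(conn)
        except Exception:
            # rollback() clears the invalid transaction so the connection
            # can be reused; without it SQLAlchemy raises PendingRollbackError
            # on the next statement.
            conn.rollback()
            if attempt == retries:
                raise
            time.sleep(delay)
```

With something like this, a brief network blip would cost one retry instead of failing the whole migration.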
### What you think should happen instead?
Right now the entire migration fails if any single record fails. I think it
should either suppress the error or reprocess the failed records later.
Currently the code commits a single record at a time in a loop. We could
change the function to handle a larger batch size and make the batch size
configurable.
I wrote the migration script below with bulk commits, and it ran within 10
minutes, whereas the estimated time for the existing migration code was 3 days.
Can you please check why the bulk approach was not taken instead?
```
import json
import pickle
from textwrap import dedent

import sqlalchemy as sa
from alembic import context, op
from sqlalchemy import text
from sqlalchemy.dialects import postgresql


def upgrade():
    """Apply remove pickled data from dagrun table."""
    conn = op.get_bind()
    empty_vals = {
        "mysql": "X'80057D942E'",
        "postgresql": r"'\x80057D942E'",
        "sqlite": "X'80057D942E'",
    }
    dialect = conn.dialect.name
    try:
        empty_val = empty_vals[dialect]
    except KeyError:
        raise RuntimeError(f"Dialect {dialect} not supported.")
    conf_type = sa.JSON().with_variant(postgresql.JSONB, "postgresql")
    op.add_column("dag_run", sa.Column("conf_json", conf_type, nullable=True))
    if context.is_offline_mode():
        print(
            dedent("""
            ------------
            -- WARNING: Unable to migrate the data in the 'conf' column while in offline mode!
            -- The 'conf' column will be set to NULL in offline mode.
            -- Avoid using offline mode if you need to retain 'conf' values.
            ------------
            """)
        )
    else:
        # Optimized: cursor-based pagination (no OFFSET) + bulk UPDATE per batch
        BATCH_SIZE = 50000
        last_id = 0
        batch_num = 0
        total_converted = 0
        total_failed = 0
        while True:
            batch_num += 1
            print(f"converting dag run conf. batch={batch_num}, last_id={last_id}")
            rows = conn.execute(
                text(
                    "SELECT id, conf "
                    "FROM dag_run "
                    "WHERE id > :last_id "
                    "AND conf IS NOT NULL "
                    f"AND conf != {empty_val} "
                    "ORDER BY id "
                    "LIMIT :batch_size"
                ),
                {"last_id": last_id, "batch_size": BATCH_SIZE},
            ).fetchall()
            if not rows:
                break
            # Collect all successful conversions in one pass
            updates = []
            err_count = 0
            for row_id, pickle_data in rows:
                try:
                    original_data = pickle.loads(pickle_data)
                    json_data = json.dumps(original_data)
                    updates.append({"json_data": json_data, "id": row_id})
                except Exception:
                    err_count += 1
            # Single bulk UPDATE for the entire batch instead of N individual UPDATEs
            if updates:
                sp = conn.begin_nested()
                try:
                    conn.execute(
                        text("""
                            UPDATE dag_run
                            SET conf_json = :json_data
                            WHERE id = :id
                        """),
                        updates,  # SQLAlchemy executemany: one round trip per batch
                    )
                    sp.commit()
                except Exception as e:
                    sp.rollback()
                    print(f"bulk update failed for batch={batch_num}, falling back to row-by-row: {e}")
                    for upd in updates:
                        sp2 = conn.begin_nested()
                        try:
                            conn.execute(
                                text("UPDATE dag_run SET conf_json = :json_data WHERE id = :id"),
                                upd,
                            )
                            sp2.commit()
                        except Exception:
                            sp2.rollback()
                            err_count += 1
            total_converted += len(updates)
            total_failed += err_count
            last_id = rows[-1][0]
            if err_count:
                print(f"could not convert {err_count} records in batch={batch_num}")
            print(
                f"batch={batch_num} done. converted={len(updates)}, failed={err_count}, "
                f"total_converted={total_converted}, total_failed={total_failed}"
            )
    op.drop_column("dag_run", "conf")
    op.alter_column("dag_run", "conf_json", existing_type=conf_type, new_column_name="conf")
```
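On making the batch size configurable: one minimal sketch is to read it from an environment variable with a safe fallback. The variable name `AIRFLOW_MIGRATION_BATCH_SIZE` below is an assumption for illustration, not an existing Airflow setting:

```
import os

def get_migration_batch_size(default=50_000):
    """Read the batch size from the environment, falling back to a default.

    AIRFLOW_MIGRATION_BATCH_SIZE is a hypothetical variable name; any
    non-numeric or non-positive value falls back to the default.
    """
    raw = os.environ.get("AIRFLOW_MIGRATION_BATCH_SIZE", "")
    try:
        size = int(raw)
    except ValueError:
        return default
    return size if size > 0 else default
```

The migration could then call this once instead of hard-coding `BATCH_SIZE = 50000`, letting operators tune it per deployment.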
### How to reproduce
Just add random data with ~50 large JSON fields to the `conf` column of the
`dag_run` table.
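A sketch of how such reproduction payloads could be generated; the helper name is illustrative, but each payload matches what the migration attempts per row (unpickle, then `json.dumps`):

```
import json
import pickle
import random
import string

def make_pickled_conf(num_fields=50):
    """Build a pickled dict with `num_fields` random string fields,
    mimicking a large legacy `conf` value in the dag_run table."""
    conf = {
        f"field_{i}": "".join(random.choices(string.ascii_letters, k=64))
        for i in range(num_fields)
    }
    return pickle.dumps(conf)

# Sanity check: the payload unpickles and is JSON-serializable,
# which is exactly the conversion the migration performs per row.
restored = pickle.loads(make_pickled_conf())
json.dumps(restored)
```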
### Operating System
Mac Apple M4 (Local)
### Versions of Apache Airflow Providers
_No response_
### Deployment
Docker-Compose
### Deployment details
ECS
### Anything else?
_No response_
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)