This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 8c956756e7b60ee265a73309cb3a245966a7477c
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Feb 25 02:56:40 2021 +0000

    Make TaskInstance.pool_slots not nullable with a default of 1 (#14406)
    
    closes https://github.com/apache/airflow/issues/13799
    
    Without it, the migration from 1.10.14 to 2.0.0 can fail with the following error for old TIs:
    
    ```
    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/scheduler_job.py", line 1275, in _execute
        self._run_scheduler_loop()
      File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/scheduler_job.py", line 1377, in _run_scheduler_loop
        num_queued_tis = self._do_scheduling(session)
      File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/scheduler_job.py", line 1533, in _do_scheduling
        num_queued_tis = self._critical_section_execute_task_instances(session=session)
      File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/scheduler_job.py", line 1132, in _critical_section_execute_task_instances
        queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
      File "/usr/local/lib/python3.6/dist-packages/airflow/utils/session.py", line 62, in wrapper
        return func(*args, **kwargs)
      File "/usr/local/lib/python3.6/dist-packages/airflow/jobs/scheduler_job.py", line 1034, in _executable_task_instances_to_queued
        if task_instance.pool_slots > open_slots:
    TypeError: '>' not supported between instances of 'NoneType' and 'int'
    ```
    
    Workaround was to run manually:
    
    ```
    UPDATE task_instance SET pool_slots = 1 WHERE pool_slots IS NULL;
    ```
    
    This commit adds a DB migration that sets the value to 1 for records with a NULL value and makes the column NOT NULL.
    
    This bug was caused by https://github.com/apache/airflow/pull/7160
    
    (cherry picked from commit f763b7c3aa9cdac82b5d77e21e1840fbe931257a)
---
 .../8646922c8a04_change_default_pool_slots_to_1.py | 93 ++++++++++++++++++++++
 airflow/models/taskinstance.py                     |  2 +-
 2 files changed, 94 insertions(+), 1 deletion(-)

diff --git a/airflow/migrations/versions/8646922c8a04_change_default_pool_slots_to_1.py b/airflow/migrations/versions/8646922c8a04_change_default_pool_slots_to_1.py
new file mode 100644
index 0000000..bf49873
--- /dev/null
+++ b/airflow/migrations/versions/8646922c8a04_change_default_pool_slots_to_1.py
@@ -0,0 +1,93 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Change default pool_slots to 1
+
+Revision ID: 8646922c8a04
+Revises: 449b4072c2da
+Create Date: 2021-02-23 23:19:22.409973
+
+"""
+
+import dill
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy import Column, Float, Integer, PickleType, String
+
+# revision identifiers, used by Alembic.
+from sqlalchemy.ext.declarative import declarative_base
+
+from airflow.models.base import COLLATION_ARGS
+from airflow.utils.sqlalchemy import UtcDateTime
+
+# revision identifiers, used by Alembic.
+revision = '8646922c8a04'
+down_revision = '449b4072c2da'
+branch_labels = None
+depends_on = None
+
+# Standalone declarative base (not airflow.models.base.Base) that the local
+# TaskInstance mapping below is registered on.
+Base = declarative_base()
+# Length of the string identifier columns (task_id, dag_id, external_executor_id).
+ID_LEN = 250
+
+
+class TaskInstance(Base):  # noqa: D101  # type: ignore
+    """Local mapping of the ``task_instance`` table as it exists at this revision.
+
+    Defined on this module's own ``Base`` instead of importing the Airflow
+    model, presumably so the migration does not change behaviour when the
+    live model evolves. ``upgrade`` only filters and updates ``pool_slots``;
+    the remaining columns mirror the table schema.
+    """
+
+    __tablename__ = "task_instance"
+
+    # Composite primary key: (task_id, dag_id, execution_date).
+    task_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    dag_id = Column(String(ID_LEN, **COLLATION_ARGS), primary_key=True)
+    execution_date = Column(UtcDateTime, primary_key=True)
+    start_date = Column(UtcDateTime)
+    end_date = Column(UtcDateTime)
+    duration = Column(Float)
+    state = Column(String(20))
+    # Python attribute name differs from the DB column name ``try_number``.
+    _try_number = Column('try_number', Integer, default=0)
+    max_tries = Column(Integer)
+    hostname = Column(String(1000))
+    unixname = Column(String(1000))
+    job_id = Column(Integer)
+    pool = Column(String(50), nullable=False)
+    # Still nullable here: this mapping reflects the schema *before* this
+    # migration. Older rows may hold NULL, which upgrade() backfills to 1.
+    pool_slots = Column(Integer, default=1)
+    queue = Column(String(256))
+    priority_weight = Column(Integer)
+    operator = Column(String(1000))
+    queued_dttm = Column(UtcDateTime)
+    queued_by_job_id = Column(Integer)
+    pid = Column(Integer)
+    # Serialized with dill via PickleType.
+    executor_config = Column(PickleType(pickler=dill))
+    external_executor_id = Column(String(ID_LEN, **COLLATION_ARGS))
+
+
+def upgrade():
+    """Change default pool_slots to 1 and make pool_slots not nullable.
+
+    Task instances created by older Airflow versions can have NULL in
+    ``pool_slots``, which makes the scheduler compare ``None`` with an int.
+    Those rows are backfilled with 1 first; otherwise adding the NOT NULL
+    constraint could fail on them.
+    """
+    connection = op.get_bind()
+    sessionmaker = sa.orm.sessionmaker()
+    session = sessionmaker(bind=connection)
+
+    # Backfill legacy NULLs before tightening the column definition.
+    session.query(TaskInstance).filter(TaskInstance.pool_slots.is_(None)).update(
+        {TaskInstance.pool_slots: 1}, synchronize_session=False
+    )
+    session.commit()
+
+    # batch_alter_table is used so the ALTER also works on SQLite.
+    with op.batch_alter_table("task_instance", schema=None) as batch_op:
+        batch_op.alter_column("pool_slots", existing_type=sa.Integer, nullable=False)
+
+
+def downgrade():
+    """Unapply Change default pool_slots to 1.
+
+    Only the NOT NULL constraint is removed. Rows that upgrade() backfilled
+    to 1 are left unchanged, because the original NULLs cannot be told
+    apart from genuine values of 1.
+    """
+    with op.batch_alter_table("task_instance", schema=None) as batch_op:
+        batch_op.alter_column("pool_slots", existing_type=sa.Integer, nullable=True)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index c7d7ff7..3ceb5a3 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -273,7 +273,7 @@ class TaskInstance(Base, LoggingMixin):  # pylint: disable=R0902,R0904
     unixname = Column(String(1000))
     job_id = Column(Integer)
     pool = Column(String(50), nullable=False)
-    pool_slots = Column(Integer, default=1)
+    pool_slots = Column(Integer, default=1, nullable=False)
     queue = Column(String(256))
     priority_weight = Column(Integer)
     operator = Column(String(1000))

Reply via email to