[jira] [Commented] (AIRFLOW-2747) Explicit re-schedule of sensors

2018-10-24 Thread Gabriel Silk (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662936#comment-16662936
 ] 

Gabriel Silk commented on AIRFLOW-2747:
---

This is awesome!

One thing – it looks like the "started_at" logic has only been implemented for 
BaseSensor. I would suggest we do the same for tasks, so that tasks as well as 
sensors can throw a reschedule exception during the course of their execution. 
This would be useful for tasks that figure out their own data dependencies at 
runtime – for example, Hive queries that depend on recent snapshot data. If we 
simply threw a reschedule exception, we could attempt to execute these tasks 
periodically, rather than explicitly modeling the data dependency via a sensor.

Does that make sense?

If people are interested, I'd be happy to open a PR.
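
To illustrate the suggestion above, here is a minimal sketch of a non-sensor task that asks to be rescheduled when its data is not ready yet. It does not use Airflow itself: `RescheduleRequested` is a hypothetical stand-in for the AirflowRescheduleException from the PR, and `run_hive_task`, `snapshot_ready` and `retry_after` are made-up names for illustration only.

```python
from datetime import datetime, timedelta, timezone


class RescheduleRequested(Exception):
    """Hypothetical stand-in for AirflowRescheduleException from the PR."""

    def __init__(self, reschedule_date):
        super().__init__(reschedule_date)
        self.reschedule_date = reschedule_date


def run_hive_task(snapshot_ready, now, retry_after=timedelta(minutes=10)):
    """Run the task body, or ask to be rescheduled if the data is missing.

    `snapshot_ready` is a hypothetical callable that resolves the data
    dependency at runtime and returns True once the snapshot exists.
    """
    if not snapshot_ready():
        # Instead of blocking a worker, request a later attempt.
        raise RescheduleRequested(now + retry_after)
    return "query executed"


now = datetime(2018, 10, 24, 12, 0, tzinfo=timezone.utc)
try:
    run_hive_task(lambda: False, now)
except RescheduleRequested as exc:
    print("try again at", exc.reschedule_date)
```

Whether the scheduler would honor such a request for a plain task depends on the reschedule dependency being attached to it, which the issue description says is currently only the case for sensors.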

> Explicit re-schedule of sensors
> ---
>
> Key: AIRFLOW-2747
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2747
> Project: Apache Airflow
> Issue Type: Improvement
> Components: core, operators
> Affects Versions: 1.9.0, 1.10.0
> Reporter: Stefan Seelmann
> Assignee: Stefan Seelmann
> Priority: Major
> Fix For: 2.0.0
>
> Attachments: Screenshot_2018-07-12_14-10-24.png, 
> Screenshot_2018-09-16_20-09-28.png, Screenshot_2018-09-16_20-19-23.png, 
> google_apis-23_r01.zip
>
>
> By default sensors block a worker and just sleep between pokes. This is very 
> inefficient, especially when there are many long-running sensors.
> There is a hacky workaround: setting a small timeout value and a high retry 
> number. But that has drawbacks:
>  * Errors raised by sensors are hidden and the sensor retries too often
>  * The sensor is retried in a fixed time interval (with optional exponential 
> backoff)
>  * There are many attempts and many log files are generated
> I'd like to propose an explicit reschedule mechanism:
>  * A new "reschedule" flag for sensors. If set to True, the sensor raises an 
> AirflowRescheduleException that causes a reschedule.
>  * AirflowRescheduleException contains the (earliest) re-schedule date.
>  * Reschedule requests are recorded in new `task_reschedule` table and 
> visualized in the Gantt view.
>  * A new TI dependency that checks if a sensor task is ready to be 
> re-scheduled.
> Advantages:
>  * This change is backward compatible. Existing sensors behave like before. 
> But it's possible to set the "reschedule" flag.
>  * The poke_interval, timeout, and soft_fail parameters are still respected 
> and used to calculate the next schedule time.
>  * Custom sensor implementations can even define the next sensible schedule 
> date by raising AirflowRescheduleException themselves.
>  * Existing TimeSensor and TimeDeltaSensor can also be changed to be 
> rescheduled when the time is reached.
>  * This mechanism can also be used by non-sensor operators (but then the new 
> ReadyToRescheduleDep has to be added to deps or BaseOperator).
> Design decisions and caveats:
>  * When handling AirflowRescheduleException the `try_number` is decremented. 
> That means that subsequent runs use the same try number and write to the same 
> log file.
>  * Sensor TI dependency check now depends on `task_reschedule` table. However 
> only the BaseSensorOperator includes the new ReadyToRescheduleDep for now.
> Open questions and TODOs:
>  * Should a dedicated state `UP_FOR_RESCHEDULE` be used instead of setting 
> the state back to `NONE`? This would require more changes in scheduler code 
> and especially in the UI, but the state of a task would be more explicit and 
> more transparent to the user.
>  * Add example/test for a non-sensor operator
>  * Document the new feature
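
The proposed flow can be sketched roughly as follows. This is not the code from the PR. It only assumes that the next schedule date is the current time plus poke_interval, that the timeout is measured from the first start of the sensor, and that the new TI dependency compares the latest requested date with the current time. `SensorTimeout`, `next_reschedule_date` and `ready_to_reschedule` are hypothetical names.

```python
from datetime import datetime, timedelta, timezone


class SensorTimeout(Exception):
    """Hypothetical stand-in for AirflowSensorTimeout."""


def next_reschedule_date(first_start, now, poke_interval, timeout):
    """Return the earliest date for the next poke, or raise on timeout.

    Assumption: the timeout counts from the very first start of the sensor,
    not from the start of the current (rescheduled) run.
    """
    if (now - first_start).total_seconds() > timeout:
        raise SensorTimeout("sensor timed out")
    return now + timedelta(seconds=poke_interval)


def ready_to_reschedule(latest_reschedule_date, now):
    """Dependency check: pass if nothing is pending or the date is reached."""
    return latest_reschedule_date is None or now >= latest_reschedule_date


start = datetime(2018, 7, 12, 12, 0, tzinfo=timezone.utc)
now = start + timedelta(seconds=30)
requested = next_reschedule_date(start, now, poke_interval=10, timeout=60)
print(ready_to_reschedule(requested, now))        # still waiting
print(ready_to_reschedule(requested, requested))  # date reached
```

Between the two checks the worker slot is free, which is the point of the proposal.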



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2747) Explicit re-schedule of sensors

2018-09-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623071#comment-16623071
 ] 

ASF GitHub Bot commented on AIRFLOW-2747:
-

feng-tao closed pull request #3596: [AIRFLOW-2747] Explicit re-schedule of 
sensors
URL: https://github.com/apache/incubator-airflow/pull/3596
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/exceptions.py b/airflow/exceptions.py
index 89f3d0e048..d4098c4a32 100644
--- a/airflow/exceptions.py
+++ b/airflow/exceptions.py
@@ -47,6 +47,17 @@ class AirflowSensorTimeout(AirflowException):
     pass
 
 
+class AirflowRescheduleException(AirflowException):
+    """
+    Raise when the task should be re-scheduled at a later time.
+
+    :param reschedule_date: The date when the task should be rescheduled
+    :type reschedule: datetime
+    """
+    def __init__(self, reschedule_date):
+        self.reschedule_date = reschedule_date
+
+
 class AirflowTaskTimeout(AirflowException):
     pass
 
diff --git a/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py b/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py
new file mode 100644
index 00..6eef6a9437
--- /dev/null
+++ b/airflow/migrations/versions/0a2a5b66e19d_add_task_reschedule_table.py
@@ -0,0 +1,83 @@
+# flake8: noqa
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""add task_reschedule table
+
+Revision ID: 0a2a5b66e19d
+Revises: 9635ae0956e7
+Create Date: 2018-06-17 22:50:00.053620
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '0a2a5b66e19d'
+down_revision = '9635ae0956e7'
+branch_labels = None
+depends_on = None
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import mysql
+
+
+TABLE_NAME = 'task_reschedule'
+INDEX_NAME = 'idx_' + TABLE_NAME + '_dag_task_date'
+
+def mysql_timestamp():
+    return mysql.TIMESTAMP(fsp=6)
+
+def sa_timestamp():
+    return sa.TIMESTAMP(timezone=True)
+
+def upgrade():
+    # See 0e2a74e0fc9f_add_time_zone_awareness
+    conn = op.get_bind()
+    if conn.dialect.name == 'mysql':
+        timestamp = mysql_timestamp
+    else:
+        timestamp = sa_timestamp
+
+    op.create_table(
+        TABLE_NAME,
+        sa.Column('id', sa.Integer(), nullable=False),
+        sa.Column('task_id', sa.String(length=250), nullable=False),
+        sa.Column('dag_id', sa.String(length=250), nullable=False),
+        # use explicit server_default=None otherwise mysql implies defaults for first timestamp column
+        sa.Column('execution_date', timestamp(), nullable=False, server_default=None),
+        sa.Column('try_number', sa.Integer(), nullable=False),
+        sa.Column('start_date', timestamp(), nullable=False),
+        sa.Column('end_date', timestamp(), nullable=False),
+        sa.Column('duration', sa.Integer(), nullable=False),
+        sa.Column('reschedule_date', timestamp(), nullable=False),
+        sa.PrimaryKeyConstraint('id'),
+        sa.ForeignKeyConstraint(['task_id', 'dag_id', 'execution_date'],
+                                ['task_instance.task_id', 'task_instance.dag_id','task_instance.execution_date'],
+                                name='task_reschedule_dag_task_date_fkey')
+    )
+    op.create_index(
+        INDEX_NAME,
+        TABLE_NAME,
+        ['dag_id', 'task_id', 'execution_date'],
+        unique=False
+    )
+
+
+def downgrade():
+    op.drop_index(INDEX_NAME, table_name=TABLE_NAME)
+    op.drop_table(TABLE_NAME)
diff --git a/airflow/models.py b/airflow/models.py
index d703810a77..c6787c693c 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -53,11 +53,11 @@
 import uuid
 from datetime import datetime
 from urllib.parse import urlparse, quote, parse_qsl
-
 from sqlalchemy import (
-    Column, Integer, String, DateTime, Text, Boolean, ForeignKey, PickleType,
-    Index, Float, LargeBinary, UniqueConstraint)
-from sqlalchemy import func, or_, and_, true as sqltrue
+ 

[jira] [Commented] (AIRFLOW-2747) Explicit re-schedule of sensors

2018-09-17 Thread Stefan Seelmann (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618498#comment-16618498
 ] 

Stefan Seelmann commented on AIRFLOW-2747:
--

I assume this "radhefa Roufique hossain" is a spam user. Could someone with 
admin access please delete the attached google_apis-23_r01.zip? I also reported 
this in https://issues.apache.org/jira/browse/INFRA-17031

> Explicit re-schedule of sensors
> ---
>
> Key: AIRFLOW-2747
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2747
> Project: Apache Airflow
> Issue Type: Improvement
> Components: core, operators
> Affects Versions: 1.9.0
> Reporter: Stefan Seelmann
> Assignee: Roufique hossain
> Priority: Major
> Fix For: 2.0.0
>
> Attachments: Screenshot_2018-07-12_14-10-24.png, 
> Screenshot_2018-09-16_20-09-28.png, Screenshot_2018-09-16_20-19-23.png, 
> google_apis-23_r01.zip
>


[jira] [Commented] (AIRFLOW-2747) Explicit re-schedule of sensors

2018-09-16 Thread Stefan Seelmann (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16616842#comment-16616842
 ] 

Stefan Seelmann commented on AIRFLOW-2747:
--

I changed the Gantt view to show only a single bar instead of each individual 
reschedule. The bar is light green while the task is currently running and 
white while it is currently inactive; the other views use the same colors, so 
this is consistent. However, failed attempts are still shown as separate bars 
(as before). I attached two screenshots for demonstration.

!Screenshot_2018-09-16_20-19-23.png!!Screenshot_2018-09-16_20-09-28.png!

 



[jira] [Commented] (AIRFLOW-2747) Explicit re-schedule of sensors

2018-09-02 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16601524#comment-16601524
 ] 

Apache Spark commented on AIRFLOW-2747:
---

User 'seelmann' has created a pull request for this issue:
https://github.com/apache/incubator-airflow/pull/3596



[jira] [Commented] (AIRFLOW-2747) Explicit re-schedule of sensors

2018-07-12 Thread Stefan Seelmann (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542191#comment-16542191
 ] 

Stefan Seelmann commented on AIRFLOW-2747:
--

[~pedromachado] Thanks for the feedback.

I added the content of the task_fail and task_instance tables above; I hope 
that makes things clearer.

Regarding the colors:
 * The black bars are executions that requested a reschedule (i.e. the sensor 
raised an AirflowRescheduleException). The start_date and end_date are the 
actual dates the sensor task ran; the reschedule_date is the date it requested 
to be rescheduled. I borrowed the layout of the task_reschedule table from the 
task_fail table and added the two additional columns.
 * The red bars are failures (which then triggered a retry). Those are recorded 
in the task_fail table and are already shown like this in the Gantt view today 
(in master and 1.10).

Regarding start_date before reschedule_date: I cannot see that problem. The 
start_date of the next row (with the same sensor task_id) is always after the 
previous reschedule_date. Note that the table contains rows of two sensors, s2 
and s3.
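
This can be checked mechanically. The sketch below uses the id, task_id, start_date and reschedule_date values of the task_reschedule rows posted in this issue (the "+00" UTC offset is dropped) and looks, per task_id, for rows that started before the previous row's requested date. The helper names `parse` and `early_starts` are made up for this illustration.

```python
from datetime import datetime

# (id, task_id, start_date, reschedule_date) from the posted table.
ROWS = [
    (42, "s3", "2018-07-12 12:06:54.430185", "2018-07-12 12:07:14.312456"),
    (44, "s2", "2018-07-12 12:07:09.381193", "2018-07-12 12:07:22.467206"),
    (45, "s3", "2018-07-12 12:07:17.111816", "2018-07-12 12:07:33.4376"),
    (47, "s2", "2018-07-12 12:07:34.499979", "2018-07-12 12:07:45.817533"),
    (49, "s2", "2018-07-12 12:07:49.407569", "2018-07-12 12:08:00.834584"),
    (51, "s2", "2018-07-12 12:08:14.526", "2018-07-12 12:08:25.762619"),
    (53, "s2", "2018-07-12 12:08:29.329766", "2018-07-12 12:08:41.160209"),
]


def parse(ts):
    # %f accepts one to six fractional digits, so short values parse too.
    return datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f")


def early_starts(rows):
    """Return ids of rows that started before the previous reschedule_date
    of the same task_id."""
    last_requested = {}
    violations = []
    for row_id, task_id, start, reschedule in sorted(rows):
        previous = last_requested.get(task_id)
        if previous is not None and parse(start) < previous:
            violations.append(row_id)
        last_requested[task_id] = parse(reschedule)
    return violations


print(early_starts(ROWS))  # → []
```

Reading the rows in id order without separating s2 from s3 is what makes it look as if a start_date precedes the previous reschedule_date.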

The way it is visualized (in the Gantt view) can be changed. For example, there 
could be just one bar from the first start_date to the last end_date, in light 
green while still in an unfinished state, and dark green or red when successful 
or failed. I personally like the multiple bars to see what happened when.
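
A minimal sketch of the single-bar alternative, assuming the view only needs the first start_date and the last end_date per task. `collapse_to_bars` is a hypothetical helper, not code from the PR, and the sample rows are the two s3 entries from the task_reschedule table. A real bar would presumably also have to include the final run recorded in task_instance, which is not modeled here.

```python
from datetime import datetime


def collapse_to_bars(rows):
    """Map each task_id to (first start_date, last end_date).

    `rows` is an iterable of (task_id, start_date, end_date) tuples.
    """
    bars = {}
    for task_id, start, end in rows:
        first, last = bars.get(task_id, (start, end))
        bars[task_id] = (min(first, start), max(last, end))
    return bars


rows = [
    ("s3", datetime(2018, 7, 12, 12, 6, 54, 430185),
           datetime(2018, 7, 12, 12, 6, 59, 339554)),
    ("s3", datetime(2018, 7, 12, 12, 7, 17, 111816),
           datetime(2018, 7, 12, 12, 7, 18, 444199)),
]
bars = collapse_to_bars(rows)
print(bars["s3"])
```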



[jira] [Commented] (AIRFLOW-2747) Explicit re-schedule of sensors

2018-07-12 Thread Pedro Machado (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16542108#comment-16542108
 ] 

Pedro Machado commented on AIRFLOW-2747:


[~seelmann] From the user's point of view, my only feedback is that the UI 
should not show sensors that are still running as failed or up for retry, as 
that would draw attention to things that are running as expected.

I could not make full sense of the Gantt chart. I suppose the bars shown in 
black represent reschedule executions of the sensor. Are the red ones retries?

I also noticed that the `start_date` happens before the previous 
`reschedule_date`. Did I not read the table correctly?



[jira] [Commented] (AIRFLOW-2747) Explicit re-schedule of sensors

2018-07-12 Thread Stefan Seelmann (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541546#comment-16541546
 ] 

Stefan Seelmann commented on AIRFLOW-2747:
--

Initial PR: https://github.com/apache/incubator-airflow/pull/3596



[jira] [Commented] (AIRFLOW-2747) Explicit re-schedule of sensors

2018-07-12 Thread Stefan Seelmann (JIRA)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16541539#comment-16541539
 ] 

Stefan Seelmann commented on AIRFLOW-2747:
--

Screenshot of the Gantt view for an example DAG run:

  !Screenshot_2018-07-12_14-10-24.png!


 And the corresponding rows in task_reschedule table:
{noformat}
$ select * from task_reschedule where execution_date='2018-07-12T12:06:28.988028' order by id;
 id | task_id | dag_id |        execution_date         | try_number |          start_date           |           end_date            | duration |        reschedule_date
----+---------+--------+-------------------------------+------------+-------------------------------+-------------------------------+----------+-------------------------------
 42 | s3      | dummy  | 2018-07-12 12:06:28.988028+00 |          1 | 2018-07-12 12:06:54.430185+00 | 2018-07-12 12:06:59.339554+00 |        5 | 2018-07-12 12:07:14.312456+00
 44 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 |          2 | 2018-07-12 12:07:09.381193+00 | 2018-07-12 12:07:12.480702+00 |        3 | 2018-07-12 12:07:22.467206+00
 45 | s3      | dummy  | 2018-07-12 12:06:28.988028+00 |          1 | 2018-07-12 12:07:17.111816+00 | 2018-07-12 12:07:18.444199+00 |        1 | 2018-07-12 12:07:33.4376+00
 47 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 |          3 | 2018-07-12 12:07:34.499979+00 | 2018-07-12 12:07:35.834609+00 |        1 | 2018-07-12 12:07:45.817533+00
 49 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 |          3 | 2018-07-12 12:07:49.407569+00 | 2018-07-12 12:07:50.843526+00 |        1 | 2018-07-12 12:08:00.834584+00
 51 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 |          4 | 2018-07-12 12:08:14.526+00    | 2018-07-12 12:08:15.768907+00 |        1 | 2018-07-12 12:08:25.762619+00
 53 | s2      | dummy  | 2018-07-12 12:06:28.988028+00 |          4 | 2018-07-12 12:08:29.329766+00 | 2018-07-12 12:08:31.168762+00 |        2 | 2018-07-12 12:08:41.160209+00
{noformat}
