[jira] [Updated] (AIRFLOW-2751) hive to druid library conflicts
[ https://issues.apache.org/jira/browse/AIRFLOW-2751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] juhwi lee updated AIRFLOW-2751: --- Description: There is a jackson.datatype.guava library conflict problem in HiveToDruidTransfer when hive and druid has different library version. In my case hdp version: 2.6.3.0 druid version : druid-0.10.1( jackson-datatype-guava-2.4.6.jar) We should be able to set this property to solve that problem. "mapreduce.job.user.classpath.first" : "true" It means "prefer loading Druid's version of a library when there is a conflict". reference. [http://druid.io/docs/latest/operations/other-hadoop.html] (Tip #2) But Job properties is hard coded in HiveToDruidTransfer. This is the error message I got. 2018-06-22T01:37:11,190 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1525761839652_85792_m_00_0, Status : FAILED Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object; 2018-06-22T01:37:15,216 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1525761839652_85792_m_00_1, Status : FAILED Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object; 2018-06-22T01:37:20,231 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1525761839652_85792_m_00_2, Status : FAILED Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object; 2018-06-22T01:37:26,248 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - map 100% reduce 100% 2018-06-22T01:37:26,252 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Job job_1525761839652_85792 failed with state FAILED due to: Task failed task_1525761839652_85792_m_00 Job failed as tasks failed. failedMaps:1 failedReduces:0 was: There is a jackson.datatype.guava library conflict problem in HiveToDruidTransfer when hive and druid has different library. In my case hdp version: 2.6.3.0 druid version : druid-0.10.1( jackson-datatype-guava-2.4.6.jar) We should be able to set this property to solve that problem. "mapreduce.job.user.classpath.first" : "true" It means "prefer loading Druid's version of a library when there is a conflict". reference. [http://druid.io/docs/latest/operations/other-hadoop.html] (Tip #2) But Job properties is hard coded in HiveToDruidTransfer. This is the error message I got. 
2018-06-22T01:37:11,190 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1525761839652_85792_m_00_0, Status : FAILED Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object; 2018-06-22T01:37:15,216 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1525761839652_85792_m_00_1, Status : FAILED Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object; 2018-06-22T01:37:20,231 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1525761839652_85792_m_00_2, Status : FAILED Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object; 2018-06-22T01:37:26,248 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - map 100% reduce 100% 2018-06-22T01:37:26,252 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Job job_1525761839652_85792 failed with state FAILED due to: Task failed task_1525761839652_85792_m_00 Job failed as tasks failed. failedMaps:1 failedReduces:0 > hive to druid library conflicts > --- > > Key: AIRFLOW-2751 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2751 > Project: Apache Airflow > Issue Type: Bug > Components: operators >Affects Versions: 1.10 >Reporter: juhwi lee >Priority: Major > > There is a jackson.datatype.guava library conflict problem in > HiveToDruidTransfer when hive and druid h
[jira] [Updated] (AIRFLOW-2751) hive to druid library conflicts
[ https://issues.apache.org/jira/browse/AIRFLOW-2751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] juhwi lee updated AIRFLOW-2751: --- Description: There is a jackson.datatype.guava library conflict problem in HiveToDruidTransfer when hive and druid has different library. In my case hdp version: 2.6.3.0 druid version : druid-0.10.1( jackson-datatype-guava-2.4.6.jar) We should be able to set this property to solve that problem. "mapreduce.job.user.classpath.first" : "true" It means "prefer loading Druid's version of a library when there is a conflict". reference. [http://druid.io/docs/latest/operations/other-hadoop.html] (Tip #2) But Job properties is hard coded in HiveToDruidTransfer. This is the error message I got. 2018-06-22T01:37:11,190 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1525761839652_85792_m_00_0, Status : FAILED Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object; 2018-06-22T01:37:15,216 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1525761839652_85792_m_00_1, Status : FAILED Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object; 2018-06-22T01:37:20,231 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1525761839652_85792_m_00_2, Status : FAILED Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object; 2018-06-22T01:37:26,248 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - map 100% reduce 100% 2018-06-22T01:37:26,252 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Job job_1525761839652_85792 failed with state FAILED due to: Task failed task_1525761839652_85792_m_00 Job failed as tasks failed. failedMaps:1 failedReduces:0 was: There is a jackson.datatype.guava library conflict problem in HiveToDruidTransfer when hive and druid has different library. In my case hdp version: 2.6.3.0 druid version : druid-0.10.1( jackson-datatype-guava-2.4.6.jar) This is the error message I got. 
2018-06-22T01:37:11,190 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1525761839652_85792_m_00_0, Status : FAILED Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object; 2018-06-22T01:37:15,216 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1525761839652_85792_m_00_1, Status : FAILED Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object; 2018-06-22T01:37:20,231 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1525761839652_85792_m_00_2, Status : FAILED Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object; 2018-06-22T01:37:26,248 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - map 100% reduce 100% 2018-06-22T01:37:26,252 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Job job_1525761839652_85792 failed with state FAILED due to: Task failed task_1525761839652_85792_m_00 Job failed as tasks failed. failedMaps:1 failedReduces:0 We should be able to set this property to solve that problem. "mapreduce.job.user.classpath.first" : "true" It means "prefer loading Druid's version of a library when there is a conflict". reference. http://druid.io/docs/latest/operations/other-hadoop.html (Tip #2) But Job properties is hard coded in HiveToDruidTransfer. > hive to druid library conflicts > --- > > Key: AIRFLOW-2751 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2751 > Project: Apache Airflow > Issue Type: Bug > Components: operators >Affects Versions: 1.10 >Reporter: juhwi lee >Priority: Major > > There is a jackson.datatype.guava library conflict problem in > HiveToDruidTransfer when hive and druid has different library. > In my c
[jira] [Created] (AIRFLOW-2751) hive to druid library conflicts
juhwi lee created AIRFLOW-2751: -- Summary: hive to druid library conflicts Key: AIRFLOW-2751 URL: https://issues.apache.org/jira/browse/AIRFLOW-2751 Project: Apache Airflow Issue Type: Bug Components: operators Affects Versions: 1.10 Reporter: juhwi lee There is a jackson.datatype.guava library conflict problem in HiveToDruidTransfer when Hive and Druid have different library versions. In my case: HDP version 2.6.3.0, Druid version druid-0.10.1 (jackson-datatype-guava-2.4.6.jar). This is the error message I got. 2018-06-22T01:37:11,190 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1525761839652_85792_m_00_0, Status : FAILED Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object; 2018-06-22T01:37:15,216 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1525761839652_85792_m_00_1, Status : FAILED Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object; 2018-06-22T01:37:20,231 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Task Id : attempt_1525761839652_85792_m_00_2, Status : FAILED Error: class com.fasterxml.jackson.datatype.guava.deser.HostAndPortDeserializer overrides final method deserialize.(Lcom/fasterxml/jackson/core/JsonParser;Lcom/fasterxml/jackson/databind/DeserializationContext;)Ljava/lang/Object; 2018-06-22T01:37:26,248 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - map 100% reduce 100% 2018-06-22T01:37:26,252 INFO [task-runner-0-priority-0] org.apache.hadoop.mapreduce.Job - Job job_1525761839652_85792 failed with state FAILED due to: Task failed task_1525761839652_85792_m_00 Job failed as tasks failed. failedMaps:1 failedReduces:0 We should be able to set this property to solve that problem: "mapreduce.job.user.classpath.first" : "true". It means "prefer loading Druid's version of a library when there is a conflict". Reference: http://druid.io/docs/latest/operations/other-hadoop.html (Tip #2). But the job properties are hard-coded in HiveToDruidTransfer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
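For the workaround to be configurable rather than hard-coded, the "mapreduce.job.user.classpath.first" flag would have to end up in the jobProperties section of the Druid Hadoop indexing spec that the transfer builds. Below is a minimal, hypothetical sketch of how a user-supplied job_properties dict could be merged into such a spec; the function name, parameter names and default dict are assumptions for illustration, not the actual HiveToDruidTransfer code.

{code:python}
# Sketch only: shows how a user-supplied job_properties dict could be merged
# into the Druid Hadoop indexing spec instead of being hard-coded. Function
# and parameter names are hypothetical, not Airflow's actual API.

DEFAULT_JOB_PROPERTIES = {
    # Prefer Druid's copy of a library (e.g. jackson-datatype-guava) over
    # Hadoop's when the two conflict (Druid "Other Hadoop" docs, Tip #2).
    "mapreduce.job.user.classpath.first": "true",
}


def build_index_spec(datasource, static_paths, job_properties=None):
    """Build a heavily trimmed Hadoop indexing task spec for Druid."""
    props = dict(DEFAULT_JOB_PROPERTIES)
    props.update(job_properties or {})
    return {
        "type": "index_hadoop",
        "spec": {
            "dataSchema": {"dataSource": datasource},
            "ioConfig": {
                "type": "hadoop",
                "inputSpec": {"type": "static", "paths": static_paths},
            },
            "tuningConfig": {
                "type": "hadoop",
                # jobProperties is where per-job MapReduce settings such as
                # mapreduce.job.user.classpath.first belong.
                "jobProperties": props,
            },
        },
    }


if __name__ == "__main__":
    spec = build_index_spec("my_datasource", "/apps/hive/warehouse/tmp_table")
    print(spec["spec"]["tuningConfig"]["jobProperties"])
{code}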
[jira] [Work started] (AIRFLOW-2748) dbexportcmd & dbimportcmd should support use_customer_cluster, use_customer_label in QuboleOperator
[ https://issues.apache.org/jira/browse/AIRFLOW-2748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on AIRFLOW-2748 started by Joy Lal Chattaraj. -- > dbexportcmd & dbimportcmd should support use_customer_cluster, > use_customer_label in QuboleOperator > --- > > Key: AIRFLOW-2748 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2748 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Joy Lal Chattaraj >Assignee: Joy Lal Chattaraj >Priority: Major > > A customer pointed out that the QuboleOperator & qubole_hook in the Airflow > codebase doesn't support `use_customer_cluster`, `customer_cluster_label` > and `hive_serde` in Airflow. This is a request for adding the above > functionality. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2750) Add subcommands to delete and list users
Kengo Seki created AIRFLOW-2750: --- Summary: Add subcommands to delete and list users Key: AIRFLOW-2750 URL: https://issues.apache.org/jira/browse/AIRFLOW-2750 Project: Apache Airflow Issue Type: New Feature Components: cli Reporter: Kengo Seki Assignee: Kengo Seki Currently, adding a user is the only RBAC operation the CLI supports. It'd be useful if users could also delete and list RBAC users via the CLI. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2747) Explicit re-schedule of sensors
[ https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542191#comment-16542191 ] Stefan Seelmann commented on AIRFLOW-2747: -- [~pedromachado] Thanks for the feedback. I added the content of task_fail and task_instance table above, I hope things get clearer. Regarding the colors: * The black bars are executions that requested a reschedule (i.e. the sensor raised an AirflowRescheduleException). The start_date and end_date are the actual dates the sensor task run, the reschedule_date is the date it requested to be rescheduled. I borrowed the layout of the task_reschedule table from task_fail table and added the two additional columns. * The red bars are failures (which then triggered a retry), those are recorded in task_fail table and already today (in master and 1.10) shown like this in the gantt view. Regarding start_date before reschedule_date: I cannot see that problem, the start_date of the next row (with the same sensor task_id) is always after the previous reschedule_date. Note that the table contains rows of two sensors s2 and s3. The way it is visualized (in the gantt view) can be changed, for example there can just be a one bar from first start_date to last end_date, in light green while still in unfinished state, dark green or red when successful or failed. I personally like the multiple bars to see what happened when. > Explicit re-schedule of sensors > --- > > Key: AIRFLOW-2747 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2747 > Project: Apache Airflow > Issue Type: Improvement > Components: core, operators >Affects Versions: 1.9.0 >Reporter: Stefan Seelmann >Assignee: Stefan Seelmann >Priority: Major > Fix For: 2.0.0 > > Attachments: Screenshot_2018-07-12_14-10-24.png > > > By default sensors block a worker and just sleep between pokes. This is very > inefficient, especially when there are many long-running sensors. > There is a hacky workaroud by setting a small timeout value and a high retry > number. But that has drawbacks: > * Errors raised by sensors are hidden and the sensor retries too often > * The sensor is retried in a fixed time interval (with optional exponential > backoff) > * There are many attempts and many log files are generated > I'd like to propose an explicit reschedule mechanism: > * A new "reschedule" flag for sensors, if set to True it will raise an > AirflowRescheduleException that causes a reschedule. > * AirflowRescheduleException contains the (earliest) re-schedule date. > * Reschedule requests are recorded in new `task_reschedule` table and > visualized in the Gantt view. > * A new TI dependency that checks if a sensor task is ready to be > re-scheduled. > Advantages: > * This change is backward compatible. Existing sensors behave like before. > But it's possible to set the "reschedule" flag. > * The poke_interval, timeout, and soft_fail parameters are still respected > and used to calculate the next schedule time. > * Custom sensor implementations can even define the next sensible schedule > date by raising AirflowRescheduleException themselves. > * Existing TimeSensor and TimeDeltaSensor can also be changed to be > rescheduled when the time is reached. > * This mechanism can also be used by non-sensor operators (but then the new > ReadyToRescheduleDep has to be added to deps or BaseOperator). > Design decisions and caveats: > * When handling AirflowRescheduleException the `try_number` is decremented. 
> That means that subsequent runs use the same try number and write to the same > log file. > * Sensor TI dependency check now depends on `task_reschedule` table. However > only the BaseSensorOperator includes the new ReadyToRescheduleDep for now. > Open questions and TODOs: > * Should a dedicated state `UP_FOR_RESCHEDULE` be used instead of setting > the state back to `NONE`? This would require more changes in scheduler code > and especially in the UI, but the state of a task would be more explicit and > more transparent to the user. > * Add example/test for a non-sensor operator > * Document the new feature -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (AIRFLOW-2747) Explicit re-schedule of sensors
[ https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541539#comment-16541539 ] Stefan Seelmann edited comment on AIRFLOW-2747 at 7/12/18 8:27 PM: --- Screenshot of the Gantt view for an example DAG run: !Screenshot_2018-07-12_14-10-24.png! And the corresponding rows in task_reschedule, task_fail, and task_instance table: {noformat} $ select * from task_reschedule where execution_date='2018-07-12T12:06:28.988028' order by id; id | task_id | dag_id |execution_date | try_number | start_date | end_date| duration | reschedule_date +-++---++---+---+--+--- 42 | s3 | dummy | 2018-07-12 12:06:28.988028+00 | 1 | 2018-07-12 12:06:54.430185+00 | 2018-07-12 12:06:59.339554+00 |5 | 2018-07-12 12:07:14.312456+00 44 | s2 | dummy | 2018-07-12 12:06:28.988028+00 | 2 | 2018-07-12 12:07:09.381193+00 | 2018-07-12 12:07:12.480702+00 |3 | 2018-07-12 12:07:22.467206+00 45 | s3 | dummy | 2018-07-12 12:06:28.988028+00 | 1 | 2018-07-12 12:07:17.111816+00 | 2018-07-12 12:07:18.444199+00 |1 | 2018-07-12 12:07:33.4376+00 47 | s2 | dummy | 2018-07-12 12:06:28.988028+00 | 3 | 2018-07-12 12:07:34.499979+00 | 2018-07-12 12:07:35.834609+00 |1 | 2018-07-12 12:07:45.817533+00 49 | s2 | dummy | 2018-07-12 12:06:28.988028+00 | 3 | 2018-07-12 12:07:49.407569+00 | 2018-07-12 12:07:50.843526+00 |1 | 2018-07-12 12:08:00.834584+00 51 | s2 | dummy | 2018-07-12 12:06:28.988028+00 | 4 | 2018-07-12 12:08:14.526+00| 2018-07-12 12:08:15.768907+00 |1 | 2018-07-12 12:08:25.762619+00 53 | s2 | dummy | 2018-07-12 12:06:28.988028+00 | 4 | 2018-07-12 12:08:29.329766+00 | 2018-07-12 12:08:31.168762+00 |2 | 2018-07-12 12:08:41.160209+00 {noformat} {noformat} $ select * from task_fail where execution_date='2018-07-12T12:06:28.988028' order by id; id | task_id | dag_id |execution_date | start_date | end_date| duration -+-++---+---+---+-- 173 | t1 | dummy | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:06:33.005215+00 | 2018-07-12 12:06:36.503438+00 |3 179 | s2 | dummy | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:06:54.860487+00 | 2018-07-12 12:06:59.352183+00 |4 181 | s2 | dummy | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:07:25.124649+00 | 2018-07-12 12:07:26.606175+00 |1 182 | s2 | dummy | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:08:04.295306+00 | 2018-07-12 12:08:05.610363+00 |1 {noformat} {noformat} $ select task_id,dag_id,execution_date,start_date,end_date,duration,state,try_number from task_instance where dag_id='dummy' and execution_date='2018-07-12T12:06:28.988028'; task_id | dag_id |execution_date | start_date | end_date| duration | state | try_number -++---+---+---+--+-+ s2 | dummy | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:08:44.828189+00 | 2018-07-12 12:08:46.609474+00 | 1.781285 | success | 4 t2 | dummy | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:08:50.711506+00 | 2018-07-12 12:08:54.888104+00 | 4.176598 | success | 1 b1 | dummy | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:08:57.965998+00 | 2018-07-12 12:08:59.547209+00 | 1.581211 | success | 1 t1 | dummy | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:06:44.652687+00 | 2018-07-12 12:06:48.328103+00 | 3.675416 | success | 2 sub1| dummy | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:09:03.322963+00 | 2018-07-12 12:09:40.248113+00 | 36.92515 | success | 1 s1 | dummy | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:06:54.345113+00 | 2018-07-12 12:06:58.871657+00 | 4.526544 | success | 1 s3 | dummy | 2018-07-12 12:06:28.988028+00 | 2018-07-12 12:07:37.190335+00 | 2018-07-12 
12:07:38.725783+00 | 1.535448 | success | 1 {noformat} was (Author: seelmann): Screenshot of the Gantt view for an example DAG run: !Screenshot_2018-07-12_14-10-24.png! And the corresponding rows in task_reschedule table: {noformat} $ select * from task_reschedule where execution_date='2018-07-12T12:06:28.988028' order by id; id | task_id | dag_id |execution_date | t
[jira] [Comment Edited] (AIRFLOW-2747) Explicit re-schedule of sensors
[ https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542108#comment-16542108 ] Pedro Machado edited comment on AIRFLOW-2747 at 7/12/18 7:03 PM: - [~seelmann] From the user point of view, my only feedback is that the UI should not show sensors that are still running as failed or up for retry as that would draw attention to things that are running as expected. I could not make full sense of the Gantt chart. I suppose the bars shown in black represent reschedule executions of the sensor. Are the red ones retries? I also noticed that the `start_date` happens before the previous `reschedule_date`. Did I not read the table correctly? was (Author: pedromachado): [~seelmann] From the user point of view, my only feedback is that the UI should not show sensors that are still running as failed or up for retry as that would draw attention to things that are running as expected. I could not make full sense of the Gantt chart. I suppose the bars show in black represent reschedule executions of the sensor. Are the red ones retries? I also noticed that the `start_date` happens before the previous `reschedule_date`. Did I not read the table correctly? > Explicit re-schedule of sensors > --- > > Key: AIRFLOW-2747 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2747 > Project: Apache Airflow > Issue Type: Improvement > Components: core, operators >Affects Versions: 1.9.0 >Reporter: Stefan Seelmann >Assignee: Stefan Seelmann >Priority: Major > Fix For: 2.0.0 > > Attachments: Screenshot_2018-07-12_14-10-24.png > > > By default sensors block a worker and just sleep between pokes. This is very > inefficient, especially when there are many long-running sensors. > There is a hacky workaroud by setting a small timeout value and a high retry > number. But that has drawbacks: > * Errors raised by sensors are hidden and the sensor retries too often > * The sensor is retried in a fixed time interval (with optional exponential > backoff) > * There are many attempts and many log files are generated > I'd like to propose an explicit reschedule mechanism: > * A new "reschedule" flag for sensors, if set to True it will raise an > AirflowRescheduleException that causes a reschedule. > * AirflowRescheduleException contains the (earliest) re-schedule date. > * Reschedule requests are recorded in new `task_reschedule` table and > visualized in the Gantt view. > * A new TI dependency that checks if a sensor task is ready to be > re-scheduled. > Advantages: > * This change is backward compatible. Existing sensors behave like before. > But it's possible to set the "reschedule" flag. > * The poke_interval, timeout, and soft_fail parameters are still respected > and used to calculate the next schedule time. > * Custom sensor implementations can even define the next sensible schedule > date by raising AirflowRescheduleException themselves. > * Existing TimeSensor and TimeDeltaSensor can also be changed to be > rescheduled when the time is reached. > * This mechanism can also be used by non-sensor operators (but then the new > ReadyToRescheduleDep has to be added to deps or BaseOperator). > Design decisions and caveats: > * When handling AirflowRescheduleException the `try_number` is decremented. > That means that subsequent runs use the same try number and write to the same > log file. > * Sensor TI dependency check now depends on `task_reschedule` table. However > only the BaseSensorOperator includes the new ReadyToRescheduleDep for now. 
> Open questions and TODOs: > * Should a dedicated state `UP_FOR_RESCHEDULE` be used instead of setting > the state back to `NONE`? This would require more changes in scheduler code > and especially in the UI, but the state of a task would be more explicit and > more transparent to the user. > * Add example/test for a non-sensor operator > * Document the new feature -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2747) Explicit re-schedule of sensors
[ https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542108#comment-16542108 ] Pedro Machado commented on AIRFLOW-2747: [~seelmann] From the user point of view, my only feedback is that the UI should not show sensors that are still running as failed or up for retry as that would draw attention to things that are running as expected. I could not make full sense of the Gantt chart. I suppose the bars show in black represent reschedule executions of the sensor. Are the red ones retries? I also noticed that the `start_date` happens before the previous `reschedule_date`. Did I not read the table correctly? > Explicit re-schedule of sensors > --- > > Key: AIRFLOW-2747 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2747 > Project: Apache Airflow > Issue Type: Improvement > Components: core, operators >Affects Versions: 1.9.0 >Reporter: Stefan Seelmann >Assignee: Stefan Seelmann >Priority: Major > Fix For: 2.0.0 > > Attachments: Screenshot_2018-07-12_14-10-24.png > > > By default sensors block a worker and just sleep between pokes. This is very > inefficient, especially when there are many long-running sensors. > There is a hacky workaroud by setting a small timeout value and a high retry > number. But that has drawbacks: > * Errors raised by sensors are hidden and the sensor retries too often > * The sensor is retried in a fixed time interval (with optional exponential > backoff) > * There are many attempts and many log files are generated > I'd like to propose an explicit reschedule mechanism: > * A new "reschedule" flag for sensors, if set to True it will raise an > AirflowRescheduleException that causes a reschedule. > * AirflowRescheduleException contains the (earliest) re-schedule date. > * Reschedule requests are recorded in new `task_reschedule` table and > visualized in the Gantt view. > * A new TI dependency that checks if a sensor task is ready to be > re-scheduled. > Advantages: > * This change is backward compatible. Existing sensors behave like before. > But it's possible to set the "reschedule" flag. > * The poke_interval, timeout, and soft_fail parameters are still respected > and used to calculate the next schedule time. > * Custom sensor implementations can even define the next sensible schedule > date by raising AirflowRescheduleException themselves. > * Existing TimeSensor and TimeDeltaSensor can also be changed to be > rescheduled when the time is reached. > * This mechanism can also be used by non-sensor operators (but then the new > ReadyToRescheduleDep has to be added to deps or BaseOperator). > Design decisions and caveats: > * When handling AirflowRescheduleException the `try_number` is decremented. > That means that subsequent runs use the same try number and write to the same > log file. > * Sensor TI dependency check now depends on `task_reschedule` table. However > only the BaseSensorOperator includes the new ReadyToRescheduleDep for now. > Open questions and TODOs: > * Should a dedicated state `UP_FOR_RESCHEDULE` be used instead of setting > the state back to `NONE`? This would require more changes in scheduler code > and especially in the UI, but the state of a task would be more explicit and > more transparent to the user. > * Add example/test for a non-sensor operator > * Document the new feature -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2749) Add Operator and hook to delete BigQuery dataset
Ivan created AIRFLOW-2749: - Summary: Add Operator and hook to delete BigQuery dataset Key: AIRFLOW-2749 URL: https://issues.apache.org/jira/browse/AIRFLOW-2749 Project: Apache Airflow Issue Type: New Feature Components: contrib, gcp Affects Versions: 1.10.0, 2.0.0 Reporter: Ivan Assignee: Ivan Airflow currently has no operator or hook to delete a BigQuery dataset. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
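For reference, a minimal sketch of what such a deletion could look like using the google-cloud-bigquery client directly; the standalone function, its parameters and the project/dataset names are assumptions for illustration, not the hook method that would eventually be added to Airflow.

{code:python}
# Hypothetical sketch: delete a BigQuery dataset with the google-cloud-bigquery
# client. Names and parameters are illustrative, not the eventual Airflow API.
from google.cloud import bigquery


def delete_dataset(project_id, dataset_id, delete_contents=False):
    """Delete a dataset; with delete_contents=True, drop its tables as well."""
    client = bigquery.Client(project=project_id)
    dataset_ref = client.dataset(dataset_id)
    client.delete_dataset(dataset_ref, delete_contents=delete_contents)


if __name__ == "__main__":
    delete_dataset("my-gcp-project", "temp_dataset", delete_contents=True)
{code}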
[jira] [Commented] (AIRFLOW-1729) Ignore whole directories in .airflowignore
[ https://issues.apache.org/jira/browse/AIRFLOW-1729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541666#comment-16541666 ] Ash Berlin-Taylor commented on AIRFLOW-1729: Closer to a fuller fix is this diff: {code:python} diff --git a/airflow/models.py b/airflow/models.py index 089befef..e722b609 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -522,12 +522,27 @@ class DagBag(BaseDagBag, LoggingMixin): if os.path.isfile(dag_folder): self.process_file(dag_folder, only_if_updated=only_if_updated) elif os.path.isdir(dag_folder): +patterns_by_dir = {} for root, dirs, files in os.walk(dag_folder, followlinks=True): -patterns = [] +patterns = patterns_by_dir.get(root, []).copy() +self.log.info("Root %s dirs %r patterns %r", root, dirs, patterns) ignore_file = os.path.join(root, '.airflowignore') if os.path.isfile(ignore_file): +self.log.info("Loading %s", ignore_file) with open(ignore_file, 'r') as f: patterns += [p for p in f.read().split('\n') if p] +#dirs[:] = list[d for d in dirs if not any([re.findall(p, os.path.join(root, d)) for p in patterns])] + +# If we can ignore any subdirs entirely we should - fewer paths +# to walk is better. We have to modify the ``dirs`` array in +# place for this to affect os.walk +dirs[:] = [d for d in dirs if not any(re.findall(p, os.path.join(root, d)) for p in patterns)] + +# We want patterns defined in a parent folder's .airflowignore to +# apply to subdirs too +for d in dirs: +patterns_by_dir[os.path.join(root, d)] = patterns + for f in files: try: filepath = os.path.join(root, f) {code} Reasons I haven't just opened a PR with that: We need to add tests for this so it doesn't break again; we should de-duplicate between this code and the almost identical code in airflow.utils.dag_processing. > Ignore whole directories in .airflowignore > -- > > Key: AIRFLOW-1729 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1729 > Project: Apache Airflow > Issue Type: Improvement > Components: core >Affects Versions: Airflow 2.0 >Reporter: Cedric Hourcade >Assignee: Kamil Sambor >Priority: Minor > Fix For: 2.0.0 > > > The .airflowignore file allows to prevent scanning files for DAG. But even if > we blacklist fulldirectory the {{os.walk}} will still go through them no > matter how deep they are and skip files one by one, which can be an issue > when you keep around big .git or virtualvenv directories. > I suggest to add something like: > {code} > dirs[:] = [d for d in dirs if not any([re.findall(p, os.path.join(root, d)) > for p in patterns])] > {code} > to prune the directories here: > https://github.com/apache/incubator-airflow/blob/cfc2f73c445074e1e09d6ef6a056cd2b33a945da/airflow/utils/dag_processing.py#L208-L209 > and in {{list_py_file_paths}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
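A standalone, simplified sketch of the same idea, to make the intended behaviour easier to follow: prune ignored subdirectories in place so os.walk never descends into them, and let a parent directory's .airflowignore patterns also apply to its subdirectories. This illustrates the approach in the diff above; it is not the exact DagBag/dag_processing code.

{code:python}
import os
import re


def list_py_file_paths(dag_folder):
    """Collect .py files under dag_folder, honouring .airflowignore per directory."""
    file_paths = []
    patterns_by_dir = {}
    for root, dirs, files in os.walk(dag_folder, followlinks=True):
        # Start from the patterns inherited from the parent directory.
        patterns = list(patterns_by_dir.get(root, []))
        ignore_file = os.path.join(root, '.airflowignore')
        if os.path.isfile(ignore_file):
            with open(ignore_file) as fh:
                patterns += [p for p in fh.read().split('\n') if p]
        # Prune ignored subdirs in place so os.walk never descends into them
        # (fewer paths to walk, e.g. big .git or virtualenv directories).
        dirs[:] = [
            d for d in dirs
            if not any(re.findall(p, os.path.join(root, d)) for p in patterns)
        ]
        # Make the accumulated patterns visible to the surviving subdirectories.
        for d in dirs:
            patterns_by_dir[os.path.join(root, d)] = patterns
        for f in files:
            filepath = os.path.join(root, f)
            if f.endswith('.py') and not any(re.findall(p, filepath) for p in patterns):
                file_paths.append(filepath)
    return file_paths
{code}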
[jira] [Reopened] (AIRFLOW-1729) Ignore whole directories in .airflowignore
[ https://issues.apache.org/jira/browse/AIRFLOW-1729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ash Berlin-Taylor reopened AIRFLOW-1729: PR that marked this as closed hasn't resolved this issue. > Ignore whole directories in .airflowignore > -- > > Key: AIRFLOW-1729 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1729 > Project: Apache Airflow > Issue Type: Improvement > Components: core >Affects Versions: Airflow 2.0 >Reporter: Cedric Hourcade >Assignee: Kamil Sambor >Priority: Minor > Fix For: 2.0.0 > > > The .airflowignore file allows to prevent scanning files for DAG. But even if > we blacklist fulldirectory the {{os.walk}} will still go through them no > matter how deep they are and skip files one by one, which can be an issue > when you keep around big .git or virtualvenv directories. > I suggest to add something like: > {code} > dirs[:] = [d for d in dirs if not any([re.findall(p, os.path.join(root, d)) > for p in patterns])] > {code} > to prune the directories here: > https://github.com/apache/incubator-airflow/blob/cfc2f73c445074e1e09d6ef6a056cd2b33a945da/airflow/utils/dag_processing.py#L208-L209 > and in {{list_py_file_paths}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2729) .airflowignore is not being respected
[ https://issues.apache.org/jira/browse/AIRFLOW-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541594#comment-16541594 ] Ash Berlin-Taylor commented on AIRFLOW-2729: I was going to say "I can't reproduce this" (which I said on the mailing list) but I can. With some extra debug logging against 1.9: {code} [2018-07-12 13:56:52,834] {models.py:427} INFO - 1.9 Testing '/Users/ash/airflow/dags/airflow-tasks/testdag.py' against ['.*submodules/*'] [2018-07-12 13:56:52,839] {models.py:427} INFO - 1.9 Testing '/Users/ash/airflow/dags/airflow-tasks/submodules/quantflow/quantflow/operators/zipline_operators.py' against ['.*submodules/*'] {code} The same logging in 1.10 {code} [2018-07-12 13:58:17,454] {models.py:540} INFO - 1.10 Testing '/Users/ash/airflow/dags/airflow-tasks/testdag.py' against ['.*submodules/*'] [2018-07-12 13:58:17,463] {models.py:540} INFO - 1.10 Testing '/Users/ash/airflow/dags/airflow-tasks/submodules/quantflow/quantflow/operators/zipline_operators.py' against [] {code} It's definitely not behaving the same way. > .airflowignore is not being respected > - > > Key: AIRFLOW-2729 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2729 > Project: Apache Airflow > Issue Type: Bug >Affects Versions: 1.10 >Reporter: James Meickle >Priority: Minor > > I have a repo that in 1.10 is giving airflowignore errors that did not exist > in 1.9. I have a DAG repo with the following .airflowignore: > {{airflow@XXX:~$ ls -la /home/airflow/airflow/dags/airflow-tasks/}} > {{total 172}} > {{drwxr-xr-x 6 airflow airflow 4096 Jul 9 18:48 .}} > {{drwxrwxr-x 3 airflow airflow 4096 Jul 9 18:48 ..}} > {{-rw-r--r-- 1 airflow airflow 13 Jul 9 16:20 .airflowignore}} > {{airflow@airflow-core-i-063df3268720e58fd:~$ cat > /home/airflow/airflow/dags/airflow-tasks/.airflowignore}} > {{submodules/*}} > However, the submoduled repository is being scanned for DAGs anyways, > including the test suite. 
Note the paths in the section below: > > {{Jul 09 18:52:01 airflow_web-stdout.log: [2018-07-09 18:52:01,814] > \{{models.py:351}} DEBUG - Importing > /home/airflow/airflow/dags/airflow-tasks/submodules/quantflow/quantflow/operators/zipline_operators.py}} > {{Jul 09 18:52:01 airflow_web-stdout.log: [2018-07-09 18:52:01,817] > \{{models.py:351}} DEBUG - Importing > /home/airflow/airflow/dags/airflow-tasks/submodules/quantflow/tests/operators/test_sqs_operators.py}} > {{Jul 09 18:52:01 airflow_web-stdout.log: [2018-07-09 18:52:01,818] > \{{models.py:365}} ERROR - Failed to import: > /home/airflow/airflow/dags/airflow-tasks/submodules/quantflow/tests/operators/test_sqs_operators.py}} > {{Jul 09 18:52:01 airflow_web-stdout.log: Traceback (most recent call last):}} > {{Jul 09 18:52:01 airflow_web-stdout.log: File > "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", > line 362, in process_file}} > {{Jul 09 18:52:01 airflow_web-stdout.log: m = imp.load_source(mod_name, > filepath)}} > {{Jul 09 18:52:01 airflow_web-stdout.log: File > "/home/airflow/virtualenvs/airflow/lib/python3.5/imp.py", line 172, in > load_source}} > {{Jul 09 18:52:01 airflow_web-stdout.log: module = _load(spec)}} > {{Jul 09 18:52:01 airflow_web-stdout.log: File " importlib._bootstrap>", line 693, in _load}} > {{Jul 09 18:52:01 airflow_web-stdout.log: File " importlib._bootstrap>", line 673, in _load_unlocked}} > {{Jul 09 18:52:01 airflow_web-stdout.log: File " importlib._bootstrap_external>", line 665, in exec_module}} > {{Jul 09 18:52:01 airflow_web-stdout.log: File " importlib._bootstrap>", line 222, in _call_with_frames_removed}} > {{Jul 09 18:52:01 airflow_web-stdout.log: File > "/home/airflow/airflow/dags/airflow-tasks/submodules/quantflow/tests/operators/test_sqs_operators.py", > line 6, in }} > {{Jul 09 18:52:01 airflow_web-stdout.log: from moto import mock_sqs}} > {{Jul 09 18:52:01 airflow_web-stdout.log: ImportError: No module named > 'moto'}} > {{Jul 09 18:52:01 airflow_web-stdout.log: [2018-07-09 18:52:01,821] > \{{models.py:351}} DEBUG - Importing > /home/airflow/airflow/dags/airflow-tasks/submodules/quantflow/tests/operators/test_zipline_operators.py}} > {{Jul 09 18:52:01 airflow_web-stdout.log: [2018-07-09 18:52:01,822] > \{{models.py:365}} ERROR - Failed to import: > /home/airflow/airflow/dags/airflow-tasks/submodules/quantflow/tests/operators/test_zipline_operators.py}} > {{Jul 09 18:52:01 airflow_web-stdout.log: Traceback (most recent call last):}} > {{Jul 09 18:52:01 airflow_web-stdout.log: File > "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", > line 362, in process_file}} > {{Jul 09 18:52:01 airflow_web-stdout.log: m = imp.load_source(mod_name, > filepath)}} > {{Jul 09 18:52:01 airflow_web-stdout.log: File > "/home/airflow/virtualen
[jira] [Updated] (AIRFLOW-2729) .airflowignore is not being respected
[ https://issues.apache.org/jira/browse/AIRFLOW-2729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ash Berlin-Taylor updated AIRFLOW-2729: --- Description: I have a repo that in 1.10 is giving airflowignore errors that did not exist in 1.9. I have a DAG repo with the following .airflowignore: {{airflow@XXX:~$ ls -la /home/airflow/airflow/dags/airflow-tasks/}} {{total 172}} {{drwxr-xr-x 6 airflow airflow 4096 Jul 9 18:48 .}} {{drwxrwxr-x 3 airflow airflow 4096 Jul 9 18:48 ..}} {{-rw-r--r-- 1 airflow airflow 13 Jul 9 16:20 .airflowignore}} {{airflow@airflow-core-i-063df3268720e58fd:~$ cat /home/airflow/airflow/dags/airflow-tasks/.airflowignore}} {{submodules/*}} However, the submoduled repository is being scanned for DAGs anyways, including the test suite. Note the paths in the section below: {{Jul 09 18:52:01 airflow_web-stdout.log: [2018-07-09 18:52:01,814] \{{models.py:351}} DEBUG - Importing /home/airflow/airflow/dags/airflow-tasks/submodules/quantflow/quantflow/operators/zipline_operators.py}} {{Jul 09 18:52:01 airflow_web-stdout.log: [2018-07-09 18:52:01,817] \{{models.py:351}} DEBUG - Importing /home/airflow/airflow/dags/airflow-tasks/submodules/quantflow/tests/operators/test_sqs_operators.py}} {{Jul 09 18:52:01 airflow_web-stdout.log: [2018-07-09 18:52:01,818] \{{models.py:365}} ERROR - Failed to import: /home/airflow/airflow/dags/airflow-tasks/submodules/quantflow/tests/operators/test_sqs_operators.py}} {{Jul 09 18:52:01 airflow_web-stdout.log: Traceback (most recent call last):}} {{Jul 09 18:52:01 airflow_web-stdout.log: File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 362, in process_file}} {{Jul 09 18:52:01 airflow_web-stdout.log: m = imp.load_source(mod_name, filepath)}} {{Jul 09 18:52:01 airflow_web-stdout.log: File "/home/airflow/virtualenvs/airflow/lib/python3.5/imp.py", line 172, in load_source}} {{Jul 09 18:52:01 airflow_web-stdout.log: module = _load(spec)}} {{Jul 09 18:52:01 airflow_web-stdout.log: File "", line 693, in _load}} {{Jul 09 18:52:01 airflow_web-stdout.log: File "", line 673, in _load_unlocked}} {{Jul 09 18:52:01 airflow_web-stdout.log: File "", line 665, in exec_module}} {{Jul 09 18:52:01 airflow_web-stdout.log: File "", line 222, in _call_with_frames_removed}} {{Jul 09 18:52:01 airflow_web-stdout.log: File "/home/airflow/airflow/dags/airflow-tasks/submodules/quantflow/tests/operators/test_sqs_operators.py", line 6, in }} {{Jul 09 18:52:01 airflow_web-stdout.log: from moto import mock_sqs}} {{Jul 09 18:52:01 airflow_web-stdout.log: ImportError: No module named 'moto'}} {{Jul 09 18:52:01 airflow_web-stdout.log: [2018-07-09 18:52:01,821] \{{models.py:351}} DEBUG - Importing /home/airflow/airflow/dags/airflow-tasks/submodules/quantflow/tests/operators/test_zipline_operators.py}} {{Jul 09 18:52:01 airflow_web-stdout.log: [2018-07-09 18:52:01,822] \{{models.py:365}} ERROR - Failed to import: /home/airflow/airflow/dags/airflow-tasks/submodules/quantflow/tests/operators/test_zipline_operators.py}} {{Jul 09 18:52:01 airflow_web-stdout.log: Traceback (most recent call last):}} {{Jul 09 18:52:01 airflow_web-stdout.log: File "/home/airflow/virtualenvs/airflow/lib/python3.5/site-packages/airflow/models.py", line 362, in process_file}} {{Jul 09 18:52:01 airflow_web-stdout.log: m = imp.load_source(mod_name, filepath)}} {{Jul 09 18:52:01 airflow_web-stdout.log: File "/home/airflow/virtualenvs/airflow/lib/python3.5/imp.py", line 172, in load_source}} {{Jul 09 18:52:01 airflow_web-stdout.log: module = 
_load(spec)}} {{Jul 09 18:52:01 airflow_web-stdout.log: File "", line 693, in _load}} {{Jul 09 18:52:01 airflow_web-stdout.log: File "", line 673, in _load_unlocked}} {{Jul 09 18:52:01 airflow_web-stdout.log: File "", line 665, in exec_module}} {{Jul 09 18:52:01 airflow_web-stdout.log: File "", line 222, in _call_with_frames_removed}} {{Jul 09 18:52:01 airflow_web-stdout.log: File "/home/airflow/airflow/dags/airflow-tasks/submodules/quantflow/tests/operators/test_zipline_operators.py", line 6, in }} {{Jul 09 18:52:01 airflow_web-stdout.log: from freezegun import freeze_time}} {{Jul 09 18:52:01 airflow_web-stdout.log: ImportError: No module named 'freezegun'}} was: I have a repo that in 1.10 is giving airflowignore errors that did not exist in 1.9. I have a DAG repo with the following .airflowignore: {{airflow@XXX:~$ ls -la /home/airflow/airflow/dags/airflow-tasks/}} {{total 172}} {{drwxr-xr-x 6 airflow airflow 4096 Jul 9 18:48 .}} {{drwxrwxr-x 3 airflow airflow 4096 Jul 9 18:48 ..}} {{-rw-r--r-- 1 airflow airflow 13 Jul 9 16:20 .airflowignore}} {{airflow@airflow-core-i-063df3268720e58fd:~$ cat /home/airflow/airflow/dags/airflow-tasks/.airflowignore }} {{submodules/*}} However, the submoduled repository is being scanned for DAGs anyways, including the test suite. Note the paths in the section below: {{Jul 09 18:52:01 airfl
[jira] [Updated] (AIRFLOW-2747) Explicit re-schedule of sensors
[ https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Seelmann updated AIRFLOW-2747: - Description: By default sensors block a worker and just sleep between pokes. This is very inefficient, especially when there are many long-running sensors. There is a hacky workaroud by setting a small timeout value and a high retry number. But that has drawbacks: * Errors raised by sensors are hidden and the sensor retries too often * The sensor is retried in a fixed time interval (with optional exponential backoff) * There are many attempts and many log files are generated I'd like to propose an explicit reschedule mechanism: * A new "reschedule" flag for sensors, if set to True it will raise an AirflowRescheduleException that causes a reschedule. * AirflowRescheduleException contains the (earliest) re-schedule date. * Reschedule requests are recorded in new `task_reschedule` table and visualized in the Gantt view. * A new TI dependency that checks if a sensor task is ready to be re-scheduled. Advantages: * This change is backward compatible. Existing sensors behave like before. But it's possible to set the "reschedule" flag. * The poke_interval, timeout, and soft_fail parameters are still respected and used to calculate the next schedule time. * Custom sensor implementations can even define the next sensible schedule date by raising AirflowRescheduleException themselves. * Existing TimeSensor and TimeDeltaSensor can also be changed to be rescheduled when the time is reached. * This mechanism can also be used by non-sensor operators (but then the new ReadyToRescheduleDep has to be added to deps or BaseOperator). Design decisions and caveats: * When handling AirflowRescheduleException the `try_number` is decremented. That means that subsequent runs use the same try number and write to the same log file. * Sensor TI dependency check now depends on `task_reschedule` table. However only the BaseSensorOperator includes the new ReadyToRescheduleDep for now. Open questions and TODOs: * Should a dedicated state `UP_FOR_RESCHEDULE` be used instead of setting the state back to `NONE`? This would require more changes in scheduler code and especially in the UI, but the state of a task would be more explicit and more transparent to the user. * Add example/test for a non-sensor operator * Document the new feature was: By default sensors block a worker and just sleep between pokes. This is very inefficient, especially when there are many long-running sensors. There is a hacky workaroud by setting a small timeout value and a high retry number. But that has drawbacks: * Errors throws by sensors are hidden and the sensor retries too often * The sensor is retried in a fixed time interval (with optional exponential backoff) * There are many attempts and many log files are generated I'd like to propose an explicit reschedule mechanism: * A new "reschedule" flag for sensors, if set to True it will raise an AirflowRescheduleException that causes a reschedule. * AirflowRescheduleException contains the (earliest) re-schedule date. * Reschedule requests are recorded in new `task_reschedule` table and visualized in the Gantt view. * A new TI dependency that checks if a sensor task is ready to be re-scheduled. Advantages: * This change is backward compatible. Existing sensors behave like before. But it's possible to set the "reschedule" flag. 
* The poke_interval, timeout, and soft_fail parameters are still respected and used to calculate the next schedule time. * Custom sensor implementations can even define the next sensible schedule date by raising AirflowRescheduleException themselves. * Existing TimeSensor and TimeDeltaSensor can also be changed to be rescheduled when the time is reached. * This mechanism can also be used by non-sensor operators (but then the new ReadyToRescheduleDep has to be added to deps or BaseOperator). Design decisions and caveats: * When handling AirflowRescheduleException the `try_number` is decremented. That means that subsequent runs use the same try number and write to the same log file. * Sensor TI dependency check now depends on `task_reschedule` table. However only the BaseSensorOperator includes the new ReadyToRescheduleDep for now. Open questions and TODOs: * Should a dedicated state `UP_FOR_RESCHEDULE` be used instead of setting the state back to `NONE`? This would require more changes in scheduler code and especially in the UI, but the state of a task would be more explicit and more transparent to the user. * Add example/test for a non-sensor operator * Document the new feature > Explicit re-schedule of sensors > --- > > Key: AIRFLOW-2747 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2747 > Project: Apache Airflow > I
[jira] [Created] (AIRFLOW-2748) dbexportcmd & dbimportcmd should support use_customer_cluster, use_customer_label in QuboleOperator
Joy Lal Chattaraj created AIRFLOW-2748: -- Summary: dbexportcmd & dbimportcmd should support use_customer_cluster, use_customer_label in QuboleOperator Key: AIRFLOW-2748 URL: https://issues.apache.org/jira/browse/AIRFLOW-2748 Project: Apache Airflow Issue Type: Improvement Reporter: Joy Lal Chattaraj Assignee: Joy Lal Chattaraj A customer pointed out that the QuboleOperator & qubole_hook in the Airflow codebase don't support `use_customer_cluster`, `customer_cluster_label` and `hive_serde`. This is a request to add the above functionality. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2747) Explicit re-schedule of sensors
[ https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541546#comment-16541546 ] Stefan Seelmann commented on AIRFLOW-2747: -- Initial PR: https://github.com/apache/incubator-airflow/pull/3596 > Explicit re-schedule of sensors > --- > > Key: AIRFLOW-2747 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2747 > Project: Apache Airflow > Issue Type: Improvement > Components: core, operators >Affects Versions: 1.9.0 >Reporter: Stefan Seelmann >Assignee: Stefan Seelmann >Priority: Major > Fix For: 2.0.0 > > Attachments: Screenshot_2018-07-12_14-10-24.png > > > By default sensors block a worker and just sleep between pokes. This is very > inefficient, especially when there are many long-running sensors. > There is a hacky workaroud by setting a small timeout value and a high retry > number. But that has drawbacks: > * Errors throws by sensors are hidden and the sensor retries too often > * The sensor is retried in a fixed time interval (with optional exponential > backoff) > * There are many attempts and many log files are generated > I'd like to propose an explicit reschedule mechanism: > * A new "reschedule" flag for sensors, if set to True it will raise an > AirflowRescheduleException that causes a reschedule. > * AirflowRescheduleException contains the (earliest) re-schedule date. > * Reschedule requests are recorded in new `task_reschedule` table and > visualized in the Gantt view. > * A new TI dependency that checks if a sensor task is ready to be > re-scheduled. > Advantages: > * This change is backward compatible. Existing sensors behave like before. > But it's possible to set the "reschedule" flag. > * The poke_interval, timeout, and soft_fail parameters are still respected > and used to calculate the next schedule time. > * Custom sensor implementations can even define the next sensible schedule > date by raising AirflowRescheduleException themselves. > * Existing TimeSensor and TimeDeltaSensor can also be changed to be > rescheduled when the time is reached. > * This mechanism can also be used by non-sensor operators (but then the new > ReadyToRescheduleDep has to be added to deps or BaseOperator). > Design decisions and caveats: > * When handling AirflowRescheduleException the `try_number` is decremented. > That means that subsequent runs use the same try number and write to the same > log file. > * Sensor TI dependency check now depends on `task_reschedule` table. However > only the BaseSensorOperator includes the new ReadyToRescheduleDep for now. > Open questions and TODOs: > * Should a dedicated state `UP_FOR_RESCHEDULE` be used instead of setting > the state back to `NONE`? This would require more changes in scheduler code > and especially in the UI, but the state of a task would be more explicit and > more transparent to the user. > * Add example/test for a non-sensor operator > * Document the new feature -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (AIRFLOW-2747) Explicit re-schedule of sensors
[ https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stefan Seelmann updated AIRFLOW-2747: - Attachment: Screenshot_2018-07-12_14-10-24.png > Explicit re-schedule of sensors > --- > > Key: AIRFLOW-2747 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2747 > Project: Apache Airflow > Issue Type: Improvement > Components: core, operators >Affects Versions: 1.9.0 >Reporter: Stefan Seelmann >Assignee: Stefan Seelmann >Priority: Major > Fix For: 2.0.0 > > Attachments: Screenshot_2018-07-12_14-10-24.png > > > By default sensors block a worker and just sleep between pokes. This is very > inefficient, especially when there are many long-running sensors. > There is a hacky workaroud by setting a small timeout value and a high retry > number. But that has drawbacks: > * Errors throws by sensors are hidden and the sensor retries too often > * The sensor is retried in a fixed time interval (with optional exponential > backoff) > * There are many attempts and many log files are generated > I'd like to propose an explicit reschedule mechanism: > * A new "reschedule" flag for sensors, if set to True it will raise an > AirflowRescheduleException that causes a reschedule. > * AirflowRescheduleException contains the (earliest) re-schedule date. > * Reschedule requests are recorded in new `task_reschedule` table and > visualized in the Gantt view. > * A new TI dependency that checks if a sensor task is ready to be > re-scheduled. > Advantages: > * This change is backward compatible. Existing sensors behave like before. > But it's possible to set the "reschedule" flag. > * The poke_interval, timeout, and soft_fail parameters are still respected > and used to calculate the next schedule time. > * Custom sensor implementations can even define the next sensible schedule > date by raising AirflowRescheduleException themselves. > * Existing TimeSensor and TimeDeltaSensor can also be changed to be > rescheduled when the time is reached. > * This mechanism can also be used by non-sensor operators (but then the new > ReadyToRescheduleDep has to be added to deps or BaseOperator). > Design decisions and caveats: > * When handling AirflowRescheduleException the `try_number` is decremented. > That means that subsequent runs use the same try number and write to the same > log file. > * Sensor TI dependency check now depends on `task_reschedule` table. However > only the BaseSensorOperator includes the new ReadyToRescheduleDep for now. > Open questions and TODOs: > * Should a dedicated state `UP_FOR_RESCHEDULE` be used instead of setting > the state back to `NONE`? This would require more changes in scheduler code > and especially in the UI, but the state of a task would be more explicit and > more transparent to the user. > * Add example/test for a non-sensor operator > * Document the new feature -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2747) Explicit re-schedule of sensors
[ https://issues.apache.org/jira/browse/AIRFLOW-2747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541539#comment-16541539 ] Stefan Seelmann commented on AIRFLOW-2747: -- Screenshot of the Gantt view for an example DAG run: !Screenshot_2018-07-12_14-10-24.png! And the corresponding rows in task_reschedule table: {noformat} $ select * from task_reschedule where execution_date='2018-07-12T12:06:28.988028' order by id; id | task_id | dag_id |execution_date | try_number | start_date | end_date| duration | reschedule_date +-++---++---+---+--+--- 42 | s3 | dummy | 2018-07-12 12:06:28.988028+00 | 1 | 2018-07-12 12:06:54.430185+00 | 2018-07-12 12:06:59.339554+00 |5 | 2018-07-12 12:07:14.312456+00 44 | s2 | dummy | 2018-07-12 12:06:28.988028+00 | 2 | 2018-07-12 12:07:09.381193+00 | 2018-07-12 12:07:12.480702+00 |3 | 2018-07-12 12:07:22.467206+00 45 | s3 | dummy | 2018-07-12 12:06:28.988028+00 | 1 | 2018-07-12 12:07:17.111816+00 | 2018-07-12 12:07:18.444199+00 |1 | 2018-07-12 12:07:33.4376+00 47 | s2 | dummy | 2018-07-12 12:06:28.988028+00 | 3 | 2018-07-12 12:07:34.499979+00 | 2018-07-12 12:07:35.834609+00 |1 | 2018-07-12 12:07:45.817533+00 49 | s2 | dummy | 2018-07-12 12:06:28.988028+00 | 3 | 2018-07-12 12:07:49.407569+00 | 2018-07-12 12:07:50.843526+00 |1 | 2018-07-12 12:08:00.834584+00 51 | s2 | dummy | 2018-07-12 12:06:28.988028+00 | 4 | 2018-07-12 12:08:14.526+00| 2018-07-12 12:08:15.768907+00 |1 | 2018-07-12 12:08:25.762619+00 53 | s2 | dummy | 2018-07-12 12:06:28.988028+00 | 4 | 2018-07-12 12:08:29.329766+00 | 2018-07-12 12:08:31.168762+00 |2 | 2018-07-12 12:08:41.160209+00 {noformat} > Explicit re-schedule of sensors > --- > > Key: AIRFLOW-2747 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2747 > Project: Apache Airflow > Issue Type: Improvement > Components: core, operators >Affects Versions: 1.9.0 >Reporter: Stefan Seelmann >Assignee: Stefan Seelmann >Priority: Major > Fix For: 2.0.0 > > Attachments: Screenshot_2018-07-12_14-10-24.png > > > By default sensors block a worker and just sleep between pokes. This is very > inefficient, especially when there are many long-running sensors. > There is a hacky workaroud by setting a small timeout value and a high retry > number. But that has drawbacks: > * Errors throws by sensors are hidden and the sensor retries too often > * The sensor is retried in a fixed time interval (with optional exponential > backoff) > * There are many attempts and many log files are generated > I'd like to propose an explicit reschedule mechanism: > * A new "reschedule" flag for sensors, if set to True it will raise an > AirflowRescheduleException that causes a reschedule. > * AirflowRescheduleException contains the (earliest) re-schedule date. > * Reschedule requests are recorded in new `task_reschedule` table and > visualized in the Gantt view. > * A new TI dependency that checks if a sensor task is ready to be > re-scheduled. > Advantages: > * This change is backward compatible. Existing sensors behave like before. > But it's possible to set the "reschedule" flag. > * The poke_interval, timeout, and soft_fail parameters are still respected > and used to calculate the next schedule time. > * Custom sensor implementations can even define the next sensible schedule > date by raising AirflowRescheduleException themselves. > * Existing TimeSensor and TimeDeltaSensor can also be changed to be > rescheduled when the time is reached. 
> * This mechanism can also be used by non-sensor operators (but then the new > ReadyToRescheduleDep has to be added to deps or BaseOperator). > Design decisions and caveats: > * When handling AirflowRescheduleException the `try_number` is decremented. > That means that subsequent runs use the same try number and write to the same > log file. > * Sensor TI dependency check now depends on the `task_reschedule` table. However > only the BaseSensorOperator includes the new ReadyToRescheduleDep for now. > Open questions and TODOs: > * Should a dedicated state `UP_FOR_RESCHEDULE` be used instead of setting > the state back to `NONE`? This would require more changes in scheduler code > and especially in the UI, but the state of a task would be more explicit and > more transparent to the user. > * Add example/test for a non-sensor operator > * Document the new feature -- This message was sent by Atlassian JIRA (v7.6.3#76005)
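To picture what the proposed `task_reschedule` table might look like, here is a hedged SQLAlchemy sketch inferred only from the column names visible in the query output above. The column types, lengths, and constraints are assumptions, not the actual Airflow schema; only the table and column names come from the comment.

{code}
# Hedged sketch: a possible model for the proposed task_reschedule table,
# one row per poke attempt of a rescheduling sensor.
from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class TaskReschedule(Base):
    __tablename__ = "task_reschedule"

    id = Column(Integer, primary_key=True, autoincrement=True)
    task_id = Column(String(250), nullable=False)
    dag_id = Column(String(250), nullable=False)
    execution_date = Column(DateTime(timezone=True), nullable=False)
    try_number = Column(Integer, nullable=False)
    start_date = Column(DateTime(timezone=True), nullable=False)        # poke started
    end_date = Column(DateTime(timezone=True), nullable=False)          # poke finished
    duration = Column(Integer, nullable=False)                          # seconds spent poking
    reschedule_date = Column(DateTime(timezone=True), nullable=False)   # earliest next poke
{code}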
[jira] [Closed] (AIRFLOW-1566) 'airflow.www.gunicorn_config doesn't exist
[ https://issues.apache.org/jira/browse/AIRFLOW-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dud closed AIRFLOW-1566. Resolution: Won't Fix > 'airflow.www.gunicorn_config doesn't exist > -- > > Key: AIRFLOW-1566 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1566 > Project: Apache Airflow > Issue Type: Bug >Reporter: dud >Priority: Major > Fix For: 1.8.2 > > > Hello > starting Airflow v1.8.2 webserver yields this error : > {code} > $ airflow webserver --pid /run/airflow/webserver.pid -hn localhost > /home/airflow/.local/lib/python2.7/site-packages/airflow/configuration.py:540: > DeprecationWarning: This method will be removed in future versions. Use > 'parser.read_file()' instead. > self.readfp(StringIO.StringIO(string)) > [2017-09-05 15:19:05,012] {__init__.py:57} INFO - Using executor > CeleryExecutor > [2017-09-05 15:19:05,063] {driver.py:120} INFO - Generating grammar tables > from /usr/lib/python2.7/lib2to3/Grammar.txt > [2017-09-05 15:19:05,083] {driver.py:120} INFO - Generating grammar tables > from /usr/lib/python2.7/lib2to3/PatternGrammar.txt > /home/airflow/.local/lib/python2.7/site-packages/airflow/www/app.py:23: > FlaskWTFDeprecationWarning: "flask_wtf.CsrfProtect" has been renamed to > "CSRFProtect" and will be removed in 1.0. > csrf = CsrfProtect() > _ > |__( )_ __/__ / __ > /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / > ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / > _/_/ |_/_/ /_//_//_/ \//|__/ > /home/airflow/.local/lib/python2.7/site-packages/flask_cache/jinja2ext.py:33: > ExtDeprecationWarning: Importing flask.ext.cache is deprecated, use > flask_cache instead. > from flask.ext.cache import make_template_fragment_key > [2017-09-05 15:19:05,387] [26152] {models.py:168} INFO - Filling up the > DagBag from /home/airflow/dags > Running the Gunicorn Server with: > Workers: 4 sync > Host: localhost:8080 > Timeout: 120 > Logfiles: - - > = > Error: 'airflow.www.gunicorn_config' doesn't exist > {code} > The use of this file has been introduced by this > [commit|https://github.com/apache/incubator-airflow/commit/9d254a317dd54f555270ca568aff1cd0500e1e53] > authored by [~xuanji] > dud -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2747) Explicit re-schedule of sensors
Stefan Seelmann created AIRFLOW-2747: Summary: Explicit re-schedule of sensors Key: AIRFLOW-2747 URL: https://issues.apache.org/jira/browse/AIRFLOW-2747 Project: Apache Airflow Issue Type: Improvement Components: core, operators Affects Versions: 1.9.0 Reporter: Stefan Seelmann Assignee: Stefan Seelmann Fix For: 2.0.0 By default sensors block a worker and just sleep between pokes. This is very inefficient, especially when there are many long-running sensors. There is a hacky workaround by setting a small timeout value and a high retry number. But that has drawbacks: * Errors thrown by sensors are hidden and the sensor retries too often * The sensor is retried at a fixed time interval (with optional exponential backoff) * There are many attempts and many log files are generated I'd like to propose an explicit reschedule mechanism: * A new "reschedule" flag for sensors; if set to True it will raise an AirflowRescheduleException that causes a reschedule. * AirflowRescheduleException contains the (earliest) re-schedule date. * Reschedule requests are recorded in a new `task_reschedule` table and visualized in the Gantt view. * A new TI dependency that checks if a sensor task is ready to be re-scheduled. Advantages: * This change is backward compatible. Existing sensors behave like before. But it's possible to set the "reschedule" flag. * The poke_interval, timeout, and soft_fail parameters are still respected and used to calculate the next schedule time. * Custom sensor implementations can even define the next sensible schedule date by raising AirflowRescheduleException themselves. * Existing TimeSensor and TimeDeltaSensor can also be changed to be rescheduled when the time is reached. * This mechanism can also be used by non-sensor operators (but then the new ReadyToRescheduleDep has to be added to deps or BaseOperator). Design decisions and caveats: * When handling AirflowRescheduleException the `try_number` is decremented. That means that subsequent runs use the same try number and write to the same log file. * Sensor TI dependency check now depends on the `task_reschedule` table. However only the BaseSensorOperator includes the new ReadyToRescheduleDep for now. Open questions and TODOs: * Should a dedicated state `UP_FOR_RESCHEDULE` be used instead of setting the state back to `NONE`? This would require more changes in scheduler code and especially in the UI, but the state of a task would be more explicit and more transparent to the user. * Add example/test for a non-sensor operator * Document the new feature -- This message was sent by Atlassian JIRA (v7.6.3#76005)
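From a DAG author's point of view, the proposal would boil down to one extra keyword argument on existing sensors. The hedged sketch below shows what that might look like once the flag lands, using the TimeDeltaSensor mentioned in the description; the DAG id, dates, and intervals are illustrative, only reschedule, poke_interval, and timeout come from the proposal, and this does not run on Airflow 1.9, where the flag does not exist yet.

{code}
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.sensors import TimeDeltaSensor

dag = DAG(
    dag_id="reschedule_example",      # illustrative name
    start_date=datetime(2018, 7, 1),
    schedule_interval="@daily",
)

# Wait 30 minutes past the schedule without blocking a worker slot the whole
# time: each unsuccessful poke would raise AirflowRescheduleException and free
# the slot until the recorded reschedule_date.
wait_30_minutes = TimeDeltaSensor(
    task_id="wait_30_minutes",
    delta=timedelta(minutes=30),
    poke_interval=5 * 60,   # poke every 5 minutes
    timeout=60 * 60,        # give up after an hour
    reschedule=True,        # the flag proposed in this issue
    dag=dag,
)
{code}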
[jira] [Commented] (AIRFLOW-1566) 'airflow.www.gunicorn_config doesn't exist
[ https://issues.apache.org/jira/browse/AIRFLOW-1566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16541382#comment-16541382 ] Rehi, Deepesh commented on AIRFLOW-1566: Hi team, We have resolved this by downgrading to gunicorn- 19.3.0: pip install gunicorn==19.3.0 Regards, Deepesh Rehi > 'airflow.www.gunicorn_config doesn't exist > -- > > Key: AIRFLOW-1566 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1566 > Project: Apache Airflow > Issue Type: Bug >Reporter: dud >Priority: Major > Fix For: 1.8.2 > > > Hello > starting Airflow v1.8.2 webserver yields this error : > {code} > $ airflow webserver --pid /run/airflow/webserver.pid -hn localhost > /home/airflow/.local/lib/python2.7/site-packages/airflow/configuration.py:540: > DeprecationWarning: This method will be removed in future versions. Use > 'parser.read_file()' instead. > self.readfp(StringIO.StringIO(string)) > [2017-09-05 15:19:05,012] {__init__.py:57} INFO - Using executor > CeleryExecutor > [2017-09-05 15:19:05,063] {driver.py:120} INFO - Generating grammar tables > from /usr/lib/python2.7/lib2to3/Grammar.txt > [2017-09-05 15:19:05,083] {driver.py:120} INFO - Generating grammar tables > from /usr/lib/python2.7/lib2to3/PatternGrammar.txt > /home/airflow/.local/lib/python2.7/site-packages/airflow/www/app.py:23: > FlaskWTFDeprecationWarning: "flask_wtf.CsrfProtect" has been renamed to > "CSRFProtect" and will be removed in 1.0. > csrf = CsrfProtect() > _ > |__( )_ __/__ / __ > /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / / > ___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ / > _/_/ |_/_/ /_//_//_/ \//|__/ > /home/airflow/.local/lib/python2.7/site-packages/flask_cache/jinja2ext.py:33: > ExtDeprecationWarning: Importing flask.ext.cache is deprecated, use > flask_cache instead. > from flask.ext.cache import make_template_fragment_key > [2017-09-05 15:19:05,387] [26152] {models.py:168} INFO - Filling up the > DagBag from /home/airflow/dags > Running the Gunicorn Server with: > Workers: 4 sync > Host: localhost:8080 > Timeout: 120 > Logfiles: - - > = > Error: 'airflow.www.gunicorn_config' doesn't exist > {code} > The use of this file has been introduced by this > [commit|https://github.com/apache/incubator-airflow/commit/9d254a317dd54f555270ca568aff1cd0500e1e53] > authored by [~xuanji] > dud -- This message was sent by Atlassian JIRA (v7.6.3#76005)