[ 
https://issues.apache.org/jira/browse/AIRFLOW-503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15722248#comment-15722248
 ] 

Matthias Huschle commented on AIRFLOW-503:
------------------------------------------

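I guess your problem lies in "allowed_states=[all]". "all" is a Python built-in 
function, not a state, so the function object itself ends up as the 'state_1' 
query parameter, which psycopg2 cannot adapt (hence "can't adapt type 
'builtin_function_or_method'"). The argument should be a list of states, as 
defined in the "State" class, for example "allowed_states=[State.SUCCESS]".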
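
To illustrate the point, here is a minimal sketch that runs without Airflow 
installed. The "State" class below is only a stand-in (assumption: the real one 
is in airflow.utils.state and defines string constants like these), and 
"check_allowed_states" is a hypothetical helper, not part of Airflow.

```python
# Stand-in for Airflow's State class (assumption: the real class lives in
# airflow.utils.state and exposes string constants such as these).
class State(object):
    SUCCESS = "success"
    FAILED = "failed"


def check_allowed_states(allowed_states):
    """Hypothetical helper: reject anything that is not a plain state string."""
    bad = [s for s in allowed_states if not isinstance(s, str)]
    if bad:
        raise TypeError("allowed_states must contain state strings, got: %r" % bad)
    return list(allowed_states)


# The reported configuration: `all` is the Python built-in, not a state.
wrong = [all]
print(type(wrong[0]).__name__)  # the type name quoted in the psycopg2 error

# What the sensor expects: a list of state strings.
right = check_allowed_states([State.SUCCESS])

# In the DAG file the fix would look roughly like this (not executed here,
# other arguments left out):
#   ExternalTaskSensor(task_id='services_daily_sensor',
#                      external_dag_id='daily_agg',
#                      external_task_id='services_daily_task',
#                      allowed_states=[State.SUCCESS], ...)
```

With a list of state strings, the IN clause of the query receives values the 
database driver can adapt.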


> ExternalTaskSensor causes runtime exception
> -------------------------------------------
>
>                 Key: AIRFLOW-503
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-503
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: db, operators
>    Affects Versions: Airflow 1.7.1
>         Environment: airflow 1.7.1.3.
> PostgreSQL 9.2.13 (backend DB)
> OS   Red Hat Enterprise Linux Server 7.2 (Maipo)
> python 2.7.5
>            Reporter: Hila Visan
>            Priority: Critical
>
> I just created a new task using ExternalTaskSensor between a weekly DAG and 
> a daily DAG (the task is named 'services_daily_sensor').
> When I tried to test it, I ran the command:
> 'airflow test weekly_agg services_daily_sensor 2016-09-11T06:00:00'.
> The command failed with the following error:
>  
> ervices_daily_sensor> on 2016-09-11 06:00:00
> [2016-09-11 08:59:09,602] {sensors.py:195} INFO - Poking for 
> daily_agg.services_daily_task on 2016-09-11 02:00:00 ...
> [2016-09-11 08:59:09,614] {models.py:1286} ERROR - 
> (psycopg2.ProgrammingError) can't adapt type 'builtin_function_or_method' 
> [SQL: 'SELECT count(*) AS count_1 \nFROM (SELECT task_instance.task_id AS 
> task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, 
> task_instance.execution_date AS task_instance_execution_date, 
> task_instance.start_date AS task_instance_start_date, task_instance.end_date 
> AS task_instance_end_date, task_instance.duration AS task_instance_duration, 
> task_instance.state AS task_instance_state, task_instance.try_number AS 
> task_instance_try_number, task_instance.hostname AS task_instance_hostname, 
> task_instance.unixname AS task_instance_unixname, task_instance.job_id AS 
> task_instance_job_id, task_instance.pool AS task_instance_pool, 
> task_instance.queue AS task_instance_queue, task_instance.priority_weight AS 
> task_instance_priority_weight, task_instance.operator AS 
> task_instance_operator, task_instance.queued_dttm AS 
> task_instance_queued_dttm \nFROM task_instance \nWHERE task_instance.dag_id = 
> %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND 
> task_instance.state IN (%(state_1)s) AND task_instance.execution_date = 
> %(execution_date_1)s) AS anon_1'] [parameters: {'state_1': <built-in function 
> all>, 'execution_date_1': datetime.datetime(2016, 9, 11, 2, 0), 'dag_id_1': 
> 'daily_agg', 'task_id_1': 'services_daily_task'}]
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1242, in run
>     result = task_copy.execute(context=context)
>   File "/usr/lib/python2.7/site-packages/airflow/operators/sensors.py", line 
> 56, in execute
>     while not self.poke(context):
>   File "/usr/lib/python2.7/site-packages/airflow/operators/sensors.py", line 
> 203, in poke
>     TI.execution_date == dttm,
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 
> 2980, in count
>     return self.from_self(col).scalar()
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 
> 2749, in scalar
>     ret = self.one()
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 
> 2718, in one
>     ret = list(self)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 
> 2761, in __iter__
>     return self._execute_and_instances(context)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line 
> 2776, in _execute_and_instances
>     result = conn.execute(querycontext.statement, self._params)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 
> 914, in execute
>     return meth(self, multiparams, params)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/elements.py", line 
> 323, in _execute_on_connection
>     return connection._execute_clauseelement(self, multiparams, params)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 
> 1010, in _execute_clauseelement
>     compiled_sql, distilled_params
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 
> 1146, in _execute_context
>     context)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 
> 1341, in _handle_dbapi_exception
>     exc_info
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line 
> 202, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line 
> 1139, in _execute_context
>     context)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/default.py", 
> line 450, in do_execute
>     cursor.execute(statement, parameters)



