[ https://issues.apache.org/jira/browse/AIRFLOW-503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15722248#comment-15722248 ]
Matthias Huschle commented on AIRFLOW-503:
------------------------------------------

I guess your problem lies in "allowed_states=[all]". "all" is a built-in function. The argument should be a list of states, as defined in the "State" class.

> ExternalTaskSensor causes runtime exception
> -------------------------------------------
>
>                 Key: AIRFLOW-503
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-503
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: db, operators
>    Affects Versions: Airflow 1.7.1
>         Environment: airflow 1.7.1.3.
> postgress 9.2.13 (backend DB)
> OS Red Hat Enterprise Linux Server 7.2 (Maipo)
> python 2.7.5
>            Reporter: Hila Visan
>            Priority: Critical
>
> I just created a new task using ExternalTaskSensor between weekly dag and
> daily dag (named 'services_daily_sensor') .
> When I tried to test it, i ran the command:
> 'airflow test weekly_agg services_daily_sensor 2016-09-11T06:00:00'.
> The command failed with the following error:
>
> ervices_daily_sensor> on 2016-09-11 06:00:00
> [2016-09-11 08:59:09,602] {sensors.py:195} INFO - Poking for
> daily_agg.services_daily_task on 2016-09-11 02:00:00 ...
> [2016-09-11 08:59:09,614] {models.py:1286} ERROR -
> (psycopg2.ProgrammingError) can't adapt type 'builtin_function_or_method'
> [SQL: 'SELECT count(*) AS count_1 \nFROM (SELECT task_instance.task_id AS
> task_instance_task_id, task_instance.dag_id AS task_instance_dag_id,
> task_instance.execution_date AS task_instance_execution_date,
> task_instance.start_date AS task_instance_start_date, task_instance.end_date
> AS task_instance_end_date, task_instance.duration AS task_instance_duration,
> task_instance.state AS task_instance_state, task_instance.try_number AS
> task_instance_try_number, task_instance.hostname AS task_instance_hostname,
> task_instance.unixname AS task_instance_unixname, task_instance.job_id AS
> task_instance_job_id, task_instance.pool AS task_instance_pool,
> task_instance.queue AS task_instance_queue, task_instance.priority_weight AS
> task_instance_priority_weight, task_instance.operator AS
> task_instance_operator, task_instance.queued_dttm AS
> task_instance_queued_dttm \nFROM task_instance \nWHERE task_instance.dag_id =
> %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND
> task_instance.state IN (%(state_1)s) AND task_instance.execution_date =
> %(execution_date_1)s) AS anon_1'] [parameters: {'state_1': <built-in function
> all>, 'execution_date_1': datetime.datetime(2016, 9, 11, 2, 0), 'dag_id_1':
> 'daily_agg', 'task_id_1': 'services_daily_task'}]
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1242, in run
>     result = task_copy.execute(context=context)
>   File "/usr/lib/python2.7/site-packages/airflow/operators/sensors.py", line
> 56, in execute
>     while not self.poke(context):
>   File "/usr/lib/python2.7/site-packages/airflow/operators/sensors.py", line
> 203, in poke
>     TI.execution_date == dttm,
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line
> 2980, in count
>     return self.from_self(col).scalar()
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line
> 2749, in scalar
>     ret = self.one()
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line
> 2718, in one
>     ret = list(self)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line
> 2761, in __iter__
>     return self._execute_and_instances(context)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/orm/query.py", line
> 2776, in _execute_and_instances
>     result = conn.execute(querycontext.statement, self._params)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line
> 914, in execute
>     return meth(self, multiparams, params)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/sql/elements.py", line
> 323, in _execute_on_connection
>     return connection._execute_clauseelement(self, multiparams, params)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line
> 1010, in _execute_clauseelement
>     compiled_sql, distilled_params
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line
> 1146, in _execute_context
>     context)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line
> 1341, in _handle_dbapi_exception
>     exc_info
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/util/compat.py", line
> 202, in raise_from_cause
>     reraise(type(exception), exception, tb=exc_tb, cause=cause)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/base.py", line
> 1139, in _execute_context
>     context)
>   File "/usr/lib64/python2.7/site-packages/sqlalchemy/engine/default.py",
> line 450, in do_execute
>     cursor.execute(statement, parameters)

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
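
For illustration, the point made in the comment at the top of this message (that "allowed_states=[all]" passes Python's built-in function rather than a state) might be sketched as below. This is only a sketch under stated assumptions: the small State class is a stand-in so the snippet runs without Airflow installed, and invalid_states is a hypothetical helper, not part of Airflow.

```python
# Stand-in for Airflow's State class so the sketch runs without Airflow;
# the real class defines more states than the two listed here (assumption).
class State(object):
    SUCCESS = "success"
    FAILED = "failed"


def invalid_states(allowed_states):
    """Return the entries that are not plain strings (hypothetical helper)."""
    return [s for s in allowed_states if not isinstance(s, str)]


bad = [all]             # `all` is Python's built-in function, not a state
good = [State.SUCCESS]  # a list of state names, as the comment suggests

print(invalid_states(bad))   # contains the built-in function object
print(invalid_states(good))  # empty: every entry is a state string

# With Airflow installed, the sensor from the report would presumably be
# declared along these lines (not executed here; names taken from the report):
#   ExternalTaskSensor(
#       task_id='services_daily_sensor',
#       external_dag_id='daily_agg',
#       external_task_id='services_daily_task',
#       allowed_states=[State.SUCCESS],
#       dag=dag)
```

The built-in function in the first list is what shows up in the query parameters as 'state_1': <built-in function all>, which psycopg2 then cannot adapt to a SQL value.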