Are you running in UTC? Or are your nodes configured in a different tz?
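
(A quick way to check, as a rough sketch — this assumes Python with pytz available, which Airflow already depends on:

    from datetime import datetime
    import pytz

    # If these two differ by a whole number of hours, the node's
    # local timezone is not UTC.
    print(datetime.now())          # offset-naive, local time
    print(datetime.now(pytz.utc))  # offset-aware, UTC
)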

B.

> On 17 May 2017, at 02:20, Russell Jurney <russell.jur...@gmail.com> wrote:
> 
> It seems that this entire file needs to be patched so that the
> datetime.now() calls use tz=pytz.utc. datetime.now() returns an
> offset-naive value unless you pass it a timezone, so comparing it
> against an offset-aware datetime coming from anywhere else is what
> breaks here.
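> 
> A sketch of the kind of patch I have in mind — untested, with the
> comparison taken from the traceback below:
> 
>     import pytz
>     from datetime import datetime
> 
>     # before (offset-naive):
>     if next_run_date > datetime.now():
>         ...
>     # after (offset-aware, pinned to UTC):
>     if next_run_date > datetime.now(tz=pytz.utc):
>         ...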
> 
> I think I can patch it. But one problem I'm having: how do I see the
> scheduler's log messages when I add something like
> self.logger.error(job.latest_heartbeat)?
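> 
> (My guess, judging by the "See ... for details" line in the scheduler
> output further down: the DagFileProcessor writes per-file logs under
> 
>     $AIRFLOW_HOME/logs/scheduler/<YYYY-MM-DD>/<dag_file>.py.log
> 
> and since create_dag_run runs inside process_file, the logger.error
> output should land there.)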
> 
> Russell Jurney @rjurney <http://twitter.com/rjurney>
> russell.jur...@gmail.com LI <http://linkedin.com/in/russelljurney> FB
> <http://facebook.com/jurney> datasyndrome.com
> 
> On Tue, May 16, 2017 at 3:40 PM, Russell Jurney <russell.jur...@gmail.com>
> wrote:
> 
>> I set up conda to run Python 3.4, and I still get this when a job is
>> scheduled and runs:
>> 
>> [2017-05-16 15:37:54,339] {jobs.py:1171} DagFileProcessor1 INFO - Processing agl_p2p_api_worker_dag
>> [2017-05-16 15:37:55,601] {jobs.py:354} DagFileProcessor1 ERROR - Got an exception! Propagating...
>> Traceback (most recent call last):
>>   File "/Users/rjurney/Software/incubator-airflow/airflow/jobs.py", line 346, in helper
>>     pickle_dags)
>>   File "/Users/rjurney/Software/incubator-airflow/airflow/utils/db.py", line 48, in wrapper
>>     result = func(*args, **kwargs)
>>   File "/Users/rjurney/Software/incubator-airflow/airflow/jobs.py", line 1584, in process_file
>>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>>   File "/Users/rjurney/Software/incubator-airflow/airflow/jobs.py", line 1173, in _process_dags
>>     dag_run = self.create_dag_run(dag)
>>   File "/Users/rjurney/Software/incubator-airflow/airflow/utils/db.py", line 48, in wrapper
>>     result = func(*args, **kwargs)
>>   File "/Users/rjurney/Software/incubator-airflow/airflow/jobs.py", line 815, in create_dag_run
>>     if next_run_date > datetime.now():
>> TypeError: can't compare offset-naive and offset-aware datetimes
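>> 
>> A minimal reproduction of that comparison error, to make the failure
>> mode concrete (hypothetical values, not from the scheduler):
>> 
>>     from datetime import datetime
>>     import pytz
>> 
>>     naive = datetime.now()           # tzinfo is None
>>     aware = datetime.now(pytz.utc)   # tzinfo is UTC
>>     naive > aware                    # TypeError: can't compare
>>                                      # offset-naive and offset-aware datetimes
>> 
>> So somewhere an offset-aware datetime is reaching create_dag_run while
>> datetime.now() there is offset-naive.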
>> 
>> 
>> What do I do?
>> 
>> Russell Jurney @rjurney <http://twitter.com/rjurney>
>> russell.jur...@gmail.com LI <http://linkedin.com/in/russelljurney> FB
>> <http://facebook.com/jurney> datasyndrome.com
>> 
>> On Tue, May 16, 2017 at 2:18 PM, Russell Jurney <russell.jur...@gmail.com>
>> wrote:
>> 
>>> Thanks, we're trying that now!
>>> 
>>> Russell Jurney @rjurney <http://twitter.com/rjurney>
>>> russell.jur...@gmail.com LI <http://linkedin.com/in/russelljurney> FB
>>> <http://facebook.com/jurney> datasyndrome.com
>>> 
>>> On Tue, May 16, 2017 at 2:02 PM, Bolke de Bruin <bdbr...@gmail.com>
>>> wrote:
>>> 
>>>> Did you try to run this on Py 2.7 / 3.4 as well? I notice you are
>>>> running on 3.6, which we are not testing against at the moment.
>>>> 
>>>> Bolke.
>>>> 
>>>> On 16 May 2017, at 22:46, Russell Jurney <russell.jur...@gmail.com> wrote:
>>>>> 
>>>>> We have tasks that run, but we can't get them to run as scheduled or to
>>>>> un-pause.
>>>>> 
>>>>> The code for the task looks like this:
>>>>> 
>>>>> from datetime import timedelta
>>>>> 
>>>>> from airflow import DAG
>>>>> from airflow.operators.bash_operator import BashOperator
>>>>> 
>>>>> # Run the API worker every 5 minutes
>>>>> api_worker_dag = DAG(
>>>>>   'agl_p2p_api_worker_dag',
>>>>>   default_args=default_args,
>>>>>   schedule_interval=timedelta(minutes=5)
>>>>> )
>>>>> 
>>>>> # Run the API worker
>>>>> api_worker_task = BashOperator(
>>>>>   task_id="api_worker_task",
>>>>>   bash_command="python {{ params.base_path }}/agl-p2p-api-worker/site/worker.py {{ ds }}",
>>>>>   params={
>>>>>       "base_path": project_home
>>>>>   },
>>>>>   dag=api_worker_dag
>>>>> )
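>>>>> 
>>>>> (default_args and project_home are defined earlier in the file; roughly
>>>>> like the following, with illustrative values:
>>>>> 
>>>>>     import os
>>>>>     from datetime import datetime
>>>>> 
>>>>>     project_home = os.environ["PROJECT_HOME"]   # illustrative
>>>>>     default_args = {
>>>>>         "owner": "airflow",
>>>>>         "start_date": datetime(2017, 1, 1),     # illustrative
>>>>>     }
>>>>> )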
>>>>> 
>>>>> We run this command: airflow unpause agl_p2p_api_worker_dag
>>>>> 
>>>>> And we see this error:
>>>>> 
>>>>> [2017-05-16 20:26:48,722] {jobs.py:1408} INFO - Heartbeating the process manager
>>>>> [2017-05-16 20:26:48,723] {dag_processing.py:559} INFO - Processor for /root/airflow/dags/setup.py finished
>>>>> [2017-05-16 20:26:48,723] {dag_processing.py:578} WARNING - Processor for /root/airflow/dags/setup.py exited with return code 1. See /root/airflow/logs/scheduler/2017-05-16/setup.py.log for details.
>>>>> [2017-05-16 20:26:48,726] {dag_processing.py:627} INFO - Started a process (PID: 110) to generate tasks for /root/airflow/dags/setup.py - logging into /root/airflow/logs/scheduler/2017-05-16/setup.py.log
>>>>> [2017-05-16 20:26:48,727] {jobs.py:1444} INFO - Heartbeating the executor
>>>>> [2017-05-16 20:26:48,727] {jobs.py:1454} INFO - Heartbeating the scheduler
>>>>> Process DagFileProcessor19-Process:
>>>>> Traceback (most recent call last):
>>>>>   File "/opt/conda/lib/python3.6/multiprocessing/process.py", line 249, in _bootstrap
>>>>>     self.run()
>>>>>   File "/opt/conda/lib/python3.6/multiprocessing/process.py", line 93, in run
>>>>>     self._target(*self._args, **self._kwargs)
>>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 346, in helper
>>>>>     pickle_dags)
>>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
>>>>>     result = func(*args, **kwargs)
>>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 1585, in process_file
>>>>>     self._process_dags(dagbag, dags, ti_keys_to_schedule)
>>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 1174, in _process_dags
>>>>>     dag_run = self.create_dag_run(dag)
>>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/utils/db.py", line 53, in wrapper
>>>>>     result = func(*args, **kwargs)
>>>>>   File "/opt/conda/lib/python3.6/site-packages/airflow/jobs.py", line 807, in create_dag_run
>>>>>     else max(next_run_date, dag.start_date))
>>>>> TypeError: can't compare offset-naive and offset-aware datetimes
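>>>>> 
>>>>> The max() on that line compares next_run_date with dag.start_date, so if
>>>>> one of the two carries a tzinfo and the other does not, this exact
>>>>> TypeError fires. A quick diagnostic we could run (hypothetical, in a
>>>>> Python shell with the DAG file imported):
>>>>> 
>>>>>     print(api_worker_dag.start_date, api_worker_dag.start_date.tzinfo)
>>>>>     # tzinfo should be None if the scheduler is going to compare it
>>>>>     # against a naive datetime.now()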
>>>>> 
>>>>> 
>>>>> Note that we can run/schedule the example DAGs from the CLI, and our DAGs
>>>>> are very closely derived from the examples, so we don't know what to do!
>>>>> 
>>>>> For instance we can run:
>>>>> 
>>>>> docker@airflow:/mnt/airflow$ airflow run agl_p2p_api_worker_dag api_worker_task 2017-01-01
>>>>> [2017-05-16 20:44:41,046] {__init__.py:57} INFO - Using executor SequentialExecutor
>>>>> Sending to executor.
>>>>> [2017-05-16 20:44:43,274] {__init__.py:57} INFO - Using executor SequentialExecutor
>>>>> Logging into: /root/airflow/logs/agl_p2p_api_worker_dag/api_worker_task/2017-01-01T00:00:00
>>>>> 
>>>>> 
>>>>> And there are no errors. We are sure the scheduler is running and the
>>>>> webserver is running. The airflow run/test commands work, but we get this
>>>>> same error above when we click the activate button on the web app or
>>>>> unpause the DAG from the CLI. We have wondered whether maybe our start
>>>>> date has to be in the future? We don't know.
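>>>>> 
>>>>> (For reference on the start-date question: the scheduler creates the
>>>>> first run once start_date + schedule_interval has elapsed and then works
>>>>> forward from there, so a start date in the past is normal. A rough
>>>>> illustration, with made-up values:
>>>>> 
>>>>>     from datetime import datetime, timedelta
>>>>> 
>>>>>     start_date = datetime(2017, 1, 1)              # illustrative
>>>>>     first_run = start_date + timedelta(minutes=5)  # earliest trigger time
>>>>> )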
>>>>> 
>>>>> Any help would be appreciated. Thanks!
>>>>> 
>>>>> Russell Jurney @rjurney <http://twitter.com/rjurney>
>>>>> russell.jur...@gmail.com LI <http://linkedin.com/in/russelljurney> FB
>>>>> <http://facebook.com/jurney> datasyndrome.com
>>>> 
>>>> 
>>> 
>> 
