Sure thing. Attached is a minimal example.
The error I get is:

[2018-06-19 09:44:23,133] {cli.py:374} INFO - Running on host airflow-worker-f796f6bd-7qzwc
[2018-06-19 09:44:23,577] {models.py:1196} INFO - Dependencies all met for <TaskInstance: dev_dag.encode_posts 2018-06-19 09:15:00 [queued]>
[2018-06-19 09:44:23,813] {models.py:1196} INFO - Dependencies all met for <TaskInstance: dev_dag.encode_posts 2018-06-19 09:15:00 [queued]>
[2018-06-19 09:44:23,824] {models.py:1406} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 6
--------------------------------------------------------------------------------
[2018-06-19 09:44:24,188] {models.py:1427} INFO - Executing <Task(PythonOperator): encode_posts> on 2018-06-19 09:15:00
[2018-06-19 09:44:24,222] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run dev_dag encode_posts 2018-06-19T09:15:00 --job_id 5753 --raw -sd DAGS_FOLDER/dev_dag.py']
[2018-06-19 09:45:11,207] {base_task_runner.py:98} INFO - Subtask: [2018-06-19 09:45:11,164] {__init__.py:45} INFO - Using executor CeleryExecutor
[2018-06-19 09:45:13,200] {base_task_runner.py:98} INFO - Subtask: [2018-06-19 09:45:13,168] {models.py:189} INFO - Filling up the DagBag from /home/airflow/gcs/dags/dev_dag.py
[2018-06-19 09:45:13,222] {base_task_runner.py:98} INFO - Subtask: /usr/local/lib/python2.7/site-packages/airflow/utils/helpers.py:351: DeprecationWarning: Importing DummyOperator directly from <module 'airflow.operators' from '/usr/local/lib/python2.7/site-packages/airflow/operators/__init__.pyc'> has been deprecated. Please import from '<module 'airflow.operators' from '/usr/local/lib/python2.7/site-packages/airflow/operators/__init__.pyc'>.[operator_module]' instead. Support for direct imports will be dropped entirely in Airflow 2.0.
[2018-06-19 09:45:13,224] {base_task_runner.py:98} INFO - Subtask:   DeprecationWarning)
[2018-06-19 09:45:27,573] {base_task_runner.py:98} INFO - Subtask: [2018-06-19 09:45:27,571] {dev_dag.py:52} INFO - ... begin - get module from tf-hub ...
[2018-06-19 09:45:28,228] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-06-19 09:45:28,230] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/bin/airflow", line 27, in <module>
[2018-06-19 09:45:28,232] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-06-19 09:45:28,232] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-06-19 09:45:28,233] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-06-19 09:45:28,233] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-06-19 09:45:28,234] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-06-19 09:45:28,234] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
[2018-06-19 09:45:28,250] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-06-19 09:45:28,250] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 89, in execute
[2018-06-19 09:45:28,251] {base_task_runner.py:98} INFO - Subtask:     return_value = self.execute_callable()
[2018-06-19 09:45:28,251] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/operators/python_operator.py", line 94, in execute_callable
[2018-06-19 09:45:28,252] {base_task_runner.py:98} INFO - Subtask:     return self.python_callable(*self.op_args, **self.op_kwargs)
[2018-06-19 09:45:28,252] {base_task_runner.py:98} INFO - Subtask:   File "/home/airflow/gcs/dags/dev_dag.py", line 53, in fn_encode_posts
[2018-06-19 09:45:28,253] {base_task_runner.py:98} INFO - Subtask:     embed = hub.Module("https://tfhub.dev/google/nnlm-en-dim50/1")
[2018-06-19 09:45:28,253] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/module.py", line 105, in __init__
[2018-06-19 09:45:28,276] {base_task_runner.py:98} INFO - Subtask:     self._spec = as_module_spec(spec)
[2018-06-19 09:45:28,277] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/module.py", line 31, in as_module_spec
[2018-06-19 09:45:28,278] {base_task_runner.py:98} INFO - Subtask:     return native_module.load_module_spec(spec)
[2018-06-19 09:45:28,278] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/native_module.py", line 99, in load_module_spec
[2018-06-19 09:45:28,280] {base_task_runner.py:98} INFO - Subtask:     path = compressed_module_resolver.get_default().get_module_path(path)
[2018-06-19 09:45:28,280] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/resolver.py", line 385, in get_module_path
[2018-06-19 09:45:28,295] {base_task_runner.py:98} INFO - Subtask:     return self._get_module_path(handle)
[2018-06-19 09:45:28,296] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/resolver.py", line 467, in _get_module_path
[2018-06-19 09:45:28,297] {base_task_runner.py:98} INFO - Subtask:     return resolver.get_module_path(handle)
[2018-06-19 09:45:28,297] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/resolver.py", line 385, in get_module_path
[2018-06-19 09:45:28,298] {base_task_runner.py:98} INFO - Subtask:     return self._get_module_path(handle)
[2018-06-19 09:45:28,299] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/compressed_module_resolver.py", line 105, in _get_module_path
[2018-06-19 09:45:28,342] {base_task_runner.py:98} INFO - Subtask:     self._lock_file_timeout_sec())
[2018-06-19 09:45:28,343] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/resolver.py", line 313, in atomic_download
[2018-06-19 09:45:28,343] {base_task_runner.py:98} INFO - Subtask:     download_fn(handle, tmp_dir)
[2018-06-19 09:45:28,344] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/compressed_module_resolver.py", line 86, in download
[2018-06-19 09:45:28,345] {base_task_runner.py:98} INFO - Subtask:     request = url.Request(_append_compressed_format_query(handle))
[2018-06-19 09:45:28,345] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/tensorflow_hub/compressed_module_resolver.py", line 62, in _append_compressed_format_query
[2018-06-19 09:45:28,346] {base_task_runner.py:98} INFO - Subtask:     return urlparse.urlunparse(parsed)
[2018-06-19 09:45:28,346] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/future/backports/urllib/parse.py", line 387, in urlunparse
[2018-06-19 09:45:28,368] {base_task_runner.py:98} INFO - Subtask:     _coerce_args(*components))
[2018-06-19 09:45:28,370] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/future/backports/urllib/parse.py", line 115, in _coerce_args
[2018-06-19 09:45:28,371] {base_task_runner.py:98} INFO - Subtask:     raise TypeError("Cannot mix str and non-str arguments")
[2018-06-19 09:45:28,373] {base_task_runner.py:98} INFO - Subtask: TypeError: Cannot mix str and non-str arguments

I'm running this on Google Cloud Composer, which is Airflow 1.9 I believe.

Cheers,
Andy

On Tue, Jun 19, 2018 at 10:06 AM Ash Berlin-Taylor <ash_airflowl...@firemirror.com> wrote:

> There's nothing directly in Airflow itself that would cause this kind of
> issue that I can think of.
>
> It depends on what the PythonOperator you are using in the DAG does
> really. Can you share that code?
>
> -ash
>
> > On 19 Jun 2018, at 10:01, Andrew Maguire <andrewm4...@gmail.com> wrote:
> >
> > Hi All,
> >
> > Just wondering if anyone might have a deeper insight into what, if
> > anything, Airflow related might be causing this issue
> > <https://github.com/tensorflow/hub/issues/76>.
> >
> > When I try to load a TensorFlow Hub module within an Airflow operator,
> > I get the error in that issue.
> >
> > It works fine if I just run the Python script myself.
> >
> > The best I could figure out was that something Airflow was doing didn't
> > agree with something TensorFlow Hub was expecting, and I'm not really
> > sure if there is anything I could do to resolve it.
> >
> > Cheers,
> > Andy
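A note on the traceback: the last few frames go through the future package's backport of urllib.parse rather than the standard library, and the TypeError is raised while tensorflow_hub builds the download URL for the tfhub.dev handle. One way to sidestep that resolver entirely is to download and extract the module ahead of time and point hub.Module at a local directory instead of the URL. A minimal sketch, assuming the archive for nnlm-en-dim50/1 has already been unpacked somewhere the worker can read; the MODULE_DIR path below is only an illustration:

    import tensorflow_hub as hub

    # Hypothetical location of a pre-downloaded, extracted copy of
    # https://tfhub.dev/google/nnlm-en-dim50/1 (e.g. synced into the
    # Composer GCS bucket or baked into the worker environment).
    MODULE_DIR = '/home/airflow/gcs/data/nnlm-en-dim50-1'

    # A filesystem path is resolved without going through
    # compressed_module_resolver, which is where the download URL is
    # built and the "Cannot mix str and non-str arguments" error occurs.
    embed = hub.Module(MODULE_DIR)

This also keeps the worker from having to reach tfhub.dev on every task try.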
# -*- coding: utf-8 -*-
"""
tf_hub_encode_posts
"""

#########################################################
# SET UP
#########################################################

from datetime import datetime, timedelta
from airflow import DAG, macros
from airflow.operators import DummyOperator
from airflow.operators.python_operator import PythonOperator
import tensorflow as tf
import tensorflow_hub as hub
import pandas as pd
import logging

#########################################################
# DEFINE DAG
#########################################################

# define default args
default_args = dict(
    owner = 'Airflow',
    depends_on_past = False,
    retries = 5,
    retry_delay = timedelta(minutes=2),
    catchup = False
)

# Define the DAG
dag = DAG(dag_id='dev_dag',
          description='dev_dag',
          start_date=datetime.strptime('2018-06-18', '%Y-%m-%d'),
          #schedule_interval='*/15 * * * *',
          schedule_interval='@once',
          default_args=default_args,
          catchup = False
          )

#########################################################
# PYTHON DAG FUNCTIONS
#########################################################

def fn_encode_posts(**kwargs):

    tf.logging.set_verbosity(tf.logging.ERROR)

    # Import the Universal Sentence Encoder's TF Hub module
    logging.info('... begin - get module from tf-hub ...')
    embed = hub.Module("https://tfhub.dev/google/nnlm-en-dim50/1")
    logging.info('... done - get module from tf-hub ...')

    posts = ['a sentence to embedd',
             'another sentence that should have a similar embedding']

    # run tf session to get embedding
    with tf.Session() as session:
        session.run([tf.global_variables_initializer(), tf.tables_initializer()])
        post_embeddings = session.run(embed(posts))

    # wrangle df
    logging.info('... save to df ...')
    df_out = pd.DataFrame(post_embeddings,
                          columns=['dim_{}'.format(i) for i in range(1, post_embeddings.shape[1] + 1)])

    logging.info('... look at df ...')
    logging.info(df_out.info())
    logging.info(df_out.head().T)

#########################################################
# TASKS
#########################################################

# Set DummyOperator
kick_off_dag = DummyOperator(task_id='kick_off_dag', dag=dag)

#----------------------------------#
# encode_posts
#----------------------------------#

encode_posts = PythonOperator(
    task_id='encode_posts',
    provide_context=True,
    python_callable=fn_encode_posts,
    dag=dag,
    priority_weight=100
)

kick_off_dag >> encode_posts
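Since the report above is that the same code works when run as a plain Python script, a quick way to confirm the failure is specific to the Airflow task runner environment is to call the task's callable directly. A minimal sketch, assuming dev_dag.py is importable from the working directory:

    # Run the callable directly, outside the Airflow task runner, to check
    # whether the tfhub.dev download succeeds, matching the "works fine as
    # a plain Python script" observation in the thread above.
    from dev_dag import fn_encode_posts

    if __name__ == '__main__':
        fn_encode_posts()

Separately, the DeprecationWarning in the log (unrelated to the TypeError) goes away if DummyOperator is imported from airflow.operators.dummy_operator rather than directly from airflow.operators.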