Hi all, I have a problem with the Spark operator. I get this exception:

```
user@host:/# airflow test myDAG myTask 2018-04-26
[2018-04-26 15:32:11,279] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/Grammar.txt
[2018-04-26 15:32:11,323] {driver.py:120} INFO - Generating grammar tables from /usr/lib/python3.5/lib2to3/PatternGrammar.txt
[2018-04-26 15:32:11,456] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-04-26 15:32:11,535] {models.py:189} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2018-04-26 15:32:11,811] {base_hook.py:80} INFO - Using connection to: sparkhost
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 528, in test
    ti.run(ignore_task_deps=True, ignore_ti_state=True, test_mode=True)
  File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1584, in run
    session=session)
  File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1493, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/operators/spark_submit_operator.py", line 145, in execute
    self._hook.submit(self._application)
  File "/usr/local/lib/python3.5/dist-packages/airflow/contrib/hooks/spark_submit_hook.py", line 231, in submit
    **kwargs)
  File "/usr/lib/python3.5/subprocess.py", line 947, in __init__
    restore_signals, start_new_session)
  File "/usr/lib/python3.5/subprocess.py", line 1490, in _execute_child
    restore_signals, start_new_session, preexec_fn)
TypeError: Can't convert 'list' object to str implicitly
```
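For context, that final message is the Python 3.5 wording for concatenating a `str` with a `list`, so somewhere in the command or environment handed to `subprocess.Popen` a list appears where a string is expected (this is a reading of the error text, not a confirmed diagnosis of the Airflow code). A minimal reproduction of the same error class:

```python
# On Python 3.5 this raises:
#   TypeError: Can't convert 'list' object to str implicitly
# Newer Pythons word the same failure as:
#   TypeError: can only concatenate str (not "list") to str
try:
    "spark-submit " + ["--jars", "spark_job.jar"]  # str + list
except TypeError as exc:
    print("reproduced:", type(exc).__name__)
```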
My DAG looks like this (note: the original was missing `import os`, which I've added below):

```python
from airflow import DAG
from datetime import datetime, timedelta, date
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
import os  # needed for os.environ below; missing in the original post

default_args = {
    'owner': 'spark',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG('myDAG', default_args=default_args)

# Register the Spark connection via an environment variable
connection_id = "SPARK"
os.environ['AIRFLOW_CONN_%s' % connection_id] = 'spark://sparkhost:7077'

_config = {
    'jars': 'spark_job.jar',
    'executor_memory': '2g',
    'name': 'myJob',
    'conn_id': connection_id,
    'java_class': 'org.Job'
}

operator = SparkSubmitOperator(
    task_id='myTask',
    dag=dag,
    **_config
)
```

What is wrong? Could somebody help me with it?
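While debugging, I've been catching this class of failure before `Popen` runs with a small wrapper (`run_checked` is my own hypothetical helper, not Airflow API) that checks every command element is a string, since the stock error message doesn't say which element was the list:

```python
import subprocess

def run_checked(cmd):
    # Fail fast with an informative message if any element of the
    # command list is not a str; Python 3.5's Popen only reports
    # "Can't convert 'list' object to str implicitly" with no position.
    bad = [(i, c) for i, c in enumerate(cmd) if not isinstance(c, str)]
    if bad:
        raise TypeError("non-string command elements: %r" % bad)
    return subprocess.Popen(cmd)
```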