Repository: incubator-airflow Updated Branches: refs/heads/master b77e0dc69 -> 0bc6042ce
[AIRFLOW-2565] templatize cluster_label Make cluster_label in QuboleOperator a templated field. Closes #3463 from milton0825/make-cluster-label- template-field Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0bc6042c Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0bc6042c Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0bc6042c Branch: refs/heads/master Commit: 0bc6042ce83d3f7303c5bc1e7988e5a4daa44631 Parents: b77e0dc Author: milton0825 <milton0...@gmail.com> Authored: Wed Jun 6 11:09:54 2018 +0530 Committer: Sumit Maheshwari <sum...@qubole.com> Committed: Wed Jun 6 11:09:54 2018 +0530 ---------------------------------------------------------------------- .../example_dags/example_qubole_operator.py | 16 ++++++++++---- airflow/contrib/operators/qubole_operator.py | 2 +- tests/contrib/operators/test_qubole_operator.py | 22 +++++++++++++++++--- 3 files changed, 32 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0bc6042c/airflow/contrib/example_dags/example_qubole_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/example_dags/example_qubole_operator.py b/airflow/contrib/example_dags/example_qubole_operator.py index bfa8355..826a50a 100644 --- a/airflow/contrib/example_dags/example_qubole_operator.py +++ b/airflow/contrib/example_dags/example_qubole_operator.py @@ -61,7 +61,7 @@ t1 = QuboleOperator( task_id='hive_show_table', command_type='hivecmd', query='show tables', - cluster_label='default', + cluster_label='{{ params.cluster_label }}', fetch_logs=True, # If `fetch_logs`=true, will fetch qubole command logs and concatenate # them into corresponding airflow task logs @@ -69,7 +69,11 @@ t1 = QuboleOperator( # To attach tags to qubole command, auto attach 3 tags - dag_id, task_id, run_id qubole_conn_id='qubole_default', # Connection id to submit commands inside QDS, if not set "qubole_default" is used - dag=dag) + dag=dag, + params={ + 'cluster_label': 'default', + } +) t2 = QuboleOperator( task_id='hive_s3_location', @@ -115,9 +119,13 @@ t4 = QuboleOperator( '-numReduceTasks 0 -input s3://paid-qubole/HadoopAPITests/' 'data/3.tsv -output ' 's3://paid-qubole/HadoopAPITests/data/3_wc', - cluster_label='default', + cluster_label='{{ params.cluster_label }}', fetch_logs=True, - dag=dag) + dag=dag, + params={ + 'cluster_label': 'default', + } +) t5 = QuboleOperator( task_id='pig_cmd', http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0bc6042c/airflow/contrib/operators/qubole_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py index 88e8bdf..82ee293 100755 --- a/airflow/contrib/operators/qubole_operator.py +++ b/airflow/contrib/operators/qubole_operator.py @@ -122,7 +122,7 @@ class QuboleOperator(BaseOperator): 'extract_query', 'boundary_query', 'macros', 'name', 'parameters', 'dbtap_id', 'hive_table', 'db_table', 'split_column', 'note_id', 'db_update_keys', 'export_dir', 'partition_spec', 'qubole_conn_id', - 'arguments', 'user_program_arguments') + 'arguments', 'user_program_arguments', 'cluster_label') template_ext = ('.txt',) ui_color = '#3064A1' http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0bc6042c/tests/contrib/operators/test_qubole_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_qubole_operator.py b/tests/contrib/operators/test_qubole_operator.py index ad66f75..bf61262 100644 --- a/tests/contrib/operators/test_qubole_operator.py +++ b/tests/contrib/operators/test_qubole_operator.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -21,7 +21,7 @@ import unittest from datetime import datetime -from airflow.models import DAG, Connection +from airflow.models import DAG, Connection, TaskInstance from airflow.utils import db from airflow.contrib.hooks.qubole_hook import QuboleHook @@ -64,6 +64,22 @@ class QuboleOperatorTest(unittest.TestCase): self.assertEqual(task.task_id, TASK_ID) self.assertEqual(result, TEMPLATE_CONN) + def test_init_with_template_cluster_label(self): + dag = DAG(DAG_ID, start_date=DEFAULT_DATE) + task = QuboleOperator( + task_id=TASK_ID, + dag=dag, + cluster_label='{{ params.cluster_label }}', + params={ + 'cluster_label': 'default' + } + ) + + ti = TaskInstance(task, DEFAULT_DATE) + ti.render_templates() + + self.assertEqual(task.cluster_label, 'default') + def test_get_hook(self): dag = DAG(DAG_ID, start_date=DEFAULT_DATE)