Repository: incubator-airflow
Updated Branches:
  refs/heads/master b77e0dc69 -> 0bc6042ce


[AIRFLOW-2565] templatize cluster_label

Make cluster_label in QuboleOperator a templated
field.

Closes #3463 from milton0825/make-cluster-label-template-field


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0bc6042c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0bc6042c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0bc6042c

Branch: refs/heads/master
Commit: 0bc6042ce83d3f7303c5bc1e7988e5a4daa44631
Parents: b77e0dc
Author: milton0825 <milton0...@gmail.com>
Authored: Wed Jun 6 11:09:54 2018 +0530
Committer: Sumit Maheshwari <sum...@qubole.com>
Committed: Wed Jun 6 11:09:54 2018 +0530

----------------------------------------------------------------------
 .../example_dags/example_qubole_operator.py     | 16 ++++++++++----
 airflow/contrib/operators/qubole_operator.py    |  2 +-
 tests/contrib/operators/test_qubole_operator.py | 22 +++++++++++++++++---
 3 files changed, 32 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0bc6042c/airflow/contrib/example_dags/example_qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/example_dags/example_qubole_operator.py b/airflow/contrib/example_dags/example_qubole_operator.py
index bfa8355..826a50a 100644
--- a/airflow/contrib/example_dags/example_qubole_operator.py
+++ b/airflow/contrib/example_dags/example_qubole_operator.py
@@ -61,7 +61,7 @@ t1 = QuboleOperator(
     task_id='hive_show_table',
     command_type='hivecmd',
     query='show tables',
-    cluster_label='default',
+    cluster_label='{{ params.cluster_label }}',
     fetch_logs=True,
     # If `fetch_logs`=true, will fetch qubole command logs and concatenate
     # them into corresponding airflow task logs
@@ -69,7 +69,11 @@ t1 = QuboleOperator(
     # To attach tags to qubole command, auto attach 3 tags - dag_id, task_id, run_id
     qubole_conn_id='qubole_default',
     # Connection id to submit commands inside QDS, if not set "qubole_default" is used
-    dag=dag)
+    dag=dag,
+    params={
+        'cluster_label': 'default',
+    }
+)
 
 t2 = QuboleOperator(
     task_id='hive_s3_location',
@@ -115,9 +119,13 @@ t4 = QuboleOperator(
                 '-numReduceTasks 0 -input s3://paid-qubole/HadoopAPITests/'
                 'data/3.tsv -output '
                 's3://paid-qubole/HadoopAPITests/data/3_wc',
-    cluster_label='default',
+    cluster_label='{{ params.cluster_label }}',
     fetch_logs=True,
-    dag=dag)
+    dag=dag,
+    params={
+        'cluster_label': 'default',
+    }
+)
 
 t5 = QuboleOperator(
     task_id='pig_cmd',

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0bc6042c/airflow/contrib/operators/qubole_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/qubole_operator.py b/airflow/contrib/operators/qubole_operator.py
index 88e8bdf..82ee293 100755
--- a/airflow/contrib/operators/qubole_operator.py
+++ b/airflow/contrib/operators/qubole_operator.py
@@ -122,7 +122,7 @@ class QuboleOperator(BaseOperator):
                        'extract_query', 'boundary_query', 'macros', 'name', 'parameters',
                        'dbtap_id', 'hive_table', 'db_table', 'split_column', 'note_id',
                        'db_update_keys', 'export_dir', 'partition_spec', 'qubole_conn_id',
-                       'arguments', 'user_program_arguments')
+                       'arguments', 'user_program_arguments', 'cluster_label')
 
     template_ext = ('.txt',)
     ui_color = '#3064A1'

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0bc6042c/tests/contrib/operators/test_qubole_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_qubole_operator.py b/tests/contrib/operators/test_qubole_operator.py
index ad66f75..bf61262 100644
--- a/tests/contrib/operators/test_qubole_operator.py
+++ b/tests/contrib/operators/test_qubole_operator.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,7 +21,7 @@
 import unittest
 from datetime import datetime
 
-from airflow.models import DAG, Connection
+from airflow.models import DAG, Connection, TaskInstance
 from airflow.utils import db
 
 from airflow.contrib.hooks.qubole_hook import QuboleHook
@@ -64,6 +64,22 @@ class QuboleOperatorTest(unittest.TestCase):
         self.assertEqual(task.task_id, TASK_ID)
         self.assertEqual(result, TEMPLATE_CONN)
 
+    def test_init_with_template_cluster_label(self):
+        dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+        task = QuboleOperator(
+            task_id=TASK_ID,
+            dag=dag,
+            cluster_label='{{ params.cluster_label }}',
+            params={
+                'cluster_label': 'default'
+            }
+        )
+
+        ti = TaskInstance(task, DEFAULT_DATE)
+        ti.render_templates()
+
+        self.assertEqual(task.cluster_label, 'default')
+
     def test_get_hook(self):
         dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
 

Reply via email to