ashb commented on a change in pull request #6348: [AIRFLOW-XXX] GSoD: Adding 
'Create a custom operator' doc
URL: https://github.com/apache/airflow/pull/6348#discussion_r336410072
 
 

 ##########
 File path: docs/howto/operator/custom-operator.rst
 ##########
 @@ -0,0 +1,185 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+Create Custom Operator
+=======================
+
+
+Airflow allows you to create new operators to suit your requirements or those of your team.
+This extensibility is one of the many features that make Apache Airflow powerful.
+
+You can create any operator you want by extending :class:`airflow.models.baseoperator.BaseOperator`.
+
+There are two methods that you need to override in a derived class:
+
+* Constructor - Define the parameters required for the operator. You only need to specify the arguments specific to your operator.
+  Use the ``@apply_defaults`` decorator to fill unspecified arguments with ``default_args``. You can specify ``default_args``
+  in the DAG file. See :ref:`Default args <default-args>` for more details.
+
+* Execute - The code to execute when the runner calls the operator. The method receives the
+  Airflow context as a parameter, which can be used to read config values.
+
+Let's implement an example ``HelloOperator``:
+
+.. code::  python
+        
+        from airflow.models.baseoperator import BaseOperator
+        from airflow.utils.decorators import apply_defaults
+        
+        class HelloOperator(BaseOperator):
+
+            @apply_defaults
+            def __init__(
+                    self,
+                    name: str,
+                    *args, **kwargs) -> None:
+                super().__init__(*args, **kwargs)
+                self.name = name
+
+            def execute(self, context):
+                message = "Hello {}".format(self.name)
+                print(message)
+                return message
+
+You can now use the derived custom operator as follows:
+
+.. code:: python
+
+    hello_task = HelloOperator(task_id='sample-task', dag=dag, name='foo_bar')
+
+Hooks
+^^^^^
+Hooks act as an interface for communicating with external shared resources in a DAG.
+For example, multiple tasks in a DAG can require access to a MySQL database. Instead of
+creating a connection per task, you can retrieve a connection from the hook and use it.
+Hooks also help to avoid storing connection auth parameters in a DAG.
+See :doc:`../connection/index` for how to create and manage connections.
+
+Let's extend our previous example to fetch the name from MySQL:
+
+.. code:: python
+
+    class HelloDBOperator(BaseOperator):
+
+            @apply_defaults
+            def __init__(
+                    self,
+                    name: str,
+                    conn_id: str,
+                    database: str,
+                    *args, **kwargs) -> None:
+                super().__init__(*args, **kwargs)
+                self.name = name
+                self.conn_id = conn_id
+                self.database = database
+
+            def execute(self, context):
+                hook = MySqlHook(mysql_conn_id=self.conn_id,
+                                 schema=self.database)
+                sql = "select name from user"
+                result = hook.get_first(sql)
+                message = "Hello {}".format(result['name'])
+                print(message)
+                return message
+
+When the operator invokes the query on the hook object, a new connection is created if one doesn't already exist.
+The hook retrieves the auth parameters, such as username and password, from the Airflow
+backend by calling :py:func:`airflow.hooks.base_hook.BaseHook.get_connection`.
+
+
+User interface
+^^^^^^^^^^^^^^^
+Airflow also allows the developer to control how the operator shows up in the DAG UI.
+Override ``ui_color`` to change the background color of the operator in the UI.
+Override ``ui_fgcolor`` to change the color of the label.
+
+.. code::  python
+
+        class HelloOperator(BaseOperator):
+            ui_color = '#ff0000'
+            ui_fgcolor = '#000000'
+            # ...
+
+Templating
+^^^^^^^^^^^
+You can use :ref:`Jinja templates <jinja-templating>` to parameterize your operator.
+Airflow considers the field names present in ``template_fields`` for templating while rendering
+the operator.
+
+.. code:: python
+    
+        class HelloOperator(BaseOperator):
+            
+            template_fields = ['name']
+            
+            @apply_defaults
+            def __init__(
+                    self,
+                    name: str,
+                    *args, **kwargs) -> None:
+                super().__init__(*args, **kwargs)
+                self.name = name
+
+            def execute(self, context):
+                message = "Hello from {}".format(self.name)
+                print(message)
+                return message
+
+        hello_task = HelloOperator(task_id='task_id_1', dag=dag, name='{{ task_id }}')
+
+In this example, Jinja looks for the ``name`` parameter and substitutes ``{{ task_id }}`` with
+task_id_1.
 
 Review comment:
   ```suggestion
   ``task_id_1``.
   ```
   
   This substitution happens just before the ``execute()`` function is called, so don't do anything in the constructor that assumes the field already has its final value.
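
A rough sketch of the timing issue, in plain Python so it runs without Airflow. The `render_template_fields` helper here is a hypothetical stand-in that only mimics the idea of rendering (simple string replacement); it is not Airflow's actual implementation, and `greeting_at_init` is an illustrative attribute name:

```python
def render_template_fields(op, context):
    """Hypothetical stand-in for the rendering step (not Airflow's real code)."""
    for field in op.template_fields:
        value = getattr(op, field)
        for key, replacement in context.items():
            value = value.replace("{{ " + key + " }}", str(replacement))
        setattr(op, field, value)


class HelloOperator:
    template_fields = ["name"]

    def __init__(self, name):
        self.name = name
        # Too early: at this point name is still the raw template string.
        self.greeting_at_init = "Hello from {}".format(name)

    def execute(self, context):
        # By now the stand-in renderer has replaced the placeholder.
        return "Hello from {}".format(self.name)


op = HelloOperator(name="{{ task_id }}")
context = {"task_id": "task_id_1"}
render_template_fields(op, context)  # assumed to run just before execute()
result = op.execute(context)

print(op.greeting_at_init)  # still contains the unrendered template
print(result)               # uses the rendered value
```

Anything derived from `name` inside `__init__` keeps the unrendered template, while `execute()` sees the rendered value.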
   
   
   It might also be worth saying that ``template_fields`` applies to the object's attribute names, not the constructor argument names.
   
   I.e. if we do this:
   
   ```
               def __init__(
                       self,
                       name: str,
                       *args, **kwargs) -> None:
                   super().__init__(*args, **kwargs)
                   self._name = name
   ```
   
   then we'd need `template_fields = ['_name']`
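
To illustrate the attribute-name point, here is a minimal sketch in plain Python. It assumes the rendering step looks fields up with `getattr`; the `render_template_fields` helper and both class names are hypothetical and only stand in for the real machinery:

```python
def render_template_fields(op, context):
    """Hypothetical stand-in: looks up attributes by name, not constructor args."""
    for field in op.template_fields:
        value = getattr(op, field)  # raises AttributeError if no such attribute
        for key, replacement in context.items():
            value = value.replace("{{ " + key + " }}", str(replacement))
        setattr(op, field, value)


class PrivateNameOperator:
    # Matches the attribute set in __init__, not the argument name.
    template_fields = ["_name"]

    def __init__(self, name):
        self._name = name

    def execute(self, context):
        return "Hello from {}".format(self._name)


class MismatchedOperator(PrivateNameOperator):
    # Uses the constructor argument name; the object has no "name" attribute.
    template_fields = ["name"]


context = {"task_id": "task_id_1"}

good = PrivateNameOperator(name="{{ task_id }}")
render_template_fields(good, context)
good_result = good.execute(context)

bad = MismatchedOperator(name="{{ task_id }}")
try:
    render_template_fields(bad, context)
    error = None
except AttributeError as exc:
    error = exc

print(good_result)
print(type(error).__name__)
```

In this sketch the mismatched field name fails at render time instead of being silently ignored.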

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
