This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 45a583f4d51d8bf7ca786a1a67b60ddf53e87c7c Author: Kamil Breguła <mik-...@users.noreply.github.com> AuthorDate: Sat Jun 20 22:02:13 2020 +0200 Enforce code-block directives in doc (#9443) (cherry picked from commit 60d19dc2db79f6eb9be146b59e65a92323d25c08) --- docs/best-practices.rst | 38 +++---- docs/build | 25 +++++ docs/concepts.rst | 227 +++++++++++++++++++++++--------------- docs/dag-run.rst | 10 +- docs/dag-serialization.rst | 2 +- docs/faq.rst | 2 +- docs/howto/custom-operator.rst | 14 +-- docs/howto/email-config.rst | 4 +- docs/howto/secure-connections.rst | 2 +- docs/kubernetes.rst | 4 +- docs/lineage.rst | 4 +- docs/plugins.rst | 6 +- docs/scheduler.rst | 2 +- docs/security.rst | 10 +- docs/timezone.rst | 10 +- docs/tutorial.rst | 2 +- docs/ui.rst | 2 +- 17 files changed, 222 insertions(+), 142 deletions(-) diff --git a/docs/best-practices.rst b/docs/best-practices.rst index 1a27e73..fe25977 100644 --- a/docs/best-practices.rst +++ b/docs/best-practices.rst @@ -89,13 +89,13 @@ It can result in a lot of open connections. The best way of using variables is via a Jinja template which will delay reading the value until the task execution. The template synaxt to do this is: -.. code:: +.. code-block:: {{ var.value.<variable_name> }} or if you need to deserialize a json object from the variable : -.. code:: +.. code-block:: {{ var.json.<variable_name> }} @@ -117,7 +117,7 @@ DAG Loader Test This test should ensure that your DAG does not contain a piece of code that raises error while loading. No additional code needs to be written by the user to run this test. -.. code:: +.. code-block:: python your-dag-file.py @@ -132,7 +132,7 @@ Unit tests ensure that there is no incorrect code in your DAG. You can write a u **Unit test for loading a DAG:** -.. code:: +.. code-block:: from airflow.models import DagBag import unittest @@ -149,9 +149,9 @@ Unit tests ensure that there is no incorrect code in your DAG. You can write a u self.assertEqual(len(dag.tasks), 1) **Unit test a DAG structure:** -This is an example test want to verify the structure of a code-generated DAG against a dict object +This is an example test want to verify the structure of a code-generated DAG against a dict object -.. code:: +.. code-block:: import unittest class testClass(unittest.TestCase): @@ -163,16 +163,16 @@ This is an example test want to verify the structure of a code-generated DAG aga self.assertEqual(task.downstream_task_ids, set(downstream_list), msg="unexpected downstream link in {}".format(task_id)) def test_dag(self): - self.assertDagDictEqual({ - "DummyInstruction_0": ["DummyInstruction_1"], - "DummyInstruction_1": ["DummyInstruction_2"], - "DummyInstruction_2": ["DummyInstruction_3"], - "DummyInstruction_3": [] - },dag) + self.assertDagDictEqual({ + "DummyInstruction_0": ["DummyInstruction_1"], + "DummyInstruction_1": ["DummyInstruction_2"], + "DummyInstruction_2": ["DummyInstruction_3"], + "DummyInstruction_3": [] + },dag) **Unit test for custom operator:** -.. code:: +.. code-block:: import unittest from airflow.utils.state import State @@ -205,7 +205,7 @@ make sure that the partition is created in S3 and perform some simple checks to Similarly, if you have a task that starts a microservice in Kubernetes or Mesos, you should check if the service has started or not using :class:`airflow.sensors.http_sensor.HttpSensor`. -.. code:: +.. code-block:: task = PushToS3(...) 
check = S3KeySensor( @@ -227,7 +227,7 @@ Do not hard code values inside the DAG and then change them manually according t You can use environment variables to parameterize the DAG. -.. code:: +.. code-block:: import os @@ -252,7 +252,7 @@ If you want to run Airflow in production, make sure you :doc:`configure the back You can change the backend using the following config -.. code:: ini +.. code-block:: ini [core] sql_alchemy_conn = my_conn_string @@ -261,7 +261,7 @@ Once you have changed the backend, airflow needs to create all the tables requir Create an empty DB and give airflow's user the permission to ``CREATE/ALTER`` it. Once that is done, you can run - -.. code:: +.. code-block:: airflow upgradedb @@ -305,14 +305,14 @@ Airflow comes bundles with a default ``airflow.cfg`` configuration file. You should use environment variables for configurations that change across deployments e.g. metadata DB, password. You can do it using the format ``$AIRFLOW__{SECTION}__{KEY}`` -.. code:: +.. code-block:: AIRFLOW__CORE__SQL_ALCHEMY_CONN=my_conn_id AIRFLOW__WEBSERVER__BASE_URL=http://host:port Some configurations such as Airflow Backend connection URI can be derived from bash commands as well: -.. code:: +.. code-block:: sql_alchemy_conn_cmd = bash_command_to_run diff --git a/docs/build b/docs/build index 72b0665..3739dab 100755 --- a/docs/build +++ b/docs/build @@ -144,6 +144,16 @@ def find_modules(deprecated_only: bool = False) -> Set[str]: return modules_names +def assert_file_not_contains(file_path: str, pattern: str, message: str) -> None: + with open(file_path, "rb", 0) as doc_file: + pattern_compiled = re.compile(pattern) + + for num, line in enumerate(doc_file, 1): + line_decode = line.decode() + if re.search(pattern_compiled, line_decode): + build_errors.append(DocBuildError(file_path=file_path, line_no=num, message=message)) + + def check_exampleinclude_for_example_dags(): all_docs_files = glob("**/*rst", recursive=True) @@ -158,6 +168,20 @@ def check_exampleinclude_for_example_dags(): ) +def check_enforce_code_block(): + all_docs_files = glob("**/*rst", recursive=True) + + for doc_file in all_docs_files: + assert_file_not_contains( + file_path=doc_file, + pattern=r"^.. code::", + message=( + "We recommend using the code-block directive instead of the code directive. " + "The code-block directive is more feature-full." + ) + ) + + def prepare_code_snippet(file_path: str, line_no: int, context_lines_count: int=5) -> str: def guess_lexer_for_filename(filename): from pygments.util import ClassNotFound @@ -256,6 +280,7 @@ print("Current working directory: ", os.getcwd()) prepare_directories() clean_files() +check_enforce_code_block() check_exampleinclude_for_example_dags() if build_errors: diff --git a/docs/concepts.rst b/docs/concepts.rst index b3a054f..e85c5b3 100644 --- a/docs/concepts.rst +++ b/docs/concepts.rst @@ -70,7 +70,7 @@ that means the DAG must appear in ``globals()``. Consider the following two DAGs. Only ``dag_1`` will be loaded; the other one only appears in a local scope. -.. code:: python +.. code-block:: python dag_1 = DAG('this_dag_will_be_discovered') @@ -91,7 +91,7 @@ Default Arguments If a dictionary of ``default_args`` is passed to a DAG, it will apply them to any of its operators. This makes it easy to apply a common parameter to many operators without having to type it many times. -.. code:: python +.. 
code-block:: python default_args = { 'start_date': datetime(2016, 1, 1), @@ -109,7 +109,7 @@ Context Manager DAGs can be used as context managers to automatically assign new operators to that DAG. -.. code:: python +.. code-block:: python with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag: op = DummyOperator('op') @@ -144,6 +144,104 @@ A DAG run and all task instances created within it are instanced with the same ` that logically you can think of a DAG run as simulating the DAG running all of its tasks at some previous date & time specified by the ``execution_date``. +.. _concepts:tasks: + +Tasks +===== + +A Task defines a unit of work within a DAG; it is represented as a node in the DAG graph, and it is written in Python. + +Each task is an implementation of an Operator, for example a ``PythonOperator`` to execute some Python code, +or a ``BashOperator`` to run a Bash command. + +The task implements an operator by defining specific values for that operator, +such as a Python callable in the case of ``PythonOperator`` or a Bash command in the case of ``BashOperator``. + +Relations between Tasks +----------------------- + +Consider the following DAG with two tasks. +Each task is a node in our DAG, and there is a dependency from task_1 to task_2: + +.. code-block:: python + + with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag: + task_1 = DummyOperator('task_1') + task_2 = DummyOperator('task_2') + task_1 >> task_2 # Define dependencies + +We can say that task_1 is *upstream* of task_2, and conversely task_2 is *downstream* of task_1. +When a DAG Run is created, task_1 will start running and task_2 waits for task_1 to complete successfully before it may start. + +Task Instances +============== + +A task instance represents a specific run of a task and is characterized as the +combination of a DAG, a task, and a point in time (``execution_date``). Task instances +also have an indicative state, which could be "running", "success", "failed", "skipped", "up +for retry", etc. + +Tasks are defined in DAGs, and both are written in Python code to define what you want to do. +Task Instances belong to DAG Runs, have an associated ``execution_date``, and are instantiated, runnable entities. + +Relations between Task Instances +-------------------------------- + +Again consider the following tasks, defined for some DAG: + +.. code-block:: python + + with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag: + task_1 = DummyOperator('task_1') + task_2 = DummyOperator('task_2') + task_1 >> task_2 # Define dependencies + +When we enable this DAG, the scheduler creates several DAG Runs - one with ``execution_date`` of 2016-01-01, +one with ``execution_date`` of 2016-01-02, and so on up to the current date. + +Each DAG Run will contain a task_1 Task Instance and a task_2 Task instance. Both Task Instances will +have ``execution_date`` equal to the DAG Run's ``execution_date``, and each task_2 will be *downstream* of +(depends on) its task_1. + +We can also say that task_1 for 2016-01-01 is the *previous* task instance of the task_1 for 2016-01-02. +Or that the DAG Run for 2016-01-01 is the *previous* DAG Run to the DAG Run of 2016-01-02. +Here, *previous* refers to the logical past/prior ``execution_date``, that runs independently of other runs, +and *upstream* refers to a dependency within the same run and having the same ``execution_date``. + +.. note:: + The Airflow documentation sometimes refers to *previous* instead of *upstream* in places, and vice-versa. 
+ If you find any occurrences of this, please help us improve by contributing some corrections! + +Task Lifecycle +============== + +A task goes through various stages from start to completion. In the Airflow UI +(graph and tree views), these stages are displayed by a color representing each +stage: + +.. image:: img/task_stages.png + +The complete lifecycle of the task looks like this: + +.. image:: img/task_lifecycle_diagram.png + +The happy flow consists of the following stages: + +1. No status (scheduler created empty task instance) +2. Scheduled (scheduler determined task instance needs to run) +3. Queued (scheduler sent task to executor to run on the queue) +4. Running (worker picked up a task and is now running it) +5. Success (task completed)8 + +There is also visual difference between scheduled and manually triggered +DAGs/tasks: + +.. image:: img/task_manual_vs_scheduled.png + +The DAGs/tasks with a black border are scheduled runs, whereas the non-bordered +DAGs/tasks are manually triggered, i.e. by ``airflow trigger_dag``. + + .. _concepts:operators: Operators @@ -201,7 +299,7 @@ be transferred or unassigned. DAG assignment can be done explicitly when the operator is created, through deferred assignment, or even inferred from other operators. -.. code:: python +.. code-block:: python dag = DAG('my_dag', start_date=datetime(2016, 1, 1)) @@ -230,7 +328,7 @@ Traditionally, operator relationships are set with the ``set_upstream()`` and bitshift operators ``>>`` and ``<<``. The following four statements are all functionally equivalent: -.. code:: python +.. code-block:: python op1 >> op2 op1.set_downstream(op2) @@ -244,13 +342,13 @@ that ``op1`` runs first and ``op2`` runs second. Multiple operators can be composed -- keep in mind the chain is executed left-to-right and the rightmost object is always returned. For example: -.. code:: python +.. code-block:: python op1 >> op2 >> op3 << op4 is equivalent to: -.. code:: python +.. code-block:: python op1.set_downstream(op2) op2.set_downstream(op3) @@ -258,20 +356,20 @@ is equivalent to: For convenience, the bitshift operators can also be used with DAGs. For example: -.. code:: python +.. code-block:: python dag >> op1 >> op2 is equivalent to: -.. code:: python +.. code-block:: python op1.dag = dag op1.set_downstream(op2) We can put this all together to build a simple pipeline: -.. code:: python +.. code-block:: python with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag: ( @@ -286,20 +384,20 @@ We can put this all together to build a simple pipeline: Bitshift can also be used with lists. For example: -.. code:: python +.. code-block:: python op1 >> [op2, op3] >> op4 is equivalent to: -.. code:: python +.. code-block:: python op1 >> op2 >> op4 op1 >> op3 >> op4 and equivalent to: -.. code:: python +.. code-block:: python op1.set_downstream([op2, op3]) @@ -318,7 +416,7 @@ When setting a relationship between two lists, if we want all operators in one list to be upstream to all operators in the other, we cannot use a single bitshift composition. Instead we have to split one of the lists: -.. code:: python +.. code-block:: python [op1, op2, op3] >> op4 [op1, op2, op3] >> op5 @@ -326,50 +424,50 @@ we cannot use a single bitshift composition. Instead we have to split one of the ``cross_downstream`` could handle list relationships easier. -.. code:: python +.. code-block:: python cross_downstream([op1, op2, op3], [op4, op5, op6]) When setting single direction relationships to many operators, we could concat them with bitshift composition. 
-.. code:: python +.. code-block:: python op1 >> op2 >> op3 >> op4 >> op5 This can be accomplished using ``chain`` -.. code:: python +.. code-block:: python chain(op1, op2, op3, op4, op5) even without operator's name -.. code:: python +.. code-block:: python chain([DummyOperator(task_id='op' + i, dag=dag) for i in range(1, 6)]) ``chain`` can handle a list of operators -.. code:: python +.. code-block:: python chain(op1, [op2, op3], op4) is equivalent to: -.. code:: python +.. code-block:: python op1 >> [op2, op3] >> op4 When ``chain`` sets relationships between two lists of operators, they must have the same size. -.. code:: python +.. code-block:: python chain(op1, [op2, op3], [op4, op5], op6) is equivalent to: -.. code:: python +.. code-block:: python op1 >> [op2, op3] op2 >> op4 @@ -377,49 +475,6 @@ is equivalent to: [op4, op5] >> op6 -Tasks -===== - -Once an operator is instantiated, it is referred to as a "task". The -instantiation defines specific values when calling the abstract operator, and -the parameterized task becomes a node in a DAG. - -Task Instances -============== - -A task instance represents a specific run of a task and is characterized as the -combination of a DAG, a task, and a point in time. Task instances also have an -indicative state, which could be "running", "success", "failed", "skipped", "up -for retry", etc. - -Task Lifecycle -============== - -A task goes through various stages from start to completion. In the Airflow UI -(graph and tree views), these stages are displayed by a color representing each -stage: - -.. image:: img/task_stages.png - -The complete lifecycle of the task looks like this: - -.. image:: img/task_lifecycle_diagram.png - -The happy flow consists of the following stages: - -1. No status (scheduler created empty task instance) -2. Scheduled (scheduler determined task instance needs to run) -3. Queued (scheduler sent task to executor to run on the queue) -4. Running (worker picked up a task and is now running it) -5. Success (task completed) - -There is also visual difference between scheduled and manually triggered -DAGs/tasks: - -.. image:: img/task_manual_vs_scheduled.png - -The DAGs/tasks with a black border are scheduled runs, whereas the non-bordered -DAGs/tasks are manually triggered, i.e. by ``airflow trigger_dag``. Workflows ========= @@ -475,7 +530,7 @@ it a number of worker slots. Tasks can then be associated with one of the existing pools by using the ``pool`` parameter when creating tasks (i.e., instantiating operators). -.. code:: python +.. code-block:: python aggregate_db_message_job = BashOperator( task_id='aggregate_db_message_job', @@ -585,7 +640,7 @@ If ``xcom_pull`` is passed a single string for ``task_ids``, then the most recent XCom value from that task is returned; if a list of ``task_ids`` is passed, then a corresponding list of XCom values is returned. -.. code:: python +.. code-block:: python # inside a PythonOperator called 'pushing_task' def push_function(): @@ -598,7 +653,7 @@ passed, then a corresponding list of XCom values is returned. It is also possible to pull XCom directly in a template, here's an example of what this may look like: -.. code:: jinja +.. code-block:: jinja SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }} @@ -620,7 +675,7 @@ it can be useful to have some variables or configuration items accessible and modifiable through the UI. -.. code:: python +.. 
code-block:: python from airflow.models import Variable foo = Variable.get("foo") @@ -636,13 +691,13 @@ doesn't exist and no default is provided. You can use a variable from a jinja template with the syntax : -.. code:: bash +.. code-block:: bash echo {{ var.value.<variable_name> }} or if you need to deserialize a json object from the variable : -.. code:: bash +.. code-block:: bash echo {{ var.json.<variable_name> }} @@ -657,7 +712,7 @@ So if your variable key is ``FOO`` then the variable name should be ``AIRFLOW_VA For example, -.. code:: bash +.. code-block:: bash export AIRFLOW_VAR_FOO=BAR @@ -666,7 +721,7 @@ For example, You can use them in your DAGs as: -.. code:: python +.. code-block:: python from airflow.models import Variable foo = Variable.get("foo") @@ -712,7 +767,7 @@ The ``BranchPythonOperator`` can also be used with XComs allowing branching context to dynamically decide what branch to follow based on upstream tasks. For example: -.. code:: python +.. code-block:: python def branch_func(**kwargs): ti = kwargs['ti'] @@ -746,7 +801,7 @@ an implementation of the method ``choose_branch``. As with the callable for ``BranchPythonOperator``, this method should return the ID of a downstream task, or a list of task IDs, which will be run, and all others will be skipped. -.. code:: python +.. code-block:: python class MyBranchOperator(BaseBranchOperator): def choose_branch(self, context): @@ -888,7 +943,7 @@ in schedule level. Skipped tasks will cascade through trigger rules For example, consider the following DAG: -.. code:: python +.. code-block:: python #dags/branch_without_trigger.py import datetime as dt @@ -929,7 +984,7 @@ skipped tasks will cascade through ``all_success``. By setting ``trigger_rule`` to ``none_failed_or_skipped`` in ``join`` task, -.. code:: python +.. code-block:: python #dags/branch_with_trigger.py ... @@ -1022,7 +1077,7 @@ that no tasks run for more than 48 hours. Here's an example of what this may look like inside your ``airflow_local_settings.py``: -.. code:: python +.. code-block:: python def policy(task): if task.__class__.__name__ == 'HivePartitionSensor': @@ -1046,7 +1101,7 @@ that mutates the task instance. For example, this function re-routes the task to execute in a different queue during retries: -.. code:: python +.. code-block:: python def task_instance_mutation_hook(ti): if ti.try_number >= 1: @@ -1084,7 +1139,7 @@ This is especially useful if your tasks are built dynamically from configuration files, it allows you to expose the configuration that led to the related tasks in Airflow. -.. code:: python +.. code-block:: python """ ### My great DAG @@ -1114,7 +1169,7 @@ powerful tool to use in combination with macros (see the :doc:`macros-ref` secti For example, say you want to pass the execution date as an environment variable to a Bash script using the ``BashOperator``. -.. code:: python +.. code-block:: python # The execution date as YYYY-MM-DD date = "{{ ds }}" @@ -1137,7 +1192,7 @@ are marked as templated in the structure they belong to: fields registered in ``template_fields`` property will be submitted to template substitution, like the ``path`` field in the example below: -.. code:: python +.. code-block:: python class MyDataReader: template_fields = ['path'] @@ -1161,7 +1216,7 @@ are marked as templated in the structure they belong to: fields registered in Deep nested fields can also be substituted, as long as all intermediate fields are marked as template fields: -.. code:: python +.. 
code-block:: python class MyDataTransformer: template_fields = ['reader'] @@ -1191,7 +1246,7 @@ You can pass custom options to the Jinja ``Environment`` when creating your DAG. One common usage is to avoid Jinja from dropping a trailing newline from a template string: -.. code:: python +.. code-block:: python my_dag = DAG(dag_id='my-dag', jinja_environment_kwargs={ @@ -1220,7 +1275,7 @@ behavior is desired: This example illustrates some possibilities -.. code:: python +.. code-block:: python from airflow.exceptions import AirflowFailException, AirflowSkipException @@ -1311,7 +1366,7 @@ characters on a line following a ``#`` will be ignored. ``.airflowignore`` file should be put in your ``DAG_FOLDER``. For example, you can prepare a ``.airflowignore`` file with contents -.. code:: +.. code-block:: project_a tenant_[\d] diff --git a/docs/dag-run.rst b/docs/dag-run.rst index 313ac8e..2477d6a 100644 --- a/docs/dag-run.rst +++ b/docs/dag-run.rst @@ -84,7 +84,7 @@ If your DAG is written to handle its catchup (i.e., not limited to the interval, then you will want to turn catchup off. This can be done by setting ``catchup = False`` in DAG or ``catchup_by_default = False`` in the configuration file. When turned off, the scheduler creates a DAG run only for the latest interval. -.. code:: python +.. code-block:: python """ Code that goes along with the Airflow tutorial located at: @@ -136,7 +136,7 @@ This process is known as Backfill. You may want to backfill the data even in the cases when catchup is disabled. This can be done through CLI. Run the below command -.. code:: bash +.. code-block:: bash airflow backfill -s START_DATE -e END_DATE dag_id @@ -163,14 +163,14 @@ There are multiple options you can select to re-run - You can also clear the task through CLI using the command: -.. code:: bash +.. code-block:: bash airflow clear dag_id -t task_regex -s START_DATE -d END_DATE For the specified ``dag_id`` and time interval, the command clears all instances of the tasks matching the regex. For more options, you can check the help of the `clear command <cli-ref.html#clear>`_ : -.. code:: bash +.. code-block:: bash airflow clear -h @@ -179,7 +179,7 @@ External Triggers Note that DAG Runs can also be created manually through the CLI. Just run the command - -.. code:: bash +.. code-block:: bash airflow trigger_dag -e execution_date run_id diff --git a/docs/dag-serialization.rst b/docs/dag-serialization.rst index 3b65138..0edd644 100644 --- a/docs/dag-serialization.rst +++ b/docs/dag-serialization.rst @@ -89,7 +89,7 @@ Using a different JSON Library To use a different JSON library instead of the standard ``json`` library like ``ujson``, you need to define a ``json`` variable in local Airflow settings (``airflow_local_settings.py``) file as follows: -.. code:: python +.. code-block:: python import ujson json = ujson diff --git a/docs/faq.rst b/docs/faq.rst index d9eb1c3..80849e0 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -141,7 +141,7 @@ variable in the global namespace. This is easily done in python using the ``globals()`` function for the standard library, which behaves like a simple dictionary. -.. code:: python +.. 
code-block:: python def create_dag(dag_id): """ diff --git a/docs/howto/custom-operator.rst b/docs/howto/custom-operator.rst index a9733d2..ad3a808 100644 --- a/docs/howto/custom-operator.rst +++ b/docs/howto/custom-operator.rst @@ -36,7 +36,7 @@ There are two methods that you need to override in a derived class: Let's implement an example ``HelloOperator`` in a new file ``hello_operator.py``: -.. code:: python +.. code-block:: python from airflow.models.baseoperator import BaseOperator from airflow.utils.decorators import apply_defaults @@ -65,7 +65,7 @@ Let's implement an example ``HelloOperator`` in a new file ``hello_operator.py`` You can now use the derived custom operator as follows: -.. code:: python +.. code-block:: python from custom_operator.hello_operator import HelloOperator @@ -82,7 +82,7 @@ See :doc:`connection/index` for how to create and manage connections. Let's extend our previous example to fetch name from MySQL: -.. code:: python +.. code-block:: python class HelloDBOperator(BaseOperator): @@ -120,7 +120,7 @@ Airflow also allows the developer to control how the operator shows up in the DA Override ``ui_color`` to change the background color of the operator in UI. Override ``ui_fgcolor`` to change the color of the label. -.. code:: python +.. code-block:: python class HelloOperator(BaseOperator): ui_color = '#ff0000' @@ -133,7 +133,7 @@ You can use :ref:`Jinja templates <jinja-templating>` to parameterize your opera Airflow considers the field names present in ``template_fields`` for templating while rendering the operator. -.. code:: python +.. code-block:: python class HelloOperator(BaseOperator): @@ -154,7 +154,7 @@ the operator. You can use the template as follows: -.. code:: python +.. code-block:: python with dag: hello_task = HelloOperator(task_id='task_id_1', dag=dag, name='{{ task_instance.task_id }}') @@ -168,7 +168,7 @@ the extension of your file in ``template_ext``. If a ``template_field`` contains the extension mentioned in ``template_ext``, Jinja reads the content of the file and replace the templates with actual value. Note that Jinja substitutes the operator attributes and not the args. -.. code:: python +.. code-block:: python class HelloOperator(BaseOperator): diff --git a/docs/howto/email-config.rst b/docs/howto/email-config.rst index 3c7d301..2d95b7c 100644 --- a/docs/howto/email-config.rst +++ b/docs/howto/email-config.rst @@ -22,7 +22,7 @@ You can configure the email that is being sent in your ``airflow.cfg`` by setting a ``subject_template`` and/or a ``html_content_template`` in the ``email`` section. -.. code:: +.. code-block:: [email] @@ -35,7 +35,7 @@ To access the task's information you use `Jinja Templating <http://jinja.pocoo.o For example a ``html_content_template`` file could look like this: -.. code:: +.. code-block:: Try {{try_number}} out of {{max_tries + 1}}<br> Exception:<br>{{exception_html}}<br> diff --git a/docs/howto/secure-connections.rst b/docs/howto/secure-connections.rst index 57d8fef..d14bac7 100644 --- a/docs/howto/secure-connections.rst +++ b/docs/howto/secure-connections.rst @@ -32,7 +32,7 @@ You can still enable encryption for passwords within connections by following be #. Install crypto package ``pip install 'apache-airflow[crypto]'`` #. Generate fernet_key, using this code snippet below. ``fernet_key`` must be a base64-encoded 32-byte key: - .. code:: python + .. 
code-block:: python from cryptography.fernet import Fernet fernet_key= Fernet.generate_key() diff --git a/docs/kubernetes.rst b/docs/kubernetes.rst index 014df2e..e8a53bd 100644 --- a/docs/kubernetes.rst +++ b/docs/kubernetes.rst @@ -32,7 +32,7 @@ KubernetesPodOperator The :class:`~airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator` allows you to create Pods on Kubernetes. It works with any type of executor. -.. code:: python +.. code-block:: python from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator from airflow.kubernetes.secret import Secret @@ -148,7 +148,7 @@ This could be used, for instance, to add sidecar or init containers to every worker pod launched by KubernetesExecutor or KubernetesPodOperator. -.. code:: python +.. code-block:: python def pod_mutation_hook(pod: Pod): pod.annotations['airflow.apache.org/launched-by'] = 'Tests' diff --git a/docs/lineage.rst b/docs/lineage.rst index fd0618f..d32b41b 100644 --- a/docs/lineage.rst +++ b/docs/lineage.rst @@ -28,7 +28,7 @@ audit trails and data governance, but also debugging of data flows. Airflow tracks data by means of inlets and outlets of the tasks. Let's work from an example and see how it works. -.. code:: python +.. code-block:: python from airflow.operators.bash_operator import BashOperator from airflow.operators.dummy_operator import DummyOperator @@ -99,7 +99,7 @@ Apache Atlas Airflow can send its lineage metadata to Apache Atlas. You need to enable the ``atlas`` backend and configure it properly, e.g. in your ``airflow.cfg``: -.. code:: ini +.. code-block:: ini [lineage] backend = airflow.lineage.backend.atlas.AtlasBackend diff --git a/docs/plugins.rst b/docs/plugins.rst index 88559e0..4ca18cd 100644 --- a/docs/plugins.rst +++ b/docs/plugins.rst @@ -73,7 +73,7 @@ you want to plug into Airflow. Here's what the class you need to derive looks like: -.. code:: python +.. code-block:: python class AirflowPlugin(object): # The name of your plugin (str) @@ -122,7 +122,7 @@ After the plugin is imported into Airflow, you can invoke it using statement like -.. code:: python +.. code-block:: python from airflow.{type, like "operators", "sensors"}.{name specificed inside the plugin class} import * @@ -145,7 +145,7 @@ Example The code below defines a plugin that injects a set of dummy object definitions in Airflow. -.. code:: python +.. code-block:: python # This is the class you derive to create a plugin from airflow.plugins_manager import AirflowPlugin diff --git a/docs/scheduler.rst b/docs/scheduler.rst index 974a934..ebdc697 100644 --- a/docs/scheduler.rst +++ b/docs/scheduler.rst @@ -34,7 +34,7 @@ The scheduler uses the configured :doc:`Executor </executor/index>` to run tasks To start a scheduler, simply run the command: -.. code:: bash +.. code-block:: bash airflow scheduler diff --git a/docs/security.rst b/docs/security.rst index 0e49885..863a454 100644 --- a/docs/security.rst +++ b/docs/security.rst @@ -531,7 +531,7 @@ Viewer ^^^^^^ ``Viewer`` users have limited viewer permissions -.. code:: python +.. code-block:: python VIEWER_PERMS = { 'menu_access', @@ -562,7 +562,7 @@ Viewer on limited web views -.. code:: python +.. code-block:: python VIEWER_VMS = { 'Airflow', @@ -590,7 +590,7 @@ User ^^^^ ``User`` users have ``Viewer`` permissions plus additional user permissions -.. code:: python +.. code-block:: python USER_PERMS = { 'can_dagrun_clear', @@ -617,7 +617,7 @@ Op ^^ ``Op`` users have ``User`` permissions plus additional op permissions -.. code:: python +.. 
code-block:: python OP_PERMS = { 'can_conf', @@ -626,7 +626,7 @@ Op on ``User`` web views plus these additional op web views -.. code:: python +.. code-block:: python OP_VMS = { 'Admin', diff --git a/docs/timezone.rst b/docs/timezone.rst index 6a035cb..9249576 100644 --- a/docs/timezone.rst +++ b/docs/timezone.rst @@ -70,7 +70,7 @@ You can use ``timezone.is_localized()`` and ``timezone.is_naive()`` to determine Because Airflow uses time-zone-aware datetime objects. If your code creates datetime objects they need to be aware too. -.. code:: python +.. code-block:: python from airflow.utils import timezone @@ -88,7 +88,7 @@ in such a way that it is assumed that the naive date time is already in the defa words if you have a default time zone setting of ``Europe/Amsterdam`` and create a naive datetime ``start_date`` of ``datetime(2017,1,1)`` it is assumed to be a ``start_date`` of Jan 1, 2017 Amsterdam time. -.. code:: python +.. code-block:: python default_args=dict( start_date=datetime(2016, 1, 1), @@ -117,7 +117,7 @@ you just installed Airflow it will be set to ``utc``, which is recommended. You it is therefore important to make sure this setting is equal on all Airflow nodes. -.. code:: python +.. code-block:: python [core] default_timezone = utc @@ -129,7 +129,7 @@ Time zone aware DAGs Creating a time zone aware DAG is quite simple. Just make sure to supply a time zone aware ``start_date`` using ``pendulum``. -.. code:: python +.. code-block:: python import pendulum @@ -155,7 +155,7 @@ Templates Airflow returns time zone aware datetimes in templates, but does not convert them to local time so they remain in UTC. It is left up to the DAG to handle this. -.. code:: python +.. code-block:: python import pendulum diff --git a/docs/tutorial.rst b/docs/tutorial.rst index 873900d..926b149 100644 --- a/docs/tutorial.rst +++ b/docs/tutorial.rst @@ -191,7 +191,7 @@ Setting up Dependencies We have tasks ``t1``, ``t2`` and ``t3`` that do not depend on each other. Here's a few ways you can define dependencies between them: -.. code:: python +.. code-block:: python t1.set_downstream(t2) diff --git a/docs/ui.rst b/docs/ui.rst index fa55593..3cfc57a 100644 --- a/docs/ui.rst +++ b/docs/ui.rst @@ -34,7 +34,7 @@ In order to filter DAGs (e.g by team), you can add tags in each dag. The filter is saved in a cookie and can be reset by the reset button. For example: -.. code:: python +.. code-block:: python dag = DAG('dag', tags=['team1', 'sql'])
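
For anyone wanting to apply the same rule to their own docs tree, below is a minimal, standalone sketch of the check this commit introduces: walk every ``*.rst`` file and report any remaining ``.. code::`` directive so it can be replaced with ``.. code-block::``. The names and layout here are illustrative assumptions only; the project's actual implementation lives in ``docs/build`` as ``assert_file_not_contains`` and ``check_enforce_code_block``.

.. code-block:: python

    # Illustrative sketch only -- not the project's docs/build script.
    import re
    from glob import glob

    # Lines that still use the plain "code" directive instead of "code-block".
    CODE_DIRECTIVE = re.compile(r"^\.\. code::")


    def find_code_directives(docs_root="docs"):
        """Yield (path, line_no, line) for every '.. code::' directive under docs_root."""
        for path in glob(f"{docs_root}/**/*.rst", recursive=True):
            with open(path, encoding="utf-8") as doc_file:
                for num, line in enumerate(doc_file, 1):
                    if CODE_DIRECTIVE.search(line):
                        yield path, num, line.rstrip()


    if __name__ == "__main__":
        for path, num, line in find_code_directives():
            print(f"{path}:{num}: prefer '.. code-block::' over '.. code::' ({line})")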