Latest docs version as of 1.8.x
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/commit/5e574012 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/tree/5e574012 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/diff/5e574012 Branch: refs/heads/asf-site Commit: 5e5740122ed33a22a30047e75e6ca4c7da3961b4 Parents: 9c75ee9 Author: Maxime Beauchemin <maxime.beauche...@apache.org> Authored: Mon Mar 6 08:43:25 2017 -0800 Committer: Maxime Beauchemin <maxime.beauche...@apache.org> Committed: Mon Mar 6 08:43:25 2017 -0800 ---------------------------------------------------------------------- _images/latest_only_with_trigger.png | Bin 0 -> 40034 bytes _sources/api.rst.txt | 43 + _sources/cli.rst.txt | 11 + _sources/code.rst.txt | 255 + _sources/concepts.rst.txt | 833 +++ _sources/configuration.rst.txt | 284 + _sources/faq.rst.txt | 147 + _sources/index.rst.txt | 89 + _sources/installation.rst.txt | 90 + _sources/integration.rst.txt | 246 + _sources/license.rst.txt | 211 + _sources/plugins.rst.txt | 144 + _sources/profiling.rst.txt | 39 + _sources/project.rst.txt | 49 + _sources/scheduler.rst.txt | 153 + _sources/security.rst.txt | 334 + _sources/start.rst.txt | 49 + _sources/tutorial.rst.txt | 429 ++ _sources/ui.rst.txt | 102 + _static/fonts/Inconsolata.ttf | Bin 0 -> 63184 bytes _static/jquery-3.1.0.js | 10074 ++++++++++++++++++++++++++++ api.html | 279 + integration.html | 424 ++ 23 files changed, 14285 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/5e574012/_images/latest_only_with_trigger.png ---------------------------------------------------------------------- diff --git a/_images/latest_only_with_trigger.png b/_images/latest_only_with_trigger.png new file mode 100644 index 0000000..629adfa Binary files /dev/null and b/_images/latest_only_with_trigger.png differ http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/5e574012/_sources/api.rst.txt ---------------------------------------------------------------------- diff --git a/_sources/api.rst.txt b/_sources/api.rst.txt new file mode 100644 index 0000000..eef671c --- /dev/null +++ b/_sources/api.rst.txt @@ -0,0 +1,43 @@ +Experimental REST API +===================== + +Airflow exposes an experimental REST API. It is available through the webserver. Endpoints are +available at /api/experimental/. Please note that we expect the endpoint definitions to change. + +Endpoints +--------- + +This is a placeholder until the Swagger definitions are active. + +* /api/experimental/dags/<DAG_ID>/tasks/<TASK_ID> returns info for a task (GET). +* /api/experimental/dags/<DAG_ID>/dag_runs creates a dag_run for a given dag id (POST). + +CLI +----- + +For some functions the CLI can use the API. To configure the CLI to use the API when it is available, +configure it as follows: + +.. code-block:: bash + + [cli] + api_client = airflow.api.client.json_client + endpoint_url = http://<WEBSERVER>:<PORT> + + +Authentication +-------------- + +Only Kerberos authentication is currently supported for the API. To enable it, set the following +in the configuration: + +.. code-block:: bash + + [api] + auth_backend = airflow.api.auth.backend.default + + [kerberos] + keytab = <KEYTAB> + +The Kerberos service is configured as `airflow/fully.qualified.domainname@REALM`. Make sure this +principal exists in the keytab file.
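+As a quick illustration (a sketch only -- these endpoints are experimental and expected to change), both +endpoints can be exercised with ``curl`` against a local webserver; the dag and task ids used here are +hypothetical: + +.. code-block:: bash + + # returns info for a task (GET); 'my_dag' and 'my_task' are hypothetical ids + curl http://localhost:8080/api/experimental/dags/my_dag/tasks/my_task + + # creates a dag_run for the given dag id (POST) + curl -X POST http://localhost:8080/api/experimental/dags/my_dag/dag_runs \ + -H 'Content-Type: application/json' -d '{}'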
http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/5e574012/_sources/cli.rst.txt ---------------------------------------------------------------------- diff --git a/_sources/cli.rst.txt b/_sources/cli.rst.txt new file mode 100644 index 0000000..f05cbfb --- /dev/null +++ b/_sources/cli.rst.txt @@ -0,0 +1,11 @@ +Command Line Interface +====================== + +Airflow has a very rich command line interface that allows for +many types of operations on a DAG, starting services, and supporting +development and testing. + +.. argparse:: + :module: airflow.bin.cli + :func: get_parser + :prog: airflow http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/5e574012/_sources/code.rst.txt ---------------------------------------------------------------------- diff --git a/_sources/code.rst.txt b/_sources/code.rst.txt new file mode 100644 index 0000000..fabe6db --- /dev/null +++ b/_sources/code.rst.txt @@ -0,0 +1,255 @@ +API Reference +============= + +Operators +--------- +Operators allow for the generation of certain types of tasks that become nodes in +the DAG when instantiated. All operators derive from BaseOperator and +inherit many attributes and methods that way. Refer to the BaseOperator +documentation for more details. + +There are 3 main types of operators: + +- Operators that perform an **action**, or tell another system to + perform an action +- **Transfer** operators move data from one system to another +- **Sensors** are a certain type of operator that will keep running until a + certain criterion is met. Examples include a specific file landing in HDFS or + S3, a partition appearing in Hive, or a specific time of the day. Sensors + are derived from ``BaseSensorOperator`` and run a poke + method at a specified ``poke_interval`` until it returns ``True``. + +BaseOperator +'''''''''''' +All operators are derived from ``BaseOperator`` and acquire much +functionality through inheritance. Since this is the core of the engine, +it's worth taking the time to understand the parameters of ``BaseOperator`` +so you can leverage the primitive features they provide in your +DAGs. + + +.. autoclass:: airflow.models.BaseOperator + + +BaseSensorOperator +''''''''''''''''''' +All sensors are derived from ``BaseSensorOperator``. All sensors inherit +the ``timeout`` and ``poke_interval`` on top of the ``BaseOperator`` +attributes. + +.. autoclass:: airflow.operators.sensors.BaseSensorOperator + + +Operator API +'''''''''''' + +.. automodule:: airflow.operators + :show-inheritance: + :members: + BashOperator, + BranchPythonOperator, + TriggerDagRunOperator, + DummyOperator, + EmailOperator, + ExternalTaskSensor, + GenericTransfer, + HdfsSensor, + Hive2SambaOperator, + HiveOperator, + HivePartitionSensor, + HiveToDruidTransfer, + HiveToMySqlTransfer, + SimpleHttpOperator, + HttpSensor, + MetastorePartitionSensor, + MsSqlOperator, + MsSqlToHiveTransfer, + MySqlOperator, + MySqlToHiveTransfer, + NamedHivePartitionSensor, + PostgresOperator, + PrestoCheckOperator, + PrestoIntervalCheckOperator, + PrestoValueCheckOperator, + PythonOperator, + S3KeySensor, + S3ToHiveTransfer, + ShortCircuitOperator, + SlackAPIOperator, + SlackAPIPostOperator, + SqlSensor, + SubDagOperator, + TimeSensor, + WebHdfsSensor + +.. autoclass:: airflow.operators.docker_operator.DockerOperator + + +Community-contributed Operators +''''''''''''''''''''''''''''''' + +.. automodule:: airflow.contrib.operators + :show-inheritance: + :members: + SSHExecuteOperator, + VerticaOperator, + VerticaToHiveTransfer + +..
autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator +.. autoclass:: airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator +.. autoclass:: airflow.contrib.operators.ecs_operator.ECSOperator +.. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator +.. autoclass:: airflow.contrib.operators.QuboleOperator +.. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPIOperator +.. autoclass:: airflow.contrib.operators.hipchat_operator.HipChatAPISendRoomNotificationOperator + +.. _macros: + +Macros +--------- +Here's a list of variables and macros that can be used in templates. + + +Default Variables +''''''''''''''''' +The Airflow engine passes a few variables by default that are accessible +in all templates. + +================================= ==================================== +Variable Description +================================= ==================================== +``{{ ds }}`` the execution date as ``YYYY-MM-DD`` +``{{ ds_nodash }}`` the execution date as ``YYYYMMDD`` +``{{ yesterday_ds }}`` yesterday's date as ``YYYY-MM-DD`` +``{{ yesterday_ds_nodash }}`` yesterday's date as ``YYYYMMDD`` +``{{ tomorrow_ds }}`` tomorrow's date as ``YYYY-MM-DD`` +``{{ tomorrow_ds_nodash }}`` tomorrow's date as ``YYYYMMDD`` +``{{ ts }}`` same as ``execution_date.isoformat()`` +``{{ ts_nodash }}`` same as ``ts`` without ``-`` and ``:`` +``{{ execution_date }}`` the execution_date (datetime.datetime) +``{{ prev_execution_date }}`` the previous execution date (if available) (datetime.datetime) +``{{ next_execution_date }}`` the next execution date (datetime.datetime) +``{{ dag }}`` the DAG object +``{{ task }}`` the Task object +``{{ macros }}`` a reference to the macros package, described below +``{{ task_instance }}`` the task_instance object +``{{ end_date }}`` same as ``{{ ds }}`` +``{{ latest_date }}`` same as ``{{ ds }}`` +``{{ ti }}`` same as ``{{ task_instance }}`` +``{{ params }}`` a reference to the user-defined params dictionary +``{{ var.value.my_var }}`` globally defined variables represented as a dictionary +``{{ var.json.my_var.path }}`` globally defined variables represented as a dictionary + with deserialized JSON object, append the path to the + key within the JSON object +``{{ task_instance_key_str }}`` a unique, human-readable key to the task instance + formatted ``{dag_id}_{task_id}_{ds}`` +``conf`` the full configuration object located at + ``airflow.configuration.conf`` which + represents the content of your + ``airflow.cfg`` +``run_id`` the ``run_id`` of the current DAG run +``dag_run`` a reference to the DagRun object +``test_mode`` whether the task instance was called using + the CLI's test subcommand +================================= ==================================== + +Note that you can access the object's attributes and methods with simple +dot notation. Here are some examples of what is possible: +``{{ task.owner }}``, ``{{ task.task_id }}``, ``{{ ti.hostname }}``, ... +Refer to the models documentation for more information on the objects' +attributes and methods. + +The ``var`` template variable allows you to access variables defined in Airflow's +UI. You can access them as either plain-text or JSON. If you use JSON, you are +also able to walk nested structures, such as dictionaries like: +``{{ var.json.my_dict_var.key1 }}`` + +Macros +'''''' +Macros are a way to expose objects to your templates and live under the +``macros`` namespace in your templates.
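+For example, a templated field can call into that namespace; this minimal sketch assumes a +``BashOperator`` in an existing ``dag`` whose ``bash_command`` is templated (``ds_add`` shifts +a date string by a number of days): + +.. code:: python + + # prints the execution date shifted forward by 7 days + templated_command = 'echo "{{ macros.ds_add(ds, 7) }}"' + t = BashOperator( + task_id='print_shifted_date', + bash_command=templated_command, + dag=dag)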
+ +A few commonly used libraries and methods are made available. + + +================================= ==================================== +Variable Description +================================= ==================================== +``macros.datetime`` The standard lib's ``datetime.datetime`` +``macros.timedelta`` The standard lib's ``datetime.timedelta`` +``macros.dateutil`` A reference to the ``dateutil`` package +``macros.time`` The standard lib's ``time`` +``macros.uuid`` The standard lib's ``uuid`` +``macros.random`` The standard lib's ``random`` +================================= ==================================== + + +Some Airflow-specific macros are also defined: + +.. automodule:: airflow.macros + :show-inheritance: + :members: + +.. automodule:: airflow.macros.hive + :show-inheritance: + :members: + +.. _models_ref: + +Models +------ + +Models are built on top of the SQLAlchemy ORM Base class, and instances are +persisted in the database. + + +.. automodule:: airflow.models + :show-inheritance: + :members: DAG, BaseOperator, TaskInstance, DagBag, Connection + +Hooks +----- +.. automodule:: airflow.hooks + :show-inheritance: + :members: + DbApiHook, + HiveCliHook, + HiveMetastoreHook, + HiveServer2Hook, + HttpHook, + DruidHook, + MsSqlHook, + MySqlHook, + PostgresHook, + PrestoHook, + S3Hook, + SqliteHook, + WebHDFSHook + +Community-contributed hooks +''''''''''''''''''''''''''' + +.. automodule:: airflow.contrib.hooks + :show-inheritance: + :members: + BigQueryHook, + GoogleCloudStorageHook, + VerticaHook, + FTPHook, + SSHHook, + CloudantHook + +.. autoclass:: airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook + +Executors +--------- +Executors are the mechanism by which task instances get run. + +.. automodule:: airflow.executors + :show-inheritance: + :members: LocalExecutor, CeleryExecutor, SequentialExecutor + +Community-contributed executors +''''''''''''''''''''''''''''''' + +.. autoclass:: airflow.contrib.executors.mesos_executor.MesosExecutor http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/5e574012/_sources/concepts.rst.txt ---------------------------------------------------------------------- diff --git a/_sources/concepts.rst.txt b/_sources/concepts.rst.txt new file mode 100644 index 0000000..48c15a4 --- /dev/null +++ b/_sources/concepts.rst.txt @@ -0,0 +1,833 @@ +Concepts +######## + +The Airflow Platform is a tool for describing, executing, and monitoring +workflows. + +Core Ideas +'''''''''' + +DAGs +==== + +In Airflow, a ``DAG`` -- or a Directed Acyclic Graph -- is a collection of all +the tasks you want to run, organized in a way that reflects their relationships +and dependencies. + +For example, a simple DAG could consist of three tasks: A, B, and C. It could +say that A has to run successfully before B can run, but C can run anytime. It +could say that task A times out after 5 minutes, and B can be restarted up to 5 +times in case it fails. It might also say that the workflow will run every night +at 10pm, but shouldn't start until a certain date. + +In this way, a DAG describes *how* you want to carry out your workflow; but +notice that we haven't said anything about *what* we actually want to do! A, B, +and C could be anything. Maybe A prepares data for B to analyze while C sends an +email. Or perhaps A monitors your location so B can open your garage door while +C turns on your house lights.
The important thing is that the DAG isn't +concerned with what its constituent tasks do; its job is to make sure that +whatever they do happens at the right time, or in the right order, or with the +right handling of any unexpected issues. + +DAGs are defined in standard Python files that are placed in Airflow's +``DAG_FOLDER``. Airflow will execute the code in each file to dynamically build +the ``DAG`` objects. You can have as many DAGs as you want, each describing an +arbitrary number of tasks. In general, each one should correspond to a single +logical workflow. + +Scope +----- + +Airflow will load any ``DAG`` object it can import from a DAG file. Critically, +that means the DAG must appear in ``globals()``. Consider the following two +DAGs. Only ``dag_1`` will be loaded; the other one only appears in a local +scope. + +.. code:: python + + dag_1 = DAG('this_dag_will_be_discovered') + + def my_function(): + dag_2 = DAG('but_this_dag_will_not') + + my_function() + +Sometimes this can be put to good use. For example, a common pattern with +``SubDagOperator`` is to define the subdag inside a function so that Airflow +doesn't try to load it as a standalone DAG. + +Default Arguments +----------------- + +If a dictionary of ``default_args`` is passed to a DAG, it will apply them to +any of its operators. This makes it easy to apply a common parameter to many operators without having to type it many times. + +.. code:: python + + default_args=dict( + start_date=datetime(2016, 1, 1), + owner='Airflow') + + dag = DAG('my_dag', default_args=default_args) + op = DummyOperator(task_id='dummy', dag=dag) + print(op.owner) # Airflow + +Context Manager +--------------- + +*Added in Airflow 1.8* + +DAGs can be used as context managers to automatically assign new operators to that DAG. + +.. code:: python + + with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag: + op = DummyOperator('op') + + op.dag is dag # True + +Operators +========= + +While DAGs describe *how* to run a workflow, ``Operators`` determine what +actually gets done. + +An operator describes a single task in a workflow. Operators are usually (but +not always) atomic, meaning they can stand on their own and don't need to share +resources with any other operators. The DAG will make sure that operators run in +the correct order; other than those dependencies, operators generally +run independently. In fact, they may run on two completely different machines. + +This is a subtle but very important point: in general, if two operators need to +share information, like a filename or small amount of data, you should consider +combining them into a single operator. If it absolutely can't be avoided, +Airflow does have a feature for operator cross-communication called XCom that is +described elsewhere in this document. + +Airflow provides operators for many common tasks, including: + +- ``BashOperator`` - executes a bash command +- ``PythonOperator`` - calls an arbitrary Python function +- ``EmailOperator`` - sends an email +- ``HTTPOperator`` - sends an HTTP request +- ``SqlOperator`` - executes a SQL command +- ``Sensor`` - waits for a certain time, file, database row, S3 key, etc... + + +In addition to these basic building blocks, there are many more specific +operators: ``DockerOperator``, ``HiveOperator``, ``S3FileTransferOperator``, +``PrestoToMysqlOperator``, ``SlackOperator``... you get the idea! + +The ``airflow/contrib/`` directory contains yet more operators built by the +community.
These operators aren't always as complete or well-tested as those in +the main distribution, but allow users to more easily add new functionality to +the platform. + +Operators are only loaded by Airflow if they are assigned to a DAG. + +DAG Assignment +-------------- + +*Added in Airflow 1.8* + +Operators do not have to be assigned to DAGs immediately (previously ``dag`` was +a required argument). However, once an operator is assigned to a DAG, it can not +be transferred or unassigned. DAG assignment can be done explicitly when the +operator is created, through deferred assignment, or even inferred from other +operators. + +.. code:: python + + dag = DAG('my_dag', start_date=datetime(2016, 1, 1)) + + # sets the DAG explicitly + explicit_op = DummyOperator(task_id='op1', dag=dag) + + # deferred DAG assignment + deferred_op = DummyOperator(task_id='op2') + deferred_op.dag = dag + + # inferred DAG assignment (linked operators must be in the same DAG) + inferred_op = DummyOperator(task_id='op3') + inferred_op.set_upstream(deferred_op) + + +Bitshift Composition +-------------------- + +*Added in Airflow 1.8* + +Traditionally, operator relationships are set with the ``set_upstream()`` and +``set_downstream()`` methods. In Airflow 1.8, this can be done with the Python +bitshift operators ``>>`` and ``<<``. The following four statements are all +functionally equivalent: + +.. code:: python + + op1 >> op2 + op1.set_downstream(op2) + + op2 << op1 + op2.set_upstream(op1) + +When using the bitshift to compose operators, the relationship is set in the +direction that the bitshift operator points. For example, ``op1 >> op2`` means +that ``op1`` runs first and ``op2`` runs second. Multiple operators can be +composed -- keep in mind the chain is executed left-to-right and the rightmost +object is always returned. For example: + +.. code:: python + + op1 >> op2 >> op3 << op4 + +is equivalent to: + +.. code:: python + + op1.set_downstream(op2) + op2.set_downstream(op3) + op3.set_upstream(op4) + +For convenience, the bitshift operators can also be used with DAGs. For example: + +.. code:: python + + dag >> op1 >> op2 + +is equivalent to: + +.. code:: python + + op1.dag = dag + op1.set_downstream(op2) + +We can put this all together to build a simple pipeline: + +.. code:: python + + with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag: + ( + DummyOperator(task_id='dummy_1') + >> BashOperator( + task_id='bash_1', + bash_command='echo "HELLO!"') + >> PythonOperator( + task_id='python_1', + python_callable=lambda: print("GOODBYE!")) + ) + +Tasks +===== + +Once an operator is instantiated, it is referred to as a "task". The +instantiation defines specific values when calling the abstract operator, and +the parameterized task becomes a node in a DAG. + +Task Instances +============== + +A task instance represents a specific run of a task and is characterized as the +combination of a dag, a task, and a point in time. Task instances also have an +indicative state, which could be "running", "success", "failed", "skipped", "up +for retry", etc. + +Workflows +========= + +You're now familiar with the core building blocks of Airflow. 
+ +Some of the concepts may sound very similar, but the vocabulary can +be conceptualized like this: + +- DAG: a description of the order in which work should take place +- Operator: a class that acts as a template for carrying out some work +- Task: a parameterized instance of an operator +- Task Instance: a task that 1) has been assigned to a DAG and 2) has a + state associated with a specific run of the DAG + +By combining ``DAGs`` and ``Operators`` to create ``TaskInstances``, you can +build complex workflows. + +Additional Functionality +'''''''''''''''''''''''' + +In addition to the core Airflow objects, there are a number of more complex +features that enable behaviors like limiting simultaneous access to resources, +cross-communication, conditional execution, and more. + +Hooks +===== + +Hooks are interfaces to external platforms and databases like Hive, S3, +MySQL, Postgres, HDFS, and Pig. Hooks implement a common interface when +possible, and act as a building block for operators. They also use +the ``airflow.models.Connection`` model to retrieve hostnames +and authentication information. Hooks keep authentication code and +information out of pipelines, centralized in the metadata database. + +Hooks are also very useful on their own to use in Python scripts, +in the ``airflow.operators.PythonOperator``, and in interactive environments +like IPython or Jupyter Notebook. + +Pools +===== + +Some systems can get overwhelmed when too many processes hit them at the same +time. Airflow pools can be used to **limit the execution parallelism** on +arbitrary sets of tasks. The list of pools is managed in the UI +(``Menu -> Admin -> Pools``) by giving the pools a name and assigning +each a number of worker slots. Tasks can then be associated with +one of the existing pools by using the ``pool`` parameter when +creating tasks (i.e., instantiating operators). + +.. code:: python + + aggregate_db_message_job = BashOperator( + task_id='aggregate_db_message_job', + execution_timeout=timedelta(hours=3), + pool='ep_data_pipeline_db_msg_agg', + bash_command=aggregate_db_message_job_cmd, + dag=dag) + aggregate_db_message_job.set_upstream(wait_for_empty_queue) + +The ``pool`` parameter can +be used in conjunction with ``priority_weight`` to define priorities +in the queue, and which tasks get executed first as slots open up in the +pool. The default ``priority_weight`` is ``1``, and can be bumped to any +number. When sorting the queue to evaluate which task should be executed +next, we use the ``priority_weight``, summed up with all of the +``priority_weight`` values from tasks downstream from this task. You can +use this to bump a specific important task and the whole path to that task +gets prioritized accordingly. + +Tasks will be scheduled as usual while the slots fill up. Once capacity is +reached, runnable tasks get queued and their state will show as such in the +UI. As slots free up, queued tasks start running based on the +``priority_weight`` (of the task and its descendants). + +Note that by default tasks aren't assigned to any pool and their +execution parallelism is only limited by the executor's setting. + +Connections +=========== + +The connection information to external systems is stored in the Airflow +metadata database and managed in the UI (``Menu -> Admin -> Connections``). +A ``conn_id`` is defined there, with hostname / login / password / schema +information attached to it.
Airflow pipelines can simply refer to the +centrally managed ``conn_id`` without having to hard code any of this +information anywhere. + +Many connections with the same ``conn_id`` can be defined. When that +is the case, and when a **hook** uses the ``get_connection`` method +from ``BaseHook``, Airflow will choose one connection randomly, allowing +for some basic load balancing and fault tolerance when used in conjunction +with retries. + +Airflow also has the ability to reference connections via environment +variables from the operating system. The environment variable needs to be +prefixed with ``AIRFLOW_CONN_`` to be considered a connection. When +referencing the connection in the Airflow pipeline, the ``conn_id`` should +be the name of the variable without the prefix. For example, if the ``conn_id`` +is named ``postgres_master`` the environment variable should be named +``AIRFLOW_CONN_POSTGRES_MASTER`` (note that the environment variable must be +all uppercase). Airflow assumes the value returned from the environment +variable to be in a URI format (e.g. +``postgres://user:password@localhost:5432/master`` or ``s3://accesskey:secretkey@S3``). + +Queues +====== + +When using the CeleryExecutor, the celery queues that tasks are sent to +can be specified. ``queue`` is an attribute of BaseOperator, so any +task can be assigned to any queue. The default queue for the environment +is defined in the ``airflow.cfg``'s ``celery -> default_queue``. This defines +the queue that tasks get assigned to when not specified, as well as which +queue Airflow workers listen to when started. + +Workers can listen to one or multiple queues of tasks. When a worker is +started (using the command ``airflow worker``), a set of comma delimited +queue names can be specified (e.g. ``airflow worker -q spark``). This worker +will then only pick up tasks wired to the specified queue(s). + +This can be useful if you need specialized workers, either from a +resource perspective (for, say, very lightweight tasks where one worker +could take thousands of tasks without a problem), or from an environment +perspective (you want a worker running from within the Spark cluster +itself because it needs a very specific environment and security rights). + +XComs +===== + +XComs let tasks exchange messages, allowing more nuanced forms of control and +shared state. The name is an abbreviation of "cross-communication". XComs are +principally defined by a key, value, and timestamp, but also track attributes +like the task/DAG that created the XCom and when it should become visible. Any +object that can be pickled can be used as an XCom value, so users should make +sure to use objects of appropriate size. + +XComs can be "pushed" (sent) or "pulled" (received). When a task pushes an +XCom, it makes it generally available to other tasks. Tasks can push XComs at +any time by calling the ``xcom_push()`` method. In addition, if a task returns +a value (either from its Operator's ``execute()`` method, or from a +PythonOperator's ``python_callable`` function), then an XCom containing that +value is automatically pushed. + +Tasks call ``xcom_pull()`` to retrieve XComs, optionally applying filters +based on criteria like ``key``, source ``task_ids``, and source ``dag_id``. By +default, ``xcom_pull()`` filters for the keys that are automatically given to +XComs when they are pushed by being returned from execute functions (as +opposed to XComs that are pushed manually).
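+For illustration, a task can also push under an explicit key rather than relying on its return +value; in this sketch the key and value are hypothetical: + +.. code:: python + + # inside a PythonOperator where provide_context=True + def push_explicit(**context): + # pushes an XCom under the key 'table_name' for other tasks to pull + context['task_instance'].xcom_push(key='table_name', value='my_table')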
+ +If ``xcom_pull`` is passed a single string for ``task_ids``, then the most +recent XCom value from that task is returned; if a list of ``task_ids`` is +passed, then a corresponding list of XCom values is returned. + +.. code:: python + + # inside a PythonOperator called 'pushing_task' + def push_function(): + return value + + # inside another PythonOperator where provide_context=True + def pull_function(**context): + value = context['task_instance'].xcom_pull(task_ids='pushing_task') + +It is also possible to pull XComs directly in a template; here's an example +of what this may look like: + +.. code:: sql + + SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }} + +Note that XComs are similar to `Variables`_, but are specifically designed +for inter-task communication rather than global settings. + + +Variables +========= + +Variables are a generic way to store and retrieve arbitrary content or +settings as a simple key value store within Airflow. Variables can be +listed, created, updated and deleted from the UI (``Admin -> Variables``), +code or CLI. While your pipeline code definition and most of your constants +and variables should be defined in code and stored in source control, +it can be useful to have some variables or configuration items +accessible and modifiable through the UI. + + +.. code:: python + + from airflow.models import Variable + foo = Variable.get("foo") + bar = Variable.get("bar", deserialize_json=True) + +The second call assumes ``json`` content and will be deserialized into +``bar``. Note that ``Variable`` is a sqlalchemy model and can be used +as such. + + +Branching +========= + +Sometimes you need a workflow to branch, or only go down a certain path +based on an arbitrary condition which is typically related to something +that happened in an upstream task. One way to do this is by using the +``BranchPythonOperator``. + +The ``BranchPythonOperator`` is much like the PythonOperator except that it +expects a python_callable that returns a task_id. The task_id returned +is followed, and all of the other paths are skipped. +The task_id returned by the Python function has to reference a task +directly downstream from the BranchPythonOperator task. + +Note that using tasks with ``depends_on_past=True`` downstream from +``BranchPythonOperator`` is logically unsound, as a ``skipped`` status +will invariably lead to blocked tasks that depend on their past successes. +The ``skipped`` state propagates wherever all directly upstream tasks are +``skipped``. + +If you want to skip some tasks, keep in mind that you can't have an empty +path; if needed, make a dummy task. + +Like this, where the dummy task "branch_false" is skipped: + +.. image:: img/branch_good.png + +Not like this, where the join task is skipped: + +.. image:: img/branch_bad.png + +SubDAGs +======= + +SubDAGs are perfect for repeating patterns. Defining a function that returns a +DAG object is a nice design pattern when using Airflow. + +Airbnb uses the *stage-check-exchange* pattern when loading data. Data is staged +in a temporary table, after which data quality checks are performed against +that table. Once the checks all pass the partition is moved into the production +table. + +As another example, consider the following DAG: + +.. image:: img/subdag_before.png + +We can combine all of the parallel ``task-*`` operators into a single SubDAG, +so that the resulting DAG resembles the following: + +..
image:: img/subdag_after.png + +Note that SubDAG operators should contain a factory method that returns a DAG +object. This will prevent the SubDAG from being treated like a separate DAG in +the main UI. For example: + +.. code:: python + + # dags/subdag.py + from airflow.models import DAG + from airflow.operators.dummy_operator import DummyOperator + + + # Dag is returned by a factory method + def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval): + dag = DAG( + '%s.%s' % (parent_dag_name, child_dag_name), + schedule_interval=schedule_interval, + start_date=start_date, + ) + + dummy_operator = DummyOperator( + task_id='dummy_task', + dag=dag, + ) + + return dag + +This SubDAG can then be referenced in your main DAG file: + +.. code:: python + + # main_dag.py + from datetime import datetime, timedelta + from airflow.models import DAG + from airflow.operators.subdag_operator import SubDagOperator + from dags.subdag import sub_dag + + + PARENT_DAG_NAME = 'parent_dag' + CHILD_DAG_NAME = 'child_dag' + + main_dag = DAG( + dag_id=PARENT_DAG_NAME, + schedule_interval=timedelta(hours=1), + start_date=datetime(2016, 1, 1) + ) + + sub_dag = SubDagOperator( + subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date, + main_dag.schedule_interval), + task_id=CHILD_DAG_NAME, + dag=main_dag, + ) + +You can zoom into a SubDagOperator from the graph view of the main DAG to show +the tasks contained within the SubDAG: + +.. image:: img/subdag_zoom.png + +Some other tips when using SubDAGs: + +- by convention, a SubDAG's ``dag_id`` should be prefixed by its parent and + a dot. As in ``parent.child`` +- share arguments between the main DAG and the SubDAG by passing arguments to + the SubDAG operator (as demonstrated above) +- SubDAGs must have a schedule and be enabled. If the SubDAG's schedule is + set to ``None`` or ``@once``, the SubDAG will succeed without having done + anything +- clearing a SubDagOperator also clears the state of the tasks within +- marking success on a SubDagOperator does not affect the state of the tasks + within +- refrain from using ``depends_on_past=True`` in tasks within the SubDAG as + this can be confusing +- it is possible to specify an executor for the SubDAG. It is common to use + the SequentialExecutor if you want to run the SubDAG in-process and + effectively limit its parallelism to one. Using LocalExecutor can be + problematic as it may over-subscribe your worker, running multiple tasks in + a single slot + +See ``airflow/example_dags`` for a demonstration. + +SLAs +==== + +Service Level Agreements, or time by which a task or DAG should have +succeeded, can be set at a task level as a ``timedelta``. If +one or many instances have not succeeded by that time, an alert email is sent +detailing the list of tasks that missed their SLA. The event is also recorded +in the database and made available in the web UI under ``Browse->Missed SLAs`` +where events can be analyzed and documented. + + +Trigger Rules +============= + +Though the normal workflow behavior is to trigger tasks when all their +directly upstream tasks have succeeded, Airflow allows for more complex +dependency settings. + +All operators have a ``trigger_rule`` argument which defines the rule by which +the generated task gets triggered. The default value for ``trigger_rule`` is +``all_success`` and can be defined as "trigger this task when all directly +upstream tasks have succeeded".
All other rules described here are based +on direct parent tasks and are values that can be passed to any operator +while creating tasks: + +* ``all_success``: (default) all parents have succeeded +* ``all_failed``: all parents are in a ``failed`` or ``upstream_failed`` state +* ``all_done``: all parents are done with their execution +* ``one_failed``: fires as soon as at least one parent has failed, it does not wait for all parents to be done +* ``one_success``: fires as soon as at least one parent succeeds, it does not wait for all parents to be done +* ``dummy``: dependencies are just for show, trigger at will + +Note that these can be used in conjunction with ``depends_on_past`` (boolean) +that, when set to ``True``, keeps a task from getting triggered if the +previous schedule for the task hasn't succeeded. + + +Latest Run Only +=============== + +Standard workflow behavior involves running a series of tasks for a +particular date/time range. Some workflows, however, perform tasks that +are independent of run time but need to be run on a schedule, much like a +standard cron job. In these cases, backfills or running jobs missed during +a pause just wastes CPU cycles. + +For situations like this, you can use the ``LatestOnlyOperator`` to skip +tasks that are not being run during the most recent scheduled run for a +DAG. The ``LatestOnlyOperator`` skips all immediate downstream tasks, and +itself, if the time right now is not between its ``execution_time`` and the +next scheduled ``execution_time``. + +One must be aware of the interaction between skipped tasks and trigger +rules. Skipped tasks will cascade through trigger rules ``all_success`` +and ``all_failed`` but not ``all_done``, ``one_failed``, ``one_success``, +and ``dummy``. If you would like to use the ``LatestOnlyOperator`` with +trigger rules that do not cascade skips, you will need to ensure that the +``LatestOnlyOperator`` is **directly** upstream of the task you would like +to skip. + +It is possible, through use of trigger rules to mix tasks that should run +in the typical date/time dependent mode and those using the +``LatestOnlyOperator``. + +For example, consider the following dag: + +.. code:: python + + #dags/latest_only_with_trigger.py + import datetime as dt + + from airflow.models import DAG + from airflow.operators.dummy_operator import DummyOperator + from airflow.operators.latest_only_operator import LatestOnlyOperator + from airflow.utils.trigger_rule import TriggerRule + + + dag = DAG( + dag_id='latest_only_with_trigger', + schedule_interval=dt.timedelta(hours=4), + start_date=dt.datetime(2016, 9, 20), + ) + + latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag) + + task1 = DummyOperator(task_id='task1', dag=dag) + task1.set_upstream(latest_only) + + task2 = DummyOperator(task_id='task2', dag=dag) + + task3 = DummyOperator(task_id='task3', dag=dag) + task3.set_upstream([task1, task2]) + + task4 = DummyOperator(task_id='task4', dag=dag, + trigger_rule=TriggerRule.ALL_DONE) + task4.set_upstream([task1, task2]) + +In the case of this dag, the ``latest_only`` task will show up as skipped +for all runs except the latest run. ``task1`` is directly downstream of +``latest_only`` and will also skip for all runs except the latest. +``task2`` is entirely independent of ``latest_only`` and will run in all +scheduled periods. ``task3`` is downstream of ``task1`` and ``task2`` and +because of the default ``trigger_rule`` being ``all_success`` will receive +a cascaded skip from ``task1``. 
``task4`` is downstream of ``task1`` and +``task2`` but since its ``trigger_rule`` is set to ``all_done`` it will +trigger as soon as ``task1`` has been skipped (a valid completion state) +and ``task2`` has succeeded. + +.. image:: img/latest_only_with_trigger.png + + +Zombies & Undeads +================= + +Task instances die all the time, usually as part of their normal life cycle, +but sometimes unexpectedly. + +Zombie tasks are characterized by the absence +of a heartbeat (emitted by the job periodically) and a ``running`` status +in the database. They can occur when a worker node can't reach the database, +when Airflow processes are killed externally, or when a node gets rebooted, +for instance. Zombie killing is performed periodically by the scheduler's +process. + +Undead processes are characterized by the existence of a process and a matching +heartbeat, but Airflow isn't aware of this task as ``running`` in the database. +This mismatch typically occurs as the state of the database is altered, +most likely by deleting rows in the "Task Instances" view in the UI. +Tasks are instructed to verify their state as part of the heartbeat routine, +and terminate themselves upon figuring out that they are in this "undead" +state. + + +Cluster Policy +============== + +Your local airflow settings file can define a ``policy`` function that +has the ability to mutate task attributes based on other task or DAG +attributes. It receives a single argument as a reference to a task object, +and is expected to alter its attributes. + +For example, this function could apply a specific queue property when +using a specific operator, or enforce a task timeout policy, making sure +that no tasks run for more than 48 hours. Here's an example of what this +may look like inside your ``airflow_settings.py``: + + +.. code:: python + + def policy(task): + if task.__class__.__name__ == 'HivePartitionSensor': + task.queue = "sensor_queue" + if task.timeout > timedelta(hours=48): + task.timeout = timedelta(hours=48) + + +Documentation & Notes +===================== + +It's possible to add documentation or notes to your dags & task objects that +become visible in the web interface ("Graph View" for dags, "Task Details" for +tasks). There are a set of special task attributes that get rendered as rich +content if defined: + +========== ================ +attribute rendered to +========== ================ +doc monospace +doc_json json +doc_yaml yaml +doc_md markdown +doc_rst reStructuredText +========== ================ + +Please note that for dags, ``doc_md`` is the only attribute interpreted. + +This is especially useful if your tasks are built dynamically from +configuration files, as it allows you to expose the configuration that led +to the related tasks in Airflow. + +.. code:: python + + """ + ### My great DAG + """ + + dag = DAG('my_dag', default_args=default_args) + dag.doc_md = __doc__ + + t = BashOperator(task_id='foo', bash_command='echo foo', dag=dag) + t.doc_md = """\ + # Title + Here's a [url](www.airbnb.com) + """ + +This content will get rendered as markdown respectively in the "Graph View" and +"Task Details" pages. + +Jinja Templating +================ + +Airflow leverages the power of +`Jinja Templating <http://jinja.pocoo.org/docs/dev/>`_ and this can be a +powerful tool to use in combination with macros (see the :ref:`macros` section). + +For example, say you want to pass the execution date as an environment variable +to a Bash script using the ``BashOperator``. + +..
code:: python + + # The execution date as YYYY-MM-DD + date = "{{ ds }}" + t = BashOperator( + task_id='test_env', + bash_command='/tmp/test.sh ', + dag=dag, + env={'EXECUTION_DATE': date}) + +Here, ``{{ ds }}`` is a macro, and because the ``env`` parameter of the +``BashOperator`` is templated with Jinja, the execution date will be available +as an environment variable named ``EXECUTION_DATE`` in your Bash script. + +You can use Jinja templating with every parameter that is marked as "templated" +in the documentation. + +Packaged dags +''''''''''''' +While you will often specify dags in a single ``.py`` file, it might sometimes +be required to combine a dag and its dependencies. For example, you might want +to combine several dags to version or manage them together, or you might need +an extra module that is not available +by default on the system you are running airflow on. To allow this, you can create +a zip file that contains the dag(s) in the root of the zip file and have the extra +modules unpacked in directories. + +For instance you can create a zip file that looks like this: + +.. code-block:: bash + + my_dag1.py + my_dag2.py + package1/__init__.py + package1/functions.py + +Airflow will scan the zip file and try to load ``my_dag1.py`` and ``my_dag2.py``. +It will not go into subdirectories as these are considered to be potential +packages. + +In case you would like to add module dependencies to your DAG, you basically would +do the same, but then it makes more sense to use a virtualenv and pip. + +.. code-block:: bash + + virtualenv zip_dag + source zip_dag/bin/activate + + mkdir zip_dag_contents + cd zip_dag_contents + + pip install --install-option="--install-lib=$PWD" my_useful_package + cp ~/my_dag.py . + + zip -r zip_dag.zip * + +.. note:: the zip file will be inserted at the beginning of the module search list + (``sys.path``) and as such it will be available to any other code that resides + within the same interpreter. + +.. note:: packaged dags cannot be used with pickling turned on. + +.. note:: packaged dags cannot contain dynamic libraries (e.g. libz.so); these need + to be available on the system if a module needs them. In other words, only + pure Python modules can be packaged. + http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/5e574012/_sources/configuration.rst.txt ---------------------------------------------------------------------- diff --git a/_sources/configuration.rst.txt b/_sources/configuration.rst.txt new file mode 100644 index 0000000..5ff4284 --- /dev/null +++ b/_sources/configuration.rst.txt @@ -0,0 +1,284 @@ +Configuration +------------- + +Setting up the sandbox in the :doc:`start` section was easy; +building a production-grade environment requires a bit more work! + +.. _setting-options: + +Setting Configuration Options +''''''''''''''''''''''''''''' + +The first time you run Airflow, it will create a file called ``airflow.cfg`` in +your ``$AIRFLOW_HOME`` directory (``~/airflow`` by default). This file contains Airflow's configuration and you +can edit it to change any of the settings. You can also set options with environment variables by using this format: +``$AIRFLOW__{SECTION}__{KEY}`` (note the double underscores). + +For example, the +metadata database connection string can either be set in ``airflow.cfg`` like this: + +.. code-block:: bash + + [core] + sql_alchemy_conn = my_conn_string + +or by creating a corresponding environment variable: + +..
code-block:: bash + + AIRFLOW__CORE__SQL_ALCHEMY_CONN=my_conn_string + +You can also derive the connection string at run time by appending ``_cmd`` to the key like this: + +.. code-block:: bash + + [core] + sql_alchemy_conn_cmd = bash_command_to_run + +Only three such configuration elements, namely ``sql_alchemy_conn``, ``broker_url`` and ``celery_result_backend``, can be fetched as a command. The idea behind this is to not store passwords on boxes in plain text files. The order of precedence is as follows: + +1. environment variable +2. configuration in airflow.cfg +3. command in airflow.cfg +4. default + +Setting up a Backend +'''''''''''''''''''' +If you want to take a real test drive of Airflow, you should consider +setting up a real database backend and switching to the LocalExecutor. + +As Airflow was built to interact with its metadata using the great SqlAlchemy +library, you should be able to use any database backend supported as a +SqlAlchemy backend. We recommend using **MySQL** or **Postgres**. + +.. note:: If you decide to use **Postgres**, we recommend using the ``psycopg2`` + driver and specifying it in your SqlAlchemy connection string. + Also note that since SqlAlchemy does not expose a way to target a + specific schema in the Postgres connection URI, you may + want to set a default schema for your role with a + command similar to ``ALTER ROLE username SET search_path = airflow, foobar;`` + +Once you've set up your database to host Airflow, you'll need to alter the +SqlAlchemy connection string located in your configuration file +``$AIRFLOW_HOME/airflow.cfg``. You should then also change the "executor" +setting to use "LocalExecutor", an executor that can parallelize task +instances locally. + +.. code-block:: bash + + # initialize the database + airflow initdb + +Connections +''''''''''' +Airflow needs to know how to connect to your environment. Information +such as hostname, port, login and passwords to other systems and services is +handled in the ``Admin->Connection`` section of the UI. The pipeline code you +will author will reference the 'conn_id' of the Connection objects. + +.. image:: img/connections.png + +By default, Airflow will save the passwords for the connection in plain text +within the metadata database. Installing the ``crypto`` package is highly +recommended; note that it requires that your operating +system have libffi-dev installed. + +Connections in Airflow pipelines can be created using environment variables. +The environment variable needs to have a prefix of ``AIRFLOW_CONN_`` for +Airflow with the value in a URI format to use the connection properly. Please +see the :doc:`concepts` documentation for more information on environment +variables and connections. + +Scaling Out with Celery +''''''''''''''''''''''' +``CeleryExecutor`` is one of the ways you can scale out the number of workers. For this +to work, you need to set up a Celery backend (**RabbitMQ**, **Redis**, ...) and +change your ``airflow.cfg`` to point the executor parameter to +``CeleryExecutor`` and provide the related Celery settings. + +For more information about setting up a Celery broker, refer to the +exhaustive `Celery documentation on the topic <http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html>`_.
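+For instance, the relevant ``airflow.cfg`` entries might look like the following sketch (the +broker and result backend URLs are assumptions for a local RabbitMQ broker and Postgres result +backend; substitute your own): + +.. code-block:: bash + + [core] + executor = CeleryExecutor + + [celery] + # hypothetical broker and result backend URLs + broker_url = amqp://guest:guest@localhost:5672// + celery_result_backend = db+postgresql://airflow:airflow@localhost/airflow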
+ +Here are a few imperative requirements for your workers: + +- ``airflow`` needs to be installed, and the CLI needs to be in the path +- Airflow configuration settings should be homogeneous across the cluster +- Operators that are executed on the worker need to have their dependencies + met in that context. For example, if you use the ``HiveOperator``, + the hive CLI needs to be installed on that box, or if you use the + ``MySqlOperator``, the required Python library needs to be available in + the ``PYTHONPATH`` somehow +- The worker needs to have access to its ``DAGS_FOLDER``, and you need to + synchronize the filesystems by your own means. A common setup would be to + store your DAGS_FOLDER in a Git repository and sync it across machines using + Chef, Puppet, Ansible, or whatever you use to configure machines in your + environment. If all your boxes have a common mount point, having your + pipeline files shared there should work as well + + +To kick off a worker, you need to set up Airflow and start the worker +subcommand + +.. code-block:: bash + + airflow worker + +Your worker should start picking up tasks as soon as they get fired in +its direction. + +Note that you can also run "Celery Flower", a web UI built on top of Celery, +to monitor your workers. You can use the shortcut command ``airflow flower`` +to start a Flower web server. + + +Scaling Out with Dask +''''''''''''''''''''' + +``DaskExecutor`` allows you to run Airflow tasks in a Dask Distributed cluster. + +Dask clusters can be run on a single machine or on remote networks. For complete +details, consult the `Distributed documentation <https://distributed.readthedocs.io/>`_. + +To create a cluster, first start a Scheduler: + +.. code-block:: bash + + # default settings for a local cluster + DASK_HOST=127.0.0.1 + DASK_PORT=8786 + + dask-scheduler --host $DASK_HOST --port $DASK_PORT + +Next start at least one Worker on any machine that can connect to the host: + +.. code-block:: bash + + dask-worker $DASK_HOST:$DASK_PORT + +Edit your ``airflow.cfg`` to set your executor to ``DaskExecutor`` and provide +the Dask Scheduler address in the ``[dask]`` section. + +Please note: + +- Each Dask worker must be able to import Airflow and any dependencies you + require. +- Dask does not support queues. If an Airflow task was created with a queue, a + warning will be raised but the task will be submitted to the cluster. + + +Logs +'''' +Users can specify a logs folder in ``airflow.cfg``. By default, it is in +the ``AIRFLOW_HOME`` directory. + +In addition, users can supply a remote location for storing logs and log backups +in cloud storage. At this time, Amazon S3 and Google Cloud Storage are supported. +To enable this feature, ``airflow.cfg`` must be configured as in this example: + +.. code-block:: bash + + [core] + # Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users + # must supply a remote location URL (starting with either 's3://...' or + # 'gs://...') and an Airflow connection id that provides access to the storage + # location. + remote_base_log_folder = s3://my-bucket/path/to/logs + remote_log_conn_id = MyS3Conn + # Use server-side encryption for logs stored in S3 + encrypt_s3_logs = False + +Remote logging uses an existing Airflow connection to read/write logs. If you don't +have a connection properly setup, this will fail. In the above example, Airflow will +try to use ``S3Hook('MyS3Conn')``. + +In the Airflow Web UI, local logs take precedence over remote logs.
If local logs +cannot be found or accessed, the remote logs will be displayed. Note that logs +are only sent to remote storage once a task completes (including failure). In other +words, remote logs for running tasks are unavailable. + +Scaling Out on Mesos (community contributed) +'''''''''''''''''''''''''''''''''''''''''''' +``MesosExecutor`` allows you to schedule airflow tasks on a Mesos cluster. +For this to work, you need a running mesos cluster and you must perform the following +steps: + +1. Install airflow on a machine where web server and scheduler will run, + let's refer to this as the "Airflow server". +2. On the Airflow server, install mesos python eggs from `mesos downloads <http://open.mesosphere.com/downloads/mesos/>`_. +3. On the Airflow server, use a database (such as mysql) which can be accessed from mesos + slave machines and add configuration in ``airflow.cfg``. +4. Change your ``airflow.cfg`` to point the executor parameter to + `MesosExecutor` and provide the related Mesos settings. +5. On all mesos slaves, install airflow. Copy the ``airflow.cfg`` from the + Airflow server (so that it uses the same SqlAlchemy connection). +6. On all mesos slaves, run the following for serving logs: + +.. code-block:: bash + + airflow serve_logs + +7. On the Airflow server, to start processing/scheduling DAGs on mesos, run: + +.. code-block:: bash + + airflow scheduler -p + +Note: we need the ``-p`` parameter to pickle the DAGs. + +You can now see the airflow framework and corresponding tasks in the Mesos UI. +The logs for airflow tasks can be seen in the Airflow UI as usual. + +For more information about mesos, refer to `mesos documentation <http://mesos.apache.org/documentation/latest/>`_. +For any queries/bugs on `MesosExecutor`, please contact `@kapil-malik <https://github.com/kapil-malik>`_. + +Integration with systemd +'''''''''''''''''''''''' +Airflow can integrate with systemd based systems. This makes watching your +daemons easy as systemd can take care of restarting a daemon on failure. +In the ``scripts/systemd`` directory you can find unit files that +have been tested on Redhat based systems. You can copy those to +``/usr/lib/systemd/system``. It is assumed that Airflow will run under +``airflow:airflow``. If not (or if you are running on a non-Redhat +based system) you probably need to adjust the unit files. + +Environment configuration is picked up from ``/etc/sysconfig/airflow``. +An example file is supplied. Make sure to specify the ``SCHEDULER_RUNS`` +variable in this file when you run the scheduler. You +can also define here, for example, ``AIRFLOW_HOME`` or ``AIRFLOW_CONFIG``. + +Integration with upstart +'''''''''''''''''''''''' +Airflow can integrate with upstart based systems. Upstart automatically starts all airflow services for which you +have a corresponding ``*.conf`` file in ``/etc/init`` upon system boot. On failure, upstart automatically restarts +the process (until it reaches the re-spawn limit set in a ``*.conf`` file). + +You can find sample upstart job files in the ``scripts/upstart`` directory. These files have been tested on +Ubuntu 14.04 LTS. You may have to adjust ``start on`` and ``stop on`` stanzas to make it work on other upstart +systems. Some of the possible options are listed in ``scripts/upstart/README``. + +Modify ``*.conf`` files as needed and copy them to the ``/etc/init`` directory. It is assumed that airflow will run +under ``airflow:airflow``.
Change ``setuid`` and ``setgid`` in the ``*.conf`` files if you use another user/group. + +You can use ``initctl`` to manually start, stop, or view the status of an airflow process that has been +integrated with upstart: + +.. code-block:: bash + + initctl status airflow-webserver + +Test Mode +''''''''' +Airflow has a fixed set of "test mode" configuration options. You can load these +at any time by calling ``airflow.configuration.load_test_config()`` (note this +operation is not reversible!). However, some options (like the DAG_FOLDER) are +loaded before you have a chance to call load_test_config(). In order to eagerly load +the test configuration, set test_mode in airflow.cfg: + +.. code-block:: bash + + [tests] + unit_test_mode = True + +Due to Airflow's automatic environment variable expansion (see :ref:`setting-options`), +you can also set the env var ``AIRFLOW__CORE__UNIT_TEST_MODE`` to temporarily override +airflow.cfg. http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/5e574012/_sources/faq.rst.txt ---------------------------------------------------------------------- diff --git a/_sources/faq.rst.txt b/_sources/faq.rst.txt new file mode 100644 index 0000000..1e4c038 --- /dev/null +++ b/_sources/faq.rst.txt @@ -0,0 +1,147 @@ +FAQ +======== + +Why isn't my task getting scheduled? +------------------------------------ + +There are many reasons why your task might not be getting scheduled. +Here are some of the common causes: + +- Does your script "compile"? Can the Airflow engine parse it and find your + DAG object? To test this, you can run ``airflow list_dags`` and + confirm that your DAG shows up in the list. You can also run + ``airflow list_tasks foo_dag_id --tree`` and confirm that your task + shows up in the list as expected. If you use the CeleryExecutor, you + may want to confirm that this works both where the scheduler runs as well + as where the worker runs. + +- Is your ``start_date`` set properly? The Airflow scheduler triggers the + task soon after the ``start_date + schedule_interval`` is passed. + +- Is your ``schedule_interval`` set properly? The default ``schedule_interval`` + is one day (``datetime.timedelta(1)``). You must specify a different ``schedule_interval`` + directly to the DAG object you instantiate, not as a ``default_param``, as task instances + do not override their parent DAG's ``schedule_interval``. + +- Is your ``start_date`` beyond where you can see it in the UI? If you + set it to some time, say, 3 months ago, you won't be able to see + it in the main view in the UI, but you should be able to see it in + ``Menu -> Browse -> Task Instances``. + +- Are the dependencies for the task met? The task instances directly + upstream from the task need to be in a ``success`` state. Also, + if you have set ``depends_on_past=True``, the previous task instance + needs to have succeeded (except if it is the first run for that task). + Also, if ``wait_for_downstream=True``, make sure you understand + what it means. + You can view how these properties are set from the ``Task Instance Details`` + page for your task. + +- Are the DagRuns you need created and active? A DagRun represents a specific + execution of an entire DAG and has a state (running, success, failed, ...). + The scheduler creates new DagRuns as it moves forward, but never goes back + in time to create new ones. The scheduler only evaluates ``running`` DagRuns + to see what task instances it can trigger.
http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/5e574012/_sources/faq.rst.txt ---------------------------------------------------------------------- diff --git a/_sources/faq.rst.txt b/_sources/faq.rst.txt new file mode 100644 index 0000000..1e4c038 --- /dev/null +++ b/_sources/faq.rst.txt @@ -0,0 +1,147 @@

FAQ
========

Why isn't my task getting scheduled?
------------------------------------

There are many reasons why your task might not be getting scheduled.
Here are some of the common causes:

- Does your script "compile"? Can the Airflow engine parse it and find your
  DAG object? To test this, you can run ``airflow list_dags`` and
  confirm that your DAG shows up in the list. You can also run
  ``airflow list_tasks foo_dag_id --tree`` and confirm that your task
  shows up in the list as expected. If you use the CeleryExecutor, you
  may want to confirm that this works both where the scheduler runs as well
  as where the worker runs.

- Is your ``start_date`` set properly? The Airflow scheduler triggers the
  task soon after ``start_date + schedule_interval`` has passed.

- Is your ``schedule_interval`` set properly? The default ``schedule_interval``
  is one day (``datetime.timedelta(1)``). You must specify a different ``schedule_interval``
  directly on the DAG object you instantiate, not in ``default_args``, as task instances
  do not override their parent DAG's ``schedule_interval``.

- Is your ``start_date`` beyond where you can see it in the UI? If you
  set it to some time, say, 3 months ago, you won't be able to see
  it in the main view of the UI, but you should be able to see it in
  ``Menu -> Browse -> Task Instances``.

- Are the dependencies for the task met? The task instances directly
  upstream from the task need to be in a ``success`` state. Also,
  if you have set ``depends_on_past=True``, the previous task instance
  needs to have succeeded (except if it is the first run for that task).
  Also, if ``wait_for_downstream=True``, make sure you understand
  what it means.
  You can view how these properties are set from the ``Task Instance Details``
  page for your task.

- Are the DagRuns you need created and active? A DagRun represents a specific
  execution of an entire DAG and has a state (running, success, failed, ...).
  The scheduler creates new DagRuns as it moves forward, but never goes back
  in time to create new ones. The scheduler only evaluates ``running`` DagRuns
  to see what task instances it can trigger. Note that clearing task
  instances (from the UI or CLI) does set the state of a DagRun back to
  ``running``. You can bulk view the list of DagRuns and alter states by clicking
  on the schedule tag for a DAG.

- Is the ``concurrency`` parameter of your DAG reached? ``concurrency`` defines
  how many ``running`` task instances a DAG is allowed to have, beyond which
  point things get queued.

- Is the ``max_active_runs`` parameter of your DAG reached? ``max_active_runs`` defines
  how many ``running`` concurrent instances of a DAG there are allowed to be.

You may also want to read the Scheduler section of the docs and make
sure you fully understand how it proceeds.


How do I trigger tasks based on another task's failure?
-------------------------------------------------------

Check out the ``Trigger Rule`` section in the Concepts section of the
documentation.

Why are connection passwords still not encrypted in the metadata db after I installed airflow[crypto]?
------------------------------------------------------------------------------------------------------

- Verify that the ``fernet_key`` defined in ``$AIRFLOW_HOME/airflow.cfg`` is a valid Fernet key. It must be a base64-encoded 32-byte key. You need to restart the webserver after you update the key.
- For existing connections (the ones that you had defined before installing ``airflow[crypto]`` and creating a Fernet key), you need to open each connection in the connection admin UI, re-type the password, and save it.

What's the deal with ``start_date``?
------------------------------------

``start_date`` is partly legacy from the pre-DagRun era, but it is still
relevant in many ways. When creating a new DAG, you probably want to set
a global ``start_date`` for your tasks using ``default_args``. The first
DagRun will be created based on the ``min(start_date)`` over all your
tasks. From that point on, the scheduler creates new DagRuns based on
your ``schedule_interval``, and the corresponding task instances run as your
dependencies are met. When introducing new tasks to your DAG, you need to
pay special attention to ``start_date``, and may want to reactivate
inactive DagRuns to get the new task onboarded properly.

We recommend against using dynamic values as ``start_date``, especially
``datetime.now()``, as it can be quite confusing. The task is triggered
once the period closes, and in theory an ``@hourly`` DAG would never get to
an hour after now, as ``now()`` keeps moving along.

Previously we also recommended using a ``start_date`` rounded in relation to your
``schedule_interval``. This meant an ``@hourly`` job would start at ``00:00``
minutes:seconds past the hour, a ``@daily`` job at midnight, and a ``@monthly`` job on the
first of the month. This is no longer required. Airflow will now auto-align
the ``start_date`` and the ``schedule_interval`` by using the ``start_date``
as the moment to start looking.

You can use any sensor, such as a ``TimeDeltaSensor``, to delay
the execution of tasks within the schedule interval.
While ``schedule_interval`` does allow specifying a ``datetime.timedelta``
object, we recommend using macros or cron expressions instead, as
they enforce this idea of rounded schedules.
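Putting these recommendations together, a minimal sketch of a DAG with a
static, rounded ``start_date`` (the DAG id and date are placeholders):

.. code:: python

    from datetime import datetime

    from airflow.models import DAG

    default_args = {
        'owner': 'airflow',
        # static and rounded to the schedule -- never datetime.now()
        'start_date': datetime(2017, 3, 1),
    }

    dag = DAG('my_dag', default_args=default_args, schedule_interval='@daily')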
When using ``depends_on_past=True`` it's important to pay special attention
to ``start_date``, as the past dependency is only waived for the very first
run, the one scheduled at the task's ``start_date``. It's also
important to watch DagRun activity status over time when introducing
new ``depends_on_past=True`` tasks, unless you are planning on running a backfill
for the new task(s).

Also important to note is that a task's ``start_date``, in the context of a
backfill CLI command, gets overridden by the backfill command's ``start_date``.
This allows a backfill on tasks that have ``depends_on_past=True`` to
actually start; if that were not the case, the backfill would never start.

How can I create DAGs dynamically?
----------------------------------

Airflow looks in your ``DAGS_FOLDER`` for modules that contain ``DAG`` objects
in their global namespace, and adds the objects it finds to the
``DagBag``. Knowing this, all we need is a way to dynamically assign
variables in the global namespace, which is easily done in Python using the
built-in ``globals()`` function, which behaves like a
simple dictionary.

.. code:: python

    from airflow.models import DAG

    for i in range(10):
        dag_id = 'foo_{}'.format(i)
        # register the DAG in the module's global namespace
        globals()[dag_id] = DAG(dag_id)
        # or better, call a function that returns a DAG object!

What are all the ``airflow run`` commands in my process list?
---------------------------------------------------------------

There are many layers of ``airflow run`` commands, meaning it can call itself:

- Basic ``airflow run``: fires up an executor and tells it to run an
  ``airflow run --local`` command. If using Celery, this means it puts a
  command in the queue for it to run remotely on a worker. If using the
  LocalExecutor, that translates into running it in a subprocess pool.
- Local ``airflow run --local``: starts an ``airflow run --raw``
  command (described below) as a subprocess, and is in charge of
  emitting heartbeats, listening for external kill signals,
  and ensuring that some cleanup takes place if the subprocess fails.
- Raw ``airflow run --raw``: runs the actual operator's ``execute`` method and
  performs the actual work. (See the illustration below.)
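As an illustration, for a single running task instance you might see three
nested entries along these lines in your process list (the dag id, task id,
and execution date are placeholders):

.. code-block:: bash

    airflow run some_dag some_task 2017-03-06T00:00:00
    airflow run some_dag some_task 2017-03-06T00:00:00 --local
    airflow run some_dag some_task 2017-03-06T00:00:00 --raw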
http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/5e574012/_sources/index.rst.txt ---------------------------------------------------------------------- diff --git a/_sources/index.rst.txt b/_sources/index.rst.txt new file mode 100644 index 0000000..2a1f1c1 --- /dev/null +++ b/_sources/index.rst.txt @@ -0,0 +1,89 @@

.. image:: img/pin_large.png
    :width: 100
.. image:: img/incubator.jpg
    :width: 150

Apache Airflow (incubating) Documentation
=========================================

.. important::

    **Disclaimer**: Apache Airflow is an effort undergoing incubation at The
    Apache Software Foundation (ASF), sponsored by the Apache Incubator.
    Incubation is required of all newly accepted projects until a further
    review indicates that the infrastructure, communications, and
    decision making process have stabilized in a manner consistent with
    other successful ASF projects. While incubation status is not
    necessarily a reflection of the completeness or stability of
    the code, it does indicate that the project has yet to be fully
    endorsed by the ASF.


Airflow is a platform to programmatically author, schedule and monitor
workflows.

Use airflow to author workflows as directed acyclic graphs (DAGs) of tasks.
The airflow scheduler executes your tasks on an array of workers while
following the specified dependencies. Rich command line utilities make
performing complex surgeries on DAGs a snap. The rich user interface
makes it easy to visualize pipelines running in production,
monitor progress, and troubleshoot issues when needed.

When workflows are defined as code, they become more maintainable,
versionable, testable, and collaborative.

.. image:: img/airflow.gif

------------

Principles
----------

- **Dynamic**: Airflow pipelines are configuration as code (Python), allowing for dynamic pipeline generation. This allows for writing code that instantiates pipelines dynamically.
- **Extensible**: Easily define your own operators and executors, and extend the library so that it fits the level of abstraction that suits your environment.
- **Elegant**: Airflow pipelines are lean and explicit. Parameterizing your scripts is built into the core of Airflow using the powerful **Jinja** templating engine.
- **Scalable**: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers. Airflow is ready to scale to infinity.


Beyond the Horizon
------------------

Airflow **is not** a data streaming solution. Tasks do not move data from
one to the other (though tasks can exchange metadata!). Airflow is not
in the `Spark Streaming <http://spark.apache.org/streaming/>`_
or `Storm <https://storm.apache.org/>`_ space; it is more comparable to
`Oozie <http://oozie.apache.org/>`_ or
`Azkaban <http://data.linkedin.com/opensource/azkaban>`_.

Workflows are expected to be mostly static or slowly changing. You can think
of the structure of the tasks in your workflow as slightly more dynamic
than a database structure would be. Airflow workflows are expected to look
similar from one run to the next; this allows for clarity around the
unit of work and continuity.


Content
-------
.. toctree::
    :maxdepth: 4

    project
    license
    start
    installation
    tutorial
    configuration
    ui
    concepts
    profiling
    cli
    scheduler
    plugins
    security
    api
    integration
    faq
    code

http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/5e574012/_sources/installation.rst.txt ---------------------------------------------------------------------- diff --git a/_sources/installation.rst.txt b/_sources/installation.rst.txt new file mode 100644 index 0000000..289f64f --- /dev/null +++ b/_sources/installation.rst.txt @@ -0,0 +1,90 @@

Installation
------------

Getting Airflow
'''''''''''''''

The easiest way to install the latest stable version of Airflow is with ``pip``:

.. code-block:: bash

    pip install airflow

You can also install Airflow with support for extra features like ``s3`` or ``postgres``:

.. code-block:: bash

    pip install "airflow[s3, postgres]"

Extra Packages
''''''''''''''

The ``airflow`` PyPI basic package only installs what's needed to get started.
Subpackages can be installed depending on what will be useful in your
environment. For instance, if you don't need connectivity with Postgres,
you won't have to go through the trouble of installing the ``postgres-devel``
yum package, or whatever equivalent applies on the distribution you are using.

Behind the scenes, Airflow does conditional imports of operators that require
these extra dependencies.
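For example, a quick way to check whether an extra's dependencies are present
(the module path follows the 1.8 layout and is an assumption of this sketch):

.. code:: python

    # without airflow[postgres] installed, importing the operator's module
    # fails because its dependencies (e.g. psycopg2) are missing
    try:
        from airflow.operators.postgres_operator import PostgresOperator
    except ImportError:
        print("Install the subpackage first: pip install airflow[postgres]")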
Here's the list of the subpackages and what they enable:

+---------------+-------------------------------------+-------------------------------------------------+
| subpackage    | install command                     | enables                                         |
+===============+=====================================+=================================================+
| all           | ``pip install airflow[all]``        | All Airflow features known to man               |
+---------------+-------------------------------------+-------------------------------------------------+
| all_dbs       | ``pip install airflow[all_dbs]``    | All database integrations                       |
+---------------+-------------------------------------+-------------------------------------------------+
| async         | ``pip install airflow[async]``      | Async worker classes for gunicorn               |
+---------------+-------------------------------------+-------------------------------------------------+
| devel         | ``pip install airflow[devel]``      | Minimum dev tools requirements                  |
+---------------+-------------------------------------+-------------------------------------------------+
| devel_hadoop  |``pip install airflow[devel_hadoop]``| Airflow + dependencies on the Hadoop stack      |
+---------------+-------------------------------------+-------------------------------------------------+
| celery        | ``pip install airflow[celery]``     | CeleryExecutor                                  |
+---------------+-------------------------------------+-------------------------------------------------+
| crypto        | ``pip install airflow[crypto]``     | Encrypt connection passwords in metadata db     |
+---------------+-------------------------------------+-------------------------------------------------+
| druid         | ``pip install airflow[druid]``      | Druid.io related operators & hooks              |
+---------------+-------------------------------------+-------------------------------------------------+
| gcp_api       | ``pip install airflow[gcp_api]``    | Google Cloud Platform hooks and operators       |
|               |                                     | (using ``google-api-python-client``)            |
+---------------+-------------------------------------+-------------------------------------------------+
| jdbc          | ``pip install airflow[jdbc]``       | JDBC hooks and operators                        |
+---------------+-------------------------------------+-------------------------------------------------+
| hdfs          | ``pip install airflow[hdfs]``       | HDFS hooks and operators                        |
+---------------+-------------------------------------+-------------------------------------------------+
| hive          | ``pip install airflow[hive]``       | All Hive related operators                      |
+---------------+-------------------------------------+-------------------------------------------------+
| kerberos      | ``pip install airflow[kerberos]``   | Kerberos integration for Kerberized Hadoop      |
+---------------+-------------------------------------+-------------------------------------------------+
| ldap          | ``pip install airflow[ldap]``       | LDAP authentication for users                   |
+---------------+-------------------------------------+-------------------------------------------------+
| mssql         | ``pip install airflow[mssql]``      | Microsoft SQL operators and hook,               |
|               |                                     | support as an Airflow backend                   |
+---------------+-------------------------------------+-------------------------------------------------+
| mysql         | ``pip install airflow[mysql]``      | MySQL operators and hook, support as            |
|               |                                     | an Airflow backend                              |
+---------------+-------------------------------------+-------------------------------------------------+
| password      | ``pip install airflow[password]``   | Password authentication for users               |
+---------------+-------------------------------------+-------------------------------------------------+
| postgres      | ``pip install airflow[postgres]``   | Postgres operators and hook, support            |
|               |                                     | as an Airflow backend                           |
+---------------+-------------------------------------+-------------------------------------------------+
| qds           | ``pip install airflow[qds]``        | Enable QDS (Qubole Data Services) support       |
+---------------+-------------------------------------+-------------------------------------------------+
| rabbitmq      | ``pip install airflow[rabbitmq]``   | RabbitMQ support as a Celery backend            |
+---------------+-------------------------------------+-------------------------------------------------+
| s3            | ``pip install airflow[s3]``         | ``S3KeySensor``, ``S3PrefixSensor``             |
+---------------+-------------------------------------+-------------------------------------------------+
| samba         | ``pip install airflow[samba]``      | ``Hive2SambaOperator``                          |
+---------------+-------------------------------------+-------------------------------------------------+
| slack         | ``pip install airflow[slack]``      | ``SlackAPIPostOperator``                        |
+---------------+-------------------------------------+-------------------------------------------------+
| vertica       | ``pip install airflow[vertica]``    | Vertica hook                                    |
|               |                                     | support as an Airflow backend                   |
+---------------+-------------------------------------+-------------------------------------------------+
| cloudant      | ``pip install airflow[cloudant]``   | Cloudant hook                                   |
+---------------+-------------------------------------+-------------------------------------------------+
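Extras from the table above can also be combined in a single install command;
for example, to get Celery, RabbitMQ, and Postgres support at once:

.. code-block:: bash

    pip install "airflow[celery, rabbitmq, postgres]"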
http://git-wip-us.apache.org/repos/asf/incubator-airflow-site/blob/5e574012/_sources/integration.rst.txt ---------------------------------------------------------------------- diff --git a/_sources/integration.rst.txt b/_sources/integration.rst.txt new file mode 100644 index 0000000..10bc038 --- /dev/null +++ b/_sources/integration.rst.txt @@ -0,0 +1,246 @@

Integration
===========

- :ref:`AWS`
- :ref:`GCP`

.. _AWS:

AWS: Amazon Web Services
------------------------

---

.. _GCP:

GCP: Google Cloud Platform
--------------------------

Airflow has extensive support for the Google Cloud Platform. Note, however, that
most of these Hooks and Operators are in the contrib section, which means they have
*beta* status: they can have breaking changes between minor releases.

BigQuery
''''''''

BigQuery Operators
^^^^^^^^^^^^^^^^^^

- :ref:`BigQueryCheckOperator` : Performs checks against a SQL query that will return a single row with different values.
- :ref:`BigQueryValueCheckOperator` : Performs a simple value check using SQL code.
- :ref:`BigQueryIntervalCheckOperator` : Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from ``days_back`` before.
- :ref:`BigQueryOperator` : Executes BigQuery SQL queries in a specific BigQuery database.
- :ref:`BigQueryToBigQueryOperator` : Copies a BigQuery table to another BigQuery table.
- :ref:`BigQueryToCloudStorageOperator` : Transfers a BigQuery table to a Google Cloud Storage bucket.


.. _BigQueryCheckOperator:

BigQueryCheckOperator
"""""""""""""""""""""

.. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator

.. _BigQueryValueCheckOperator:

BigQueryValueCheckOperator
""""""""""""""""""""""""""

.. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperator
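A minimal usage sketch, assuming the constructor accepts ``sql`` and
``pass_value`` like the core ``ValueCheckOperator`` it extends, and an
existing ``dag`` object (the table and expected value are placeholders):

.. code:: python

    from airflow.contrib.operators.bigquery_check_operator import (
        BigQueryValueCheckOperator)

    check = BigQueryValueCheckOperator(
        task_id='check_row_count',
        # legacy SQL of the era; table name is a placeholder
        sql='SELECT COUNT(*) FROM [my_dataset.my_table]',
        pass_value=1000,
        dag=dag)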
.. _BigQueryIntervalCheckOperator:

BigQueryIntervalCheckOperator
"""""""""""""""""""""""""""""

.. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperator

.. _BigQueryOperator:

BigQueryOperator
""""""""""""""""

.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator

.. _BigQueryToBigQueryOperator:

BigQueryToBigQueryOperator
""""""""""""""""""""""""""

.. autoclass:: airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator

.. _BigQueryToCloudStorageOperator:

BigQueryToCloudStorageOperator
""""""""""""""""""""""""""""""

.. autoclass:: airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator


BigQueryHook
^^^^^^^^^^^^

.. autoclass:: airflow.contrib.hooks.bigquery_hook.BigQueryHook
    :members:


Cloud DataFlow
''''''''''''''

DataFlow Operators
^^^^^^^^^^^^^^^^^^

- :ref:`DataFlowJavaOperator` : Launches a Cloud DataFlow job written in Java (see the example below).

.. _DataFlowJavaOperator:

DataFlowJavaOperator
""""""""""""""""""""

.. autoclass:: airflow.contrib.operators.dataflow_operator.DataFlowJavaOperator

.. code:: python

    from datetime import datetime, timedelta

    from airflow.models import DAG
    from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2016, 8, 1),
        'email': ['a...@vanboxel.be'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=30),
        'dataflow_default_options': {
            'project': 'my-gcp-project',
            'zone': 'us-central1-f',
            'stagingLocation': 'gs://bucket/tmp/dataflow/staging/',
        }
    }

    dag = DAG('test-dag', default_args=default_args)

    task = DataFlowJavaOperator(
        gcp_conn_id='gcp_default',
        task_id='normalize-cal',
        jar='{{var.value.gcp_dataflow_base}}pipeline-ingress-cal-normalize-1.0.jar',
        options={
            'autoscalingAlgorithm': 'BASIC',
            'maxNumWorkers': '50',
            'start': '{{ds}}',
            'partitionType': 'DAY'
        },
        dag=dag)

DataFlowHook
^^^^^^^^^^^^

.. autoclass:: airflow.contrib.hooks.gcp_dataflow_hook.DataFlowHook
    :members:



Cloud DataProc
''''''''''''''

DataProc Operators
^^^^^^^^^^^^^^^^^^

- :ref:`DataProcPigOperator` : Start a Pig query Job on a Cloud DataProc cluster.
- :ref:`DataProcHiveOperator` : Start a Hive query Job on a Cloud DataProc cluster.
- :ref:`DataProcSparkSqlOperator` : Start a Spark SQL query Job on a Cloud DataProc cluster.
- :ref:`DataProcSparkOperator` : Start a Spark Job on a Cloud DataProc cluster.
- :ref:`DataProcHadoopOperator` : Start a Hadoop Job on a Cloud DataProc cluster.
- :ref:`DataProcPySparkOperator` : Start a PySpark Job on a Cloud DataProc cluster.

.. _DataProcPigOperator:

DataProcPigOperator
"""""""""""""""""""

.. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcPigOperator

.. _DataProcHiveOperator:

DataProcHiveOperator
""""""""""""""""""""

.. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcHiveOperator

.. _DataProcSparkSqlOperator:

DataProcSparkSqlOperator
""""""""""""""""""""""""

.. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcSparkSqlOperator

.. _DataProcSparkOperator:

DataProcSparkOperator
"""""""""""""""""""""

.. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcSparkOperator

.. _DataProcHadoopOperator:

DataProcHadoopOperator
""""""""""""""""""""""

.. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcHadoopOperator

.. _DataProcPySparkOperator:

DataProcPySparkOperator
"""""""""""""""""""""""
.. autoclass:: airflow.contrib.operators.dataproc_operator.DataProcPySparkOperator
    :members:


Cloud Datastore
'''''''''''''''

DatastoreHook
^^^^^^^^^^^^^

.. autoclass:: airflow.contrib.hooks.datastore_hook.DatastoreHook
    :members:



Cloud Storage
'''''''''''''

Storage Operators
^^^^^^^^^^^^^^^^^

- :ref:`GoogleCloudStorageDownloadOperator` : Downloads a file from Google Cloud Storage.
- :ref:`GoogleCloudStorageToBigQueryOperator` : Loads files from Google Cloud Storage into BigQuery.

.. _GoogleCloudStorageDownloadOperator:

GoogleCloudStorageDownloadOperator
""""""""""""""""""""""""""""""""""

.. autoclass:: airflow.contrib.operators.gcs_download_operator.GoogleCloudStorageDownloadOperator
    :members:

.. _GoogleCloudStorageToBigQueryOperator:

GoogleCloudStorageToBigQueryOperator
""""""""""""""""""""""""""""""""""""

.. autoclass:: airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
    :members:


GoogleCloudStorageHook
^^^^^^^^^^^^^^^^^^^^^^

.. autoclass:: airflow.contrib.hooks.gcs_hook.GoogleCloudStorageHook
    :members:
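To illustrate, a minimal sketch of using the hook directly, assuming a
``download`` method taking bucket and object names and the default connection
id shown below (the bucket and object are placeholders):

.. code:: python

    from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

    hook = GoogleCloudStorageHook(
        google_cloud_storage_conn_id='google_cloud_storage_default')

    # returns the object's contents; passing a filename instead would
    # write the object to local disk
    data = hook.download(bucket='my-bucket', object='path/to/file.csv')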