[ https://issues.apache.org/jira/browse/AIRFLOW-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yingbo Wang updated AIRFLOW-3964:
---------------------------------
    Summary: Consolidate and de-duplicate sensor tasks in newly created Operator  (was: Reduce duplicated tasks and optimize with scheduler embedded sensor)

Consolidate and de-duplicate sensor tasks in newly created Operator
-------------------------------------------------------------------

                 Key: AIRFLOW-3964
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3964
             Project: Apache Airflow
          Issue Type: Improvement
          Components: dependencies, operators, scheduler
    Affects Versions: 1.10.0
            Reporter: Yingbo Wang
            Assignee: Yingbo Wang
            Priority: Critical

h2. Problem

h3. Airflow Sensor

Sensors are a type of operator that keeps running until a certain criterion is met. Examples include a specific file landing in HDFS or S3, a partition appearing in Hive, or a specific time of the day. Sensors derive from BaseSensorOperator and run a poke method at a specified poke_interval until it returns True.

Sensor duplication is a common problem in large-scale Airflow projects: the same partition often needs to be detected by tasks in the same or different DAGs. At Airbnb, 88 boxes run four different types of sensors every day, and the number of running sensor tasks ranges from 8k to 16k, which takes a great amount of resources. Although the Airflow team has redirected all sensors to a dedicated queue with a relatively small resource allocation, there is still large room to reduce the number of workers and relieve DB pressure by optimizing the sensor mechanism.

The existing sensor implementation creates a separate task for each sensor with a specific dag_id, task_id and execution_date. This task keeps querying the DB until the specified partition exists. Even if two tasks are waiting for the same partition, they create two connections to the DB and check the status in two separate processes. On one hand, the DB has to run duplicate queries in multiple processes, which takes both CPU and memory resources; on the other hand, Airflow has to maintain a process for each sensor to query and wait for the partition/table to be created.

h1. Design

There are several issues that need to be resolved for the smart sensor.

h2. Persist sensor info in DB and avoid file parsing before running

The current Airflow implementation needs to parse the DAG Python file before running a task. Parsing multiple Python files inside a smart sensor would make it inefficient and overloaded. Sensor tasks only need relatively "lightweight" execution information -- a small number of properties with simple structure (most are built-in types rather than functions or objects) -- so we propose to skip the parsing for the smart sensor. The easiest way is to persist the required task instance information in the Airflow metaDB.

h3. Solution

It would be hard to dump the whole task instance object dictionary, and we do not really need that much information. We add two sets to the base sensor class: "persist_fields" and "execute_fields".

h4. "persist_fields" -- dumped to the airflow.task_instance column "attr_dict"

Saves the attribute names that are needed to accomplish a sensor poking job. For example:
# The NamedHivePartitionSensor defines its persist_fields as ('partition_names', 'metastore_conn_id', 'hook'), since these properties are enough for its poking function.
# The HivePartitionSensor is slightly different and uses persist_fields ('schema', 'table', 'partition', 'metastore_conn_id').

If two tasks have the same value for every field in persist_fields, they are poking the same item and hold duplicate poking jobs in the sensor. *persist_fields can help us deduplicate sensor tasks.* More broadly, if we listed persist_fields for all operators, it could help dedup all Airflow tasks.

h4. "execute_fields" -- dumped to the airflow.task_instance column "exec_dict"

Saves the execution configuration, such as "poke_interval", "timeout" and "execution_timeout". Fields in this set do not affect what the poking job checks; they control how frequently we poke, when the task should time out, after how many timeouts it should fail, etc. For now we only include logic that we can easily handle in a smart sensor. This is a smart sensor "doable whitelist" and can be extended as more logic is "unlocked" by the smart sensor implementation.

When we initialize a task instance object, we dump the attribute values of these two sets and persist them in the Airflow metaDB. The smart sensor can then query the DB for all the information required to run the sensor tasks and does not need to parse any DAG files. (A sketch of such a dump is shown below.)
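A minimal sketch of that dump: the NamedHivePartitionSensorSketch class and the dump_sensor_info() helper are hypothetical stand-ins for illustration; only the persist_fields/execute_fields sets and the attr_dict/exec_dict columns come from the design above.

{code:python}
# Sketch only: dump a sensor's persist_fields / execute_fields so a smart
# sensor can poke it without re-parsing the DAG file. Names are hypothetical.
import json


class NamedHivePartitionSensorSketch:
    """Stand-in for NamedHivePartitionSensor with the proposed class-level sets."""

    persist_fields = ('partition_names', 'metastore_conn_id', 'hook')
    execute_fields = ('poke_interval', 'timeout', 'execution_timeout')

    def __init__(self, partition_names, metastore_conn_id,
                 poke_interval=60, timeout=60 * 60 * 24, execution_timeout=None):
        self.partition_names = partition_names
        self.metastore_conn_id = metastore_conn_id
        self.hook = None  # resolved from metastore_conn_id at poke time
        self.poke_interval = poke_interval
        self.timeout = timeout
        self.execution_timeout = execution_timeout


def dump_sensor_info(task):
    """Serialize the two field sets into the proposed task_instance columns."""
    attr_dict = {name: getattr(task, name) for name in task.persist_fields}
    exec_dict = {name: getattr(task, name) for name in task.execute_fields}
    # attr_dict identifies *what* is poked; exec_dict only controls *how* to poke.
    return json.dumps(attr_dict, default=str), json.dumps(exec_dict, default=str)


# Two tasks (possibly from different DAGs) waiting on the same partition dump
# identical attr_dict values -- exactly the duplicates the smart sensor merges.
t1 = NamedHivePartitionSensorSketch(['default.events/ds=2019-02-25'], 'metastore_default')
t2 = NamedHivePartitionSensorSketch(['default.events/ds=2019-02-25'], 'metastore_default',
                                    poke_interval=300)
assert dump_sensor_info(t1)[0] == dump_sensor_info(t2)[0]
{code}

Note that the two tasks above differ only in an execute_fields value (poke_interval), so they still count as one poking job.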
h2. Airflow scheduler change

We do not want to break any existing scheduler logic. The smart sensor is a configurable mode and can easily fall back to the regular scheduler logic when it detects a case that is not suitable for the smart sensor.

h3. Solution

h4. Scheduler process_file

Right before we set a task instance state to "scheduled", add a smart sensor check that does the following:
# Check whether Airflow is configured to use the smart sensor.
# Check whether the current task is suitable for smart sensor running.

If both checks return "yes", the task instance is qualified for the smart sensor. The scheduler sets its state to "smart_pending" instead of "scheduled", and this task instance will *NOT BE QUEUED* to the executor. It is expected to be picked up by a smart sensor task through a DB query. The smart sensor will update the state to a final state ("success" or "failed") or to "up_for_retry", at which point the task instance comes back to the normal scheduler world.

If either check returns "no" -- either the current Airflow cluster is not configured to use the smart sensor, or the task itself is out of smart sensor scope -- the scheduler schedules the task instance just as if no smart sensor existed.

h4. Include smart sensor DAG by configuration

We use a smart sensor DAG to kick off all smart sensor tasks. If Airflow is configured to use the smart sensor, this DAG is included in the scheduler parsing paths. The implementation is similar to the example DAGs.

h2. Smart sensor operator

h3. Smart sensor logic

In each execute loop (a sketch of this loop follows at the end of this section):
* refresh_all_dict(): select all tasks from the DB whose state is "smart_pending" or "smart_running" and whose shardcode qualifies.
* For all tasks in the task dictionary to poke:
** If a task with the same persist_fields has already been poked in this round:
*** If the poked task reached a final state, nothing needs to be done.
*** If the poked task did not reach a final state, handle timeout.
** Else (not poked in this round):
*** Execute the sensor job.
*** On a success or failed state, mark the state in the Airflow DB for all tasks that:
**** have the same persist_fields hashcode, and
**** are in state ("smart_pending", "smart_running").
*** Check and handle timeout.

The smart sensor needs to handle the following issues:
# Get multiple tasks qualified for the smart sensor.
# Do the work for all collected sensor tasks.
# Sensor task duplication.
# Sharding when there are multiple smart sensors running.

Dedup and shard:
* attr_dict ⇒ hashcode ⇒ shardcode
* hashcode = hash(attr_dict)
* shardcode = hashcode % (max_shard + 1)

The range of shardcode and the number of smart sensor tasks can be configured in airflow.cfg. Each smart sensor task has a _shardcode range_ and only queries tasks whose shardcode falls in this range. Duplicate sensor tasks have the same hashcode and the same shardcode, so they are handled by the same smart sensor task. (A minimal sketch of this keying and of the execute loop is shown below.)
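Below is a minimal sketch of the hashcode/shardcode keying. The canonical-JSON + SHA-1 hashing and the MAX_SHARD value are assumptions for illustration; the design only fixes the two formulas above, with the actual range configured in airflow.cfg.

{code:python}
# Sketch only: derive hashcode and shardcode from a sensor's attr_dict.
import hashlib
import json

MAX_SHARD = 255  # assumed value; configurable in airflow.cfg per the design


def compute_hashcode(attr_dict):
    """Hash the persisted poke attributes; identical attr_dicts collide on purpose."""
    canonical = json.dumps(attr_dict, sort_keys=True, default=str)
    # A stable hash (unlike Python's builtin hash()) so every process agrees.
    return int.from_bytes(hashlib.sha1(canonical.encode()).digest()[:8], 'big')


def compute_shardcode(hashcode):
    """Map the hashcode onto the bounded shard range owned by smart sensor tasks."""
    return hashcode % (MAX_SHARD + 1)


attrs = {'partition_names': ['default.events/ds=2019-02-25'],
         'metastore_conn_id': 'metastore_default'}
hashcode = compute_hashcode(attrs)
print(hashcode, compute_shardcode(hashcode))  # duplicate tasks share both values
{code}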
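And a hedged skeleton of the execute loop bulleted above; the class name, the PokeResult shape and all helper methods are hypothetical stand-ins, so this shows the control flow rather than the proposed operator implementation.

{code:python}
# Skeleton of the smart sensor execute loop; only the control flow follows the design.
from dataclasses import dataclass


@dataclass
class PokeResult:
    is_final: bool               # True once the sensor reached success/failed
    state: str = "smart_running"


class SmartSensorTaskSketch:
    def __init__(self, shard_min, shard_max):
        # This smart sensor instance only owns shardcodes in [shard_min, shard_max].
        self.shard_min, self.shard_max = shard_min, shard_max

    def refresh_all_dict(self):
        """Select task instances in state smart_pending/smart_running whose
        shardcode falls in this task's range (DB query omitted in the sketch)."""
        return []

    def poke(self, ti):
        """Run the underlying sensor's poke once, driven by ti.attr_dict."""
        return PokeResult(is_final=False)

    def mark_state_for_duplicates(self, hashcode, state):
        """Set the final state for every task instance with this hashcode that is
        still in (smart_pending, smart_running)."""

    def handle_timeout(self, ti):
        """Fail or retry the single task instance based on its exec_dict settings."""

    def execute_loop(self):
        poked = {}  # hashcode -> PokeResult for this round
        for ti in self.refresh_all_dict():
            if ti.hashcode in poked:
                # A task with the same persist_fields was already poked this round.
                if not poked[ti.hashcode].is_final:
                    self.handle_timeout(ti)
                continue
            result = self.poke(ti)
            poked[ti.hashcode] = result
            if result.is_final:
                self.mark_state_for_duplicates(ti.hashcode, result.state)
            else:
                self.handle_timeout(ti)
{code}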
h2. Schema change

h3. task_instance table (add 4 columns and 2 indexes)

op.add_column('task_instance', sa.Column('attr_dict', sa.Text(), nullable=True))
op.add_column('task_instance', sa.Column('exec_dict', sa.Text(), nullable=True))
op.add_column('task_instance', sa.Column('hashcode', sa.BigInteger(), nullable=True))
op.add_column('task_instance', sa.Column('shardcode', sa.Integer(), nullable=True))

op.create_index('ti_hashcode', 'task_instance', ['hashcode'], unique=False)
op.create_index('ti_shardcode', 'task_instance', ['shardcode'], unique=False)

h2. Remaining issues

# Handle timeout: save the timeout and execution_timeout in the exec_dict column.
# When a timeout is detected, set the single sensor task to failed or up_for_retry and expect the scheduler to set it back to smart_pending as a retry.
# Calculate the total number of seconds before the final failure and keep the task in a smart sensor state until it fails/succeeds: min(timeout, execution_timeout) * (retries + 1).
# Smart sensor DAG handling: should the DAG be created manually or shipped in source code?
# Special logic for smart sensor health check.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)