I also like the idea - it is a really nice optimisation. As Kaxil mentioned, it would be great to include DAG Serialisation.
As for the other operators that just send a query and poll for completion - once this is implemented for sensors, we can think about splitting some of those "operators" to support it. There was already a PokeReschedule discussion
https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
which I think might benefit from this architecture - but I'd say let's implement it for sensors first and then think about implementing it for operators as well.

One thing though - is it something that we could plan for 2.1? I think our 2.0 release gets some important features that focus on delivering the most value for Airflow users, and it keeps on slipping. I think implementing such a change will keep some of the committers distracted (even if it is already at the PR stage). Maybe we can start assigning new features to 2.1 by default unless we have a very good reason to implement them in 2.0?

J.

On Sat, Jun 20, 2020 at 5:37 PM Kaxil Naik <kaxiln...@gmail.com> wrote:

> Hi Yingbo,
>
> I like the concept described in the AIP. I was wondering if we could
> leverage Dag Serialization (
> https://airflow.readthedocs.io/en/latest/dag-serialization.html) to get the
> "task level fields" without re-parsing the DAGs or storing it in the new
> table.
>
> And can we use some of the operators like the BigQueryOperator,
> SparkOperator which just submits the SQL query and polls until completion?
>
> Regards,
> Kaxil
>
> On Sat, Jun 20, 2020 at 8:26 AM Yingbo Wang <ybw...@gmail.com> wrote:
>
> > Thanks everyone for the feedback. I will also add the details mentioned in
> > this thread into the AIP
> >
> > > > Q: From an implementation perspective, my one area of concern is the
> > > > "sharding" concept and the configuration / management overhead
> > > > involved. I may have missed it in the AIP, but would it be possible to
> > > > add auto-scaling to minimize this configuration?
> >
> > The “sharding” configuration is an integer which sets the number of
> > concurrently running smart sensor jobs for the whole airflow cluster. A
> > proper sharding setting mainly depends on the following issues:
> >
> > 1. Cluster load -- how many sensor tasks need to be executed at the same
> >    time.
> > 2. How often should each sensor be poked at least once.
> > 3. The response time for a sensor task in the current system.
> >
> > As these answers may vary for different systems we leave “sharding” as a
> > configurable field for users to satisfy different use cases.
> >
> > > > Also, a couple of clarifying questions:
> > > >
> > > > 1. Do you know if this is more suitable to certain kinds of sensors vs.
> > > > others?
> >
> > Most sensors should be suitable for the smart sensor, except when an
> > argument needed to initialize the sensor object is unserializable, e.g. a
> > function. Serializing more complex types other than builtins and datetime
> > is not supported right now, but we are planning to add it in the future.
> >
> > > > 2. What do you think about leveraging this to enable "async" operations
> > > > using Airflow i.e. submit a task and then use a "smart sensor" to check
> > > > for completion?
> >
> > This is a very good point. We do notice the relationship between these two
> > ideas. Technically this logic should also work. The “task submission” maps
> > to pre_execute() in the sensor task logic and “check for completion” maps
> > to the sensor’s poke() function. The current implementation of
> > SubDagOperator
> > <https://github.com/apache/airflow/blob/master/airflow/operators/subdag_operator.py#L144-L177>
> > actually follows this pattern. If the operator requires no unserializable
> > argument to be instantiated, we should already be able to leverage the
> > async operation in SmartSensor for it.
> >
> > Q: How would a user enable their own smart sensors? I don’t see any added
> > documentation for this.
> > It looks like they need to manually add the name of
> > the class to the airflow configuration and do *something* to their sensor
> > class, including override the "is_smart_sensor" method (why a method and
> > not an attribute?)
> >
> > Having to enable it in multiple places seems a little cumbersome, why not
> > have a "BaseSmartSensor" that the user inherits from like most of the rest
> > of Airflow? Sensors inherited from BaseSmartSensor would be "Smart" when
> > smart sensors are enabled in the configuration and not smart when smart
> > sensors are not enabled.
> >
> > Enabling/Disabling the smart sensor is a system level config which is
> > transparent to the individual users. An example of smart sensor enabled
> > cluster config is as follows:
> >
> > [smart_sensor]
> > use_smart_sensor = true
> > shard_code_upper_limit = 10000
> > shards = 5
> > sensor_enabled = NamedHivePartitionSensor, MetastorePartitionSensor
> >
> > The "use_smart_sensor" config indicates if the smart sensor is enabled. The
> > "shards" config indicates the number of concurrently running smart sensor
> > jobs for the airflow cluster. The "sensor_enabled" config is a list of
> > sensor class names that will use the smart sensor. The users use the same
> > class names (e.g. HivePartitionSensor) in their DAGs and they don’t have
> > the control to use smart sensors or not, unless they exclude their tasks
> > explicitly.
> >
> > Existing DAGs don't need to be changed for enabling/disabling the smart
> > sensor.
> >
> > “Is_smart_sensor_compatible” is a class level configuration (instead of
> > instance-level) so that the system knows if a particular sensor operator
> > can use the smart sensor. Currently only NamedHivePartitionSensor and
> > MetastorePartitionSensor are enabled to use the smart sensor in the PR.
> >
> > To include other sensor operators for smart sensors that are not included
> > in this PR:
> >
> > 1. Add a class attribute "poke_context_fields" to the operator.
> >    "poke_context_fields" include all key names used for initializing a
> >    sensor object.
> > 2. In airflow.cfg, add the operator’s classname to the section
> >    “[smart_sensor]” with the field “sensors_enabled” as follows.
> >
> > Yingbo
> >
> > On Fri, Jun 19, 2020 at 7:27 AM Shaw, Damian P. <
> > damian.sha...@credit-suisse.com> wrote:
> >
> > > Also +1 (non-binding) on the AIP but questions on the implementation.
> > >
> > > How would a user enable their own smart sensors? I don’t see any added
> > > documentation for this. It looks like they need to manually add the name
> > > of the class to the airflow configuration and do *something* to their
> > > sensor class, including override the "is_smart_sensor" method (why a
> > > method and not an attribute?)
> > >
> > > Having to enable it in multiple places seems a little cumbersome, why not
> > > have a "BaseSmartSensor" that the user inherits from like most of the
> > > rest of Airflow? Sensors inherited from BaseSmartSensor would be "Smart"
> > > when smart sensors are enabled in the configuration and not smart when
> > > smart sensors are not enabled.
> > >
> > > Damian
> > >
> > > -----Original Message-----
> > > From: Vikram Koka <vik...@astronomer.io>
> > > Sent: Friday, June 19, 2020 00:57
> > > To: dev@airflow.apache.org
> > > Subject: Re: [VOTE] AIP-17: Consolidate and de-duplicate sensor tasks in
> > > airflow Smart Sensor
> > >
> > > +1 (non-binding) for this AIP.
> > >
> > > I really like the concept and the efficiency improvements. The general
> > > SmartSensor concept and the ability to add additional sensor classes is
> > > elegant.
> > >
> > > From an implementation perspective, my one area of concern is the
> > > "sharding" concept and the configuration / management overhead involved.
> > > I may have missed it in the AIP, but would it be possible to add
> > > auto-scaling to minimize this configuration?
> > >
> > > Also, a couple of clarifying questions:
> > > 1. Do you know if this is more suitable to certain kinds of sensors vs.
> > > others?
> > > 2. What do you think about leveraging this to enable "async" operations
> > > using Airflow i.e. submit a task and then use a "smart sensor" to check
> > > for completion?
> > >
> > > Best regards,
> > >
> > > Vikram
> > >
> > > On Thu, Jun 18, 2020 at 3:38 PM Yingbo Wang <ybw...@gmail.com> wrote:
> > >
> > > > Hello everyone!
> > > >
> > > > This email calls for a vote to add the airflow smart sensor at
> > > > https://github.com/apache/airflow/pull/5499
> > > >
> > > > AIP:
> > > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-17%3A+Consolidate+and+de-duplicate+sensor+tasks+in+airflow+Smart+Sensor
> > > >
> > > > Change summary:
> > > >
> > > > - Add a new mode called “smart sensor mode”. In smart sensor mode,
> > > >   instead of holding a long running process for each sensor and poking
> > > >   periodically, a sensor will only store poke context at
> > > >   sensor_instance table and then exits with a ‘sensing’ state.
> > > > - When the smart sensor mode is enabled, a special set of builtin smart
> > > >   sensor DAGs (named smart_sensor_group_shard_xxx) is created by the
> > > >   system; These DAGs contain SmartSensorOperator task and manage the
> > > >   smart sensor jobs for the airflow cluster. The SmartSensorOperator
> > > >   task can fetch hundreds of ‘sensing’ instances from sensor_instance
> > > >   table and poke on behalf of them in batches. Users don’t need to
> > > >   change their existing DAGs.
> > > > - The smart sensor mode currently supports NamedHivePartitionSensor and
> > > >   MetastorePartitionSensor however it can easily be extended to support
> > > >   more sensor classes.
> > > > - Smart sensor mode on/off, the list of smart sensor enabled classes,
> > > >   and the number of SmartSensorOperator tasks can be configured in
> > > >   airflow config.
> > > > - Sensor logs in smart sensors are populated to each task instance log
> > > >   UI.
> > > >
> > > > A PR https://github.com/apache/airflow/pull/5499 is ready for review
> > > > from the committers and community.
> > > >
> > > > This email is formally calling for a vote to accept the AIP and PR.
> > > > Please note that we will update the PR / feature to fix bugs if we find
> > > > any.
> > > >
> > > > Best
> > > >
> > > > Yingbo
> > >
> > > ===============================================================================
> > >
> > > Please access the attached hyperlink for an important electronic
> > > communications disclaimer:
> > > http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html
> > >
> > > ===============================================================================

--
Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer
M: +48 660 796 129