It would be nice to include DAG serialization in the smart sensor; however, we may want to put it in a follow-up PR. Cong Zhu has explained here why DAG serialization would need more investigation and should go in a separate PR:
https://github.com/apache/airflow/pull/5499#issuecomment-657828649
In my opinion, the current PR is already too big, and it is becoming harder and harder for us to keep the branch in sync with the master branch. At the moment, I think it makes more sense to merge this change into master before we make additional incremental improvements.

Yingbo

On Sun, Jun 21, 2020 at 1:57 PM Jarek Potiuk <jarek.pot...@polidea.com> wrote:

> I also like the idea - it is a really nice optimisation. As Kaxil mentioned -
> it would be great to include Dag Serialisation.
>
> I think for some of the other operators that just send a query and poll
> execution - once this one is implemented for sensors, we can think
> about splitting some of the "operators" to support it. There was already a
> PokeReschedule discussion
> https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
> which I think might benefit from this architecture - but I'd say - let's
> implement it for sensors first and then we can think about implementing it
> also for operators.
>
> One thing though - is it something that we could plan for 2.1? I think our
> 2.0 release gets some important features that focus on delivering most
> value for Airflow users. And it keeps on slipping. I think implementing
> such a change will keep some of the committers distracted (even if it is
> already at a PR stage). Maybe we can start prioritising new features to 2.1
> by default unless we have very good reason to implement it in 2.0?
>
> J.
>
> On Sat, Jun 20, 2020 at 5:37 PM Kaxil Naik <kaxiln...@gmail.com> wrote:
>
> > Hi Yingbo,
> >
> > I like the concept described in the AIP. I was wondering if we could
> > leverage Dag Serialization
> > (https://airflow.readthedocs.io/en/latest/dag-serialization.html) to get
> > the "task level fields" without re-parsing the DAGs or storing it in the
> > new table.
> >
> > And can we use some of the operators like the BigQueryOperator,
> > SparkOperator which just submit the SQL query and poll until completion?
> >
> > Regards,
> > Kaxil
> >
> > On Sat, Jun 20, 2020 at 8:26 AM Yingbo Wang <ybw...@gmail.com> wrote:
> >
> > > Thanks everyone for the feedback. I will also add the details mentioned
> > > in this thread into the AIP.
> > >
> > > Q: From an implementation perspective, my one area of concern is the
> > > "sharding" concept and the configuration / management overhead involved.
> > > I may have missed it in the AIP, but would it be possible to add
> > > auto-scaling to minimize this configuration?
> > >
> > > The “sharding” configuration is an integer which sets the number of
> > > concurrently running smart sensor jobs for the whole airflow cluster. A
> > > proper sharding setting mainly depends on the following issues:
> > > 1. Cluster load -- how many sensor tasks need to be executed at the
> > >    same time.
> > > 2. How often each sensor should be poked at least once.
> > > 3. The response time for a sensor task in the current system.
> > > As these answers may vary for different systems, we leave “sharding” as
> > > a configurable field for users to satisfy different use cases.
> > >
> > > Also, a couple of clarifying questions:
> > >
> > > 1. Do you know if this is more suitable to certain kinds of sensors vs.
> > > others?
> > >
> > > Most sensors should be suitable for the smart sensor, except when an
> > > argument needed to initialize a sensor object is unserializable, e.g. a
> > > function. Serializing types more complex than builtins and datetime is
> > > not supported right now, but we are planning to add them in the future.
> > >
> > > 2. What do you think about leveraging this to enable "async" operations
> > > using Airflow i.e. submit a task and then use a "smart sensor" to check
> > > for completion?
> > >
> > > This is a very good point. We do notice the relationship between these
> > > two ideas. Technically this logic should also work. The “task submission”
> > > maps to the pre_execute() in a sensor task logic and “check for
> > > completion” maps to the sensor’s poke() function. The current
> > > implementation of SubDagOperator
> > > <https://github.com/apache/airflow/blob/master/airflow/operators/subdag_operator.py#L144-L177>
> > > actually follows this pattern. If the operator requires no unserializable
> > > argument to be instantiated, we should already be able to leverage the
> > > async operation in SmartSensor for it.
> > >
> > > Q: How would a user enable their own smart sensors? I don’t see any added
> > > documentation for this. It looks like they need to manually add the name
> > > of the class to the airflow configuration and do *something* to their
> > > sensor class, including override the "is_smart_sensor" method (why a
> > > method and not an attribute?)
> > >
> > > Having to enable it in multiple places seems a little cumbersome, why not
> > > have a "BaseSmartSensor" that the user inherits from like most of the
> > > rest of Airflow? Sensors inherited from BaseSmartSensor would be "Smart"
> > > when smart sensors are enabled in the configuration and not smart when
> > > smart sensors are not enabled.
> > >
> > > Enabling/Disabling the smart sensor is a system level config which is
> > > transparent to the individual users. An example of smart sensor enabled
> > > cluster config is as follows:
> > >
> > > [smart_sensor]
> > > use_smart_sensor = true
> > > shard_code_upper_limit = 10000
> > > shards = 5
> > > sensor_enabled = NamedHivePartitionSensor, MetastorePartitionSensor
> > >
> > > The "use_smart_sensor" config indicates if the smart sensor is enabled.
> > > The "shards" config indicates the number of concurrently running smart
> > > sensor jobs for the airflow cluster. The "sensor_enabled" config is a
> > > list of sensor class names that will use the smart sensor. The users use
> > > the same class names (e.g. HivePartitionSensor) in their DAGs and they
> > > don’t have control over whether smart sensors are used, unless they
> > > exclude their tasks explicitly.
> > >
> > > Existing DAGs don't need to be changed for enabling/disabling the smart
> > > sensor.
> > >
> > > “Is_smart_sensor_compatible” is a class level configuration (instead of
> > > instance-level) so that the system knows if a particular sensor operator
> > > can use the smart sensor. Currently only NamedHivePartitionSensor and
> > > MetastorePartitionSensor are enabled to use the smart sensor in the PR.
> > >
> > > To include other sensor operators for smart sensors that are not included
> > > in this PR:
> > >
> > > 1. Add a class attribute "poke_context_fields" to the operator.
> > >    "poke_context_fields" includes all key names used for initializing a
> > >    sensor object.
> > > 2. In airflow.cfg, add the operator’s classname to the section
> > >    “[smart_sensor]” with the field “sensors_enabled” as follows.
> > >
> > > Yingbo
> > >
> > > On Fri, Jun 19, 2020 at 7:27 AM Shaw, Damian P. <
> > > damian.sha...@credit-suisse.com> wrote:
> > >
> > > > Also +1 (non-binding) on the AIP but questions on the implementation.
> > > >
> > > > How would a user enable their own smart sensors? I don’t see any added
> > > > documentation for this. It looks like they need to manually add the
> > > > name of the class to the airflow configuration and do *something* to
> > > > their sensor class, including override the "is_smart_sensor" method
> > > > (why a method and not an attribute?)
> > > >
> > > > Having to enable it in multiple places seems a little cumbersome, why
> > > > not have a "BaseSmartSensor" that the user inherits from like most of
> > > > the rest of Airflow? Sensors inherited from BaseSmartSensor would be
> > > > "Smart" when smart sensors are enabled in the configuration and not
> > > > smart when smart sensors are not enabled.
> > > >
> > > > Damian
> > > >
> > > > -----Original Message-----
> > > > From: Vikram Koka <vik...@astronomer.io>
> > > > Sent: Friday, June 19, 2020 00:57
> > > > To: dev@airflow.apache.org
> > > > Subject: Re: [VOTE] AIP-17: Consolidate and de-duplicate sensor tasks
> > > > in airflow Smart Sensor
> > > >
> > > > +1 (non-binding) for this AIP.
> > > >
> > > > I really like the concept and the efficiency improvements. The general
> > > > SmartSensor concept and the ability to add additional sensor classes is
> > > > elegant.
> > > >
> > > > From an implementation perspective, my one area of concern is the
> > > > "sharding" concept and the configuration / management overhead
> > > > involved. I may have missed it in the AIP, but would it be possible to
> > > > add auto-scaling to minimize this configuration?
> > > >
> > > > Also, a couple of clarifying questions:
> > > > 1. Do you know if this is more suitable to certain kinds of sensors vs.
> > > > others?
> > > > 2. What do you think about leveraging this to enable "async" operations
> > > > using Airflow i.e. submit a task and then use a "smart sensor" to check
> > > > for completion?
> > > >
> > > > Best regards,
> > > >
> > > > Vikram
> > > >
> > > > On Thu, Jun 18, 2020 at 3:38 PM Yingbo Wang <ybw...@gmail.com> wrote:
> > > >
> > > > > Hello everyone!
> > > > >
> > > > > This email calls for a vote to add the airflow smart sensor at
> > > > > https://github.com/apache/airflow/pull/5499
> > > > >
> > > > > AIP:
> > > > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-17%3A+Consolidate+and+de-duplicate+sensor+tasks+in+airflow+Smart+Sensor
> > > > >
> > > > > Change summary:
> > > > >
> > > > > - Add a new mode called “smart sensor mode”. In smart sensor mode,
> > > > >   instead of holding a long running process for each sensor and
> > > > >   poking periodically, a sensor will only store poke context at
> > > > >   sensor_instance table and then exits with a ‘sensing’ state.
> > > > > - When the smart sensor mode is enabled, a special set of builtin
> > > > >   smart sensor DAGs (named smart_sensor_group_shard_xxx) is created
> > > > >   by the system; These DAGs contain SmartSensorOperator task and
> > > > >   manage the smart sensor jobs for the airflow cluster. The
> > > > >   SmartSensorOperator task can fetch hundreds of ‘sensing’ instances
> > > > >   from sensor_instance table and poke on behalf of them in batches.
> > > > >   Users don’t need to change their existing DAGs.
> > > > > - The smart sensor mode currently supports NamedHivePartitionSensor
> > > > >   and MetastorePartitionSensor however it can easily be extended to
> > > > >   support more sensor classes.
> > > > > - Smart sensor mode on/off, the list of smart sensor enabled
> > > > >   classes, and the number of SmartSensorOperator tasks can be
> > > > >   configured in airflow config.
> > > > > - Sensor logs in smart sensors are populated to each task instance
> > > > >   log UI.
> > > > >
> > > > > A PR https://github.com/apache/airflow/pull/5499 is ready for review
> > > > > from the committers and community.
> > > > >
> > > > > This email is formally calling for a vote to accept the AIP and PR.
> > > > > Please note that we will update the PR / feature to fix bugs if we
> > > > > find any.
> > > > >
> > > > > Best
> > > > >
> > > > > Yingbo
>
> --
> Jarek Potiuk
> Polidea <https://www.polidea.com/> | Principal Software Engineer
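
For anyone skimming the thread, here is a rough Python sketch of how the quoted [smart_sensor] settings, the "poke_context_fields" attribute, and the restriction to builtin and datetime values could fit together. It is only an illustration under stated assumptions, not the code in the PR: ExamplePartitionSensor, load_settings, poke_context and shard_for are hypothetical names, and hashing the poke context into a shard code below shard_code_upper_limit is my guess at how the "shards" setting might be used.

```python
import configparser
import hashlib
import json
from datetime import datetime

# Settings copied from the example cluster config quoted in the thread.
CONFIG_TEXT = """
[smart_sensor]
use_smart_sensor = true
shard_code_upper_limit = 10000
shards = 5
sensor_enabled = NamedHivePartitionSensor, MetastorePartitionSensor
"""


class ExamplePartitionSensor:
    """Hypothetical sensor class, not a real Airflow operator."""

    # Names of the constructor arguments needed to rebuild the sensor later.
    poke_context_fields = ("partition_name", "metastore_conn_id")

    def __init__(self, partition_name, metastore_conn_id="metastore_default"):
        self.partition_name = partition_name
        self.metastore_conn_id = metastore_conn_id


def load_settings(text):
    """Parse the [smart_sensor] section into plain Python values."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    section = parser["smart_sensor"]
    return {
        "enabled": section.getboolean("use_smart_sensor"),
        "upper_limit": section.getint("shard_code_upper_limit"),
        "shards": section.getint("shards"),
        "classes": [name.strip() for name in section["sensor_enabled"].split(",")],
    }


def poke_context(sensor):
    """Collect only the fields listed in poke_context_fields."""
    return {name: getattr(sensor, name) for name in sensor.poke_context_fields}


def _encode(value):
    # Called by json.dumps for non-builtin values: datetime is accepted,
    # anything else (for example a function) is rejected.
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"unserializable poke context value: {value!r}")


def shard_for(context, upper_limit, shards):
    """Assumed scheme: hash the context to a shard code, then bucket it."""
    payload = json.dumps(context, sort_keys=True, default=_encode)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    shard_code = int(digest, 16) % upper_limit
    return shard_code * shards // upper_limit


settings = load_settings(CONFIG_TEXT)
sensor = ExamplePartitionSensor("db.table/ds=2020-06-20")
context = poke_context(sensor)
shard = shard_for(context, settings["upper_limit"], settings["shards"])
```

Two sensors with identical poke contexts hash to the same shard in this sketch, which is one way the de-duplication described in the AIP could be achieved; a context containing a function raises TypeError instead of being stored.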