I also like the idea - it is a really nice optimisation. As Kaxil mentioned -
it would be great to include Dag Serialisation.

For some of the other operators that just send a query and poll for
execution - I think once this one is implemented for sensors, we can think
about splitting some of the "operators" to support it. There was already a
PokeReschedule discussion
https://lists.apache.org/thread.html/rc6f56234342c87f154865489e3a6555609e4b98a8c62ca4997cb6a6c%40%3Cdev.airflow.apache.org%3E
which I think might benefit from this architecture - but I'd say - let's
implement it for sensors first and then we can think about implementing it
also for operators.

One thing though - is it something that we could plan for 2.1? I think our
2.0 release already has some important features that focus on delivering the
most value for Airflow users, and it keeps on slipping. I think implementing
such a change will keep some of the committers distracted (even if it is
already at the PR stage). Maybe we should start prioritising new features for
2.1 by default unless we have a very good reason to implement them in 2.0?

J.



On Sat, Jun 20, 2020 at 5:37 PM Kaxil Naik <kaxiln...@gmail.com> wrote:

> Hi Yingbo,
>
> I like the concept described in the AIP. I was wondering if we could
> leverage Dag Serialization
> (https://airflow.readthedocs.io/en/latest/dag-serialization.html) to get the
> "task level fields" without re-parsing the DAGs or storing them in the new
> table.
>
> And could this be used with some of the operators like the BigQueryOperator
> and SparkOperator, which just submit a SQL query and poll until completion?
>
> Regards,
> Kaxil
>
> On Sat, Jun 20, 2020 at 8:26 AM Yingbo Wang <ybw...@gmail.com> wrote:
>
> > Thanks everyone for the feedback. I will also add the details mentioned
> > in this thread to the AIP.
> >
> >
> > Q: From an implementation perspective, my one area of concern is the
> > "sharding" concept and the configuration / management overhead involved.
> > I may have missed it in the AIP, but would it be possible to add
> > auto-scaling to minimize this configuration?
> >
> > The “sharding” configuration is an integer which gives the number of
> > concurrently running smart sensor jobs for the whole airflow cluster. A
> > proper sharding setting mainly depends on the following factors: 1. Cluster
> > load -- how many sensor tasks need to be executed at the same time. 2. How
> > often each sensor should be poked. 3. The response time for a sensor task
> > in the current system. As these answers may vary for different systems, we
> > leave “sharding” as a configurable field for users to satisfy different
> > use cases.
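> >
> > As a rough back-of-envelope illustration (the numbers here are made up, not
> > from the AIP): with 10,000 'sensing' tasks, a poke that takes about one
> > second, and a goal of poking every sensor at least once every ten minutes,
> > one smart sensor job can cover roughly 600 sensors per cycle, so "shards"
> > would need to be on the order of 10,000 / 600 ≈ 17.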
> >
> > Also, a couple of clarifying questions:
> >
> > 1. Do you know if this is more suitable to certain kinds of sensors vs.
> > others?
> >
> > Most sensors should be suitable for the smart sensor, except those whose
> > arguments needed to initialize a sensor object are unserializable, e.g. a
> > function. Serializing complex types other than builtins and datetime is
> > not supported right now, but we are planning to add that in the future.
> >
> > 2. What do you think about leveraging this to enable "async" operations
> > using Airflow, i.e. submit a task and then use a "smart sensor" to check
> > for completion?
> >
> > This is a very good point. We do notice the relationship between these
> > two ideas. Technically this logic should also work. The “task submission”
> > maps to pre_execute() in the sensor task logic, and “check for completion”
> > maps to the sensor’s poke() function. The current implementation of
> > SubDagOperator
> > <https://github.com/apache/airflow/blob/master/airflow/operators/subdag_operator.py#L144-L177>
> > actually follows this pattern. If the operator requires no unserializable
> > argument to be instantiated, we should already be able to leverage the
> > async operation in SmartSensor for it.
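> >
> > A minimal sketch of that pattern (not from the PR; submit_query and
> > get_query_state below are hypothetical placeholders for a real job client):
> >
> >     from airflow.sensors.base_sensor_operator import BaseSensorOperator
> >     from airflow.utils.decorators import apply_defaults
> >
> >     class AsyncQuerySensor(BaseSensorOperator):
> >         """Submit a query once, then cheaply poll for completion."""
> >
> >         @apply_defaults
> >         def __init__(self, sql, **kwargs):
> >             super().__init__(**kwargs)
> >             self.sql = sql
> >             self.query_id = None
> >
> >         def pre_execute(self, context):
> >             # "task submission": fire the query and keep a serializable handle
> >             self.query_id = submit_query(self.sql)  # hypothetical client call
> >
> >         def poke(self, context):
> >             # "check for completion": a cheap poll that a smart sensor can batch
> >             return get_query_state(self.query_id) == 'DONE'  # hypothetical client call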
> >
> >
> >
> >
> > Q: How would a user enable their own smart sensors? I don’t see any added
> > documentation for this. It looks like they need to manually add the name
> > of the class to the airflow configuration and do *something* to their
> > sensor class, including overriding the "is_smart_sensor" method (why a
> > method and not an attribute?)
> >
> > Having to enable it in multiple places seems a little cumbersome; why not
> > have a "BaseSmartSensor" that the user inherits from, like most of the
> > rest of Airflow? Sensors inheriting from BaseSmartSensor would be "Smart"
> > when smart sensors are enabled in the configuration and not smart when
> > smart sensors are not enabled.
> >
> > Enabling/disabling the smart sensor is a system-level config which is
> > transparent to individual users. An example config for a cluster with the
> > smart sensor enabled is as follows:
> >
> > [smart_sensor]
> > use_smart_sensor = true
> > shard_code_upper_limit = 10000
> > shards = 5
> > sensors_enabled = NamedHivePartitionSensor, MetastorePartitionSensor
> >
> >
> > The "use_smart_sensor" config indicates if the smart sensor is enabled.
> The
> > "shards" config indicates the number of concurrently running smart sensor
> > jobs for the airflow cluster. The "sensor_enabled" config is a list of
> > sensor class names that will use the smart sensor.  The users use the
> same
> > class names (e.g. HivePartitionSensor) in their DAGs and they don’t have
> > the control to use smart sensors or not, unless they exclude their tasks
> > explicits.
> >
> >
> > Existing DAGs don't need to be changed for enabling/disabling the smart
> > sensor.
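> >
> > For illustration only (a sketch, not from the PR; the 1.10-style import
> > path and the table name are assumptions), a DAG like this keeps working
> > as-is whether or not the smart sensor is enabled in the cluster config
> > above:
> >
> >     from datetime import datetime
> >
> >     from airflow import DAG
> >     from airflow.sensors.named_hive_partition_sensor import NamedHivePartitionSensor
> >
> >     with DAG('partition_example', start_date=datetime(2020, 6, 1),
> >              schedule_interval='@daily') as dag:
> >         # an ordinary sensor task; nothing extra is needed to opt in
> >         wait_for_partition = NamedHivePartitionSensor(
> >             task_id='wait_for_partition',
> >             partition_names=['my_schema.my_table/ds={{ ds }}'],
> >         )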
> >
> >
> > "is_smart_sensor_compatible" is a class-level configuration (instead of
> > instance-level) so that the system knows whether a particular sensor
> > operator can use the smart sensor. Currently only NamedHivePartitionSensor
> > and MetastorePartitionSensor are enabled to use the smart sensor in the
> > PR.
> >
> > To enable smart sensors for other sensor operators that are not included
> > in this PR:
> >
> >    1. Add a class attribute "poke_context_fields" to the operator.
> >       "poke_context_fields" includes all key names used for initializing a
> >       sensor object.
> >    2. In airflow.cfg, add the operator’s classname to the "[smart_sensor]"
> >       section under the field "sensors_enabled", as in the sketch below.
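> >
> > A rough sketch of both steps (MyPartitionSensor and partition_exists below
> > are hypothetical names, only for illustration):
> >
> >     from airflow.sensors.base_sensor_operator import BaseSensorOperator
> >
> >     class MyPartitionSensor(BaseSensorOperator):
> >         # step 1: every constructor argument the smart sensor needs to
> >         # re-create the poke context; values must stay serializable
> >         poke_context_fields = ('partition_name', 'metastore_conn_id')
> >
> >         def __init__(self, partition_name, metastore_conn_id='metastore_default', **kwargs):
> >             super().__init__(**kwargs)
> >             self.partition_name = partition_name
> >             self.metastore_conn_id = metastore_conn_id
> >
> >         def poke(self, context):
> >             return partition_exists(self.partition_name, self.metastore_conn_id)  # hypothetical check
> >
> > and the matching airflow.cfg entry for step 2:
> >
> >     [smart_sensor]
> >     sensors_enabled = NamedHivePartitionSensor, MyPartitionSensor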
> >
> >
> > Yingbo
> >
> > On Fri, Jun 19, 2020 at 7:27 AM Shaw, Damian P. <
> > damian.sha...@credit-suisse.com> wrote:
> >
> > > Also +1 (non-binding) on the AIP, but I have questions about the
> > > implementation.
> > >
> > > How would a user enable their own smart sensors? I don’t see any added
> > > documentation for this. It looks like they need to manually add the name
> > > of the class to the airflow configuration and do *something* to their
> > > sensor class, including overriding the "is_smart_sensor" method (why a
> > > method and not an attribute?)
> > >
> > > Having to enable it in multiple places seems a little cumbersome; why
> > > not have a "BaseSmartSensor" that the user inherits from, like most of
> > > the rest of Airflow? Sensors inheriting from BaseSmartSensor would be
> > > "Smart" when smart sensors are enabled in the configuration and not
> > > smart when smart sensors are not enabled.
> > >
> > > Damian
> > >
> > > -----Original Message-----
> > > From: Vikram Koka <vik...@astronomer.io>
> > > Sent: Friday, June 19, 2020 00:57
> > > To: dev@airflow.apache.org
> > > Subject: Re: [VOTE] AIP-17: Consolidate and de-duplicate sensor tasks
> > > in airflow Smart Sensor
> > >
> > > +1 (non-binding) for this AIP.
> > >
> > > I really like the concept and the efficiency improvements. The general
> > > SmartSensor concept and the ability to add additional sensor classes are
> > > elegant.
> > >
> > > From an implementation perspective, my one area of concern is the
> > > "sharding" concept and the configuration / management overhead
> involved.
> > I
> > > may have missed it in the AIP, but would it be possible to add
> > auto-scaling
> > > to minimize this configuration?
> > >
> > > Also, a couple of clarifying questions:
> > > 1. Do you know if this is more suitable to certain kinds of sensors vs.
> > > others?
> > > 2. What do you think about leveraging this to enable "async" operations
> > > using Airflow i.e. submit a task and then use a "smart sensor" to check
> > > for completion?
> > >
> > > Best regards,
> > >
> > > Vikram
> > >
> > >
> > >
> > >
> > > On Thu, Jun 18, 2020 at 3:38 PM Yingbo Wang <ybw...@gmail.com> wrote:
> > >
> > > > Hello everyone!
> > > >
> > > > This email calls for a vote to add the airflow smart sensor at
> > > > https://github.com/apache/airflow/pull/5499
> > > >
> > > > AIP:
> > > > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-17%3A+Consolidate+and+de-duplicate+sensor+tasks+in+airflow+Smart+Sensor
> > > >
> > > > Change summary:
> > > >
> > > >    - Add a new mode called “smart sensor mode”. In smart sensor mode,
> > > >      instead of holding a long-running process for each sensor and
> > > >      poking periodically, a sensor will only store its poke context in
> > > >      the sensor_instance table and then exit with a ‘sensing’ state.
> > > >    - When the smart sensor mode is enabled, a special set of builtin
> > > >      smart sensor DAGs (named smart_sensor_group_shard_xxx) is created
> > > >      by the system. These DAGs contain the SmartSensorOperator task and
> > > >      manage the smart sensor jobs for the airflow cluster. The
> > > >      SmartSensorOperator task can fetch hundreds of ‘sensing’ instances
> > > >      from the sensor_instance table and poke on their behalf in
> > > >      batches. Users don’t need to change their existing DAGs.
> > > >    - The smart sensor mode currently supports NamedHivePartitionSensor
> > > >      and MetastorePartitionSensor; however, it can easily be extended
> > > >      to support more sensor classes.
> > > >    - Smart sensor mode on/off, the list of smart-sensor-enabled
> > > >      classes, and the number of SmartSensorOperator tasks can be
> > > >      configured in the airflow config.
> > > >    - Sensor logs in smart sensors are populated to each task instance’s
> > > >      log UI.
> > > >
> > > >
> > > > A PR https://github.com/apache/airflow/pull/5499 is ready for review
> > > > from the committers and community.
> > > >
> > > >
> > > > This email is formally calling for a vote to accept the AIP and PR.
> > > > Please note that we will update the PR / feature to fix bugs if we
> > > > find any.
> > > >
> > > >
> > > > Best
> > > >
> > > > Yingbo
> > > >
> > >
> > >
> > >
> > >
> >
> > >
> >
>


-- 

Jarek Potiuk
Polidea <https://www.polidea.com/> | Principal Software Engineer

M: +48 660 796 129