Ah, OK. Thanks for the clarification.
________________________________
From: Daniel Cohen <daniel.co...@cloudinary.com>
Sent: Wednesday, September 26, 2018 1:15:16 PM
To: dev@airflow.incubator.apache.org
Subject: Re: hooks & operators improvement proposal

Hi Jeff, it seems I was a bit unclear. The ETL DAG spans multiple tasks and
usually looks like kickoff >> source_to_staging >> staging_to_warehouse >>
warehouse_post_process. I'm not proposing changes to operators; they are
great. What I am proposing is to apply the same concept to the smaller
building blocks. I argue that the task anatomy (in ETL flows) is usually
made up of 'mini' flows that look like read source > serialize > dump
(example 1
<https://github.com/apache/incubator-airflow/blob/7cd9a26418ce9cb120f1cacd9fdcfe43fe5c0254/airflow/operators/mysql_to_hive.py#L124>,
example 2
<https://github.com/apache/incubator-airflow/blob/7cd9a26418ce9cb120f1cacd9fdcfe43fe5c0254/airflow/contrib/hooks/salesforce_hook.py#L201>).
You can see that sometimes this is written in the operator and sometimes in
the hook; the code is not shared and handles the same cases each time.

thanks,
d

On Wed, Sep 26, 2018 at 10:43 PM Jeff Payne <jpa...@bombora.com> wrote:

> So, in your scenario, the ETL pipeline happens inside the single
> operator/task?
>
> If so, would it not make sense for the pipeline to span multiple tasks and
> provide a standard set of functions/decorators/etc. for defining the
> input/output to/from each task? That way you would leverage the ability to
> rerun the DAG from a particular step of the ETL pipeline in case of a
> recoverable failure. Or am I misunderstanding...
>
> ________________________________
> From: Daniel Cohen <daniel.co...@cloudinary.com>
> Sent: Wednesday, September 26, 2018 12:27:29 PM
> To: d...@airflow.apache.org
> Subject: hooks & operators improvement proposal
>
> Some thoughts about operators / hooks:
> Operators are composable; a typical ETL flow looks like `kickoff >>
> source_to_staging >> staging_to_warehouse >> warehouse_post_process`, where
> tasks use shared state (like S3) or naming conventions to continue work
> where the upstream task left off.
>
> Hooks, on the other hand, are not composable, and a lot of ETL logic is
> written ad hoc in the operator each time.
>
> I propose a lightweight, in-process ETL framework that allows:
> - hook composition
> - shared general utilities (compression / file management / serialization)
> - simpler operator building
>
> How it looks from the operator's side:
>
> def execute(self, context):
>     # initialize hooks
>     self.s3 = S3Hook...
>     self.mysql = MySqlHook...
>
>     # set up operator state
>     query = 'select * from somewhere'
>
>     # declare your ETL process
>     self.mysql.yield_query(query) >> \
>         pipes.clear_keys(keys=self.scrubbed_columns) >> \
>         pipes.ndjson_dumps() >> \
>         pipes.batch(size=1024) >> \
>         pipes.gzip() >> \
>         pipes.tempfile() >> \
>         self.s3.file_writer(s3_key=self.s3_key,
>                             bucket_name=self.s3_bucket,
>                             replace=True)
>
> How it looks from the hook's side:
>
> @pipes.producer  # decorate
> def yield_query(self, query):
>     cursor.execute(query)
>     for row in cursor:
>         yield row
>
> *pipes is a module with a set of generic operations that can potentially be
> reused between hooks / operators.
>
> The idea is inspired by 'bonobo' and 'python-pipes' (lightweight ETL
> packages), and the implementation is based on generators and decorators.
>
> We (cloudinary.com) are planning to open source it. Is it something that
> would be interesting to integrate into Airflow, or as a 3rd party? Or not
> at all? Any thoughts / suggestions?
>
> thanks,
> d
>
> --
> daniel cohen
> +972-(0)54-4799-147

--
daniel cohen
+972-(0)54-4799-147
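
For context, here is one minimal way such a pipes module could be put
together with plain generators and decorators, matching the ">>" composition
shown in the proposal above. This is only an illustrative sketch; the names
(Pipe, producer, stage, yield_rows, write_lines) and the composition details
are assumptions for the example, not the actual Cloudinary implementation.

import json

class Pipe:
    """Wraps a generator-producing callable so stages compose with >>.

    Hypothetical sketch; not taken from the proposed library itself.
    """

    def __init__(self, func):
        # func takes an upstream iterable (or None for sources) and
        # returns a generator.
        self.func = func

    def __rshift__(self, other):
        # Compose: feed this stage's output generator into the next stage.
        return Pipe(lambda upstream: other.func(self.func(upstream)))

    def run(self):
        # Drain the composed pipeline; terminal stages do their side effects.
        for _ in self.func(None):
            pass

def producer(gen_func):
    # Decorator for source stages that ignore upstream input.
    def wrapper(*args, **kwargs):
        return Pipe(lambda _upstream: gen_func(*args, **kwargs))
    return wrapper

def stage(gen_func):
    # Decorator for stages that transform the upstream generator.
    def wrapper(*args, **kwargs):
        return Pipe(lambda upstream: gen_func(upstream, *args, **kwargs))
    return wrapper

# Example stages (illustrative only):

@producer
def yield_rows(rows):
    for row in rows:
        yield row

@stage
def ndjson_dumps(upstream):
    for record in upstream:
        yield json.dumps(record) + "\n"

@stage
def write_lines(upstream, path):
    with open(path, "w") as fh:
        for line in upstream:
            fh.write(line)
            yield line

pipeline = (yield_rows([{"id": 1}, {"id": 2}])
            >> ndjson_dumps()
            >> write_lines(path="/tmp/out.ndjson"))
pipeline.run()

In this sketch each decorated function stays an ordinary generator, so rows
stream through the stages one at a time, which is what keeps the whole
pipeline in-process and memory-light.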