Right, I see where you're coming from. I looked through some of our operators and I do see some repeated logic (converting to JSON and writing to temp files). We also do what looks like pipes.clear_keys on some of the files we get using the SSHHook. Now that I've noticed that, I'm tempted to pull it out and put it with some of our other utils, which is what pipes seems to be doing. I think if common helpers like that existed, we'd consider using them. I know we burned a lot of time getting tempfiles working right, since we're not all experienced software engineers over here.
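For reference, a minimal sketch of the kind of shared helper described above, assuming rows arrive as dicts and the target is a gzip-compressed, newline-delimited JSON temp file. The name `ndjson_gz_tempfile` and its parameters are hypothetical (not an existing Airflow or pipes API); the point is that key scrubbing, serialization, and temp-file cleanup live in one place instead of in each operator.

```python
import contextlib
import gzip
import json
import os
import tempfile


@contextlib.contextmanager
def ndjson_gz_tempfile(rows, drop_keys=()):
    """Hypothetical shared util: write dict rows to a gzipped NDJSON temp file.

    Yields the file path; the file is deleted when the `with` block exits,
    even if the caller raises (e.g. a failed upload).
    """
    drop = set(drop_keys)
    # delete=False so the file can be reopened by name (required on Windows);
    # cleanup is done explicitly in the finally block below.
    tmp = tempfile.NamedTemporaryFile(suffix=".ndjson.gz", delete=False)
    tmp.close()
    try:
        with gzip.open(tmp.name, "wt", encoding="utf-8") as out:
            for row in rows:
                # scrub unwanted columns, similar in spirit to pipes.clear_keys
                clean = {k: v for k, v in row.items() if k not in drop}
                out.write(json.dumps(clean) + "\n")
        yield tmp.name
    finally:
        with contextlib.suppress(FileNotFoundError):
            os.remove(tmp.name)


if __name__ == "__main__":
    rows = [{"id": 1, "email": "a@example.com"}, {"id": 2, "email": "b@example.com"}]
    with ndjson_gz_tempfile(rows, drop_keys=["email"]) as path:
        # hand `path` to an upload call here, e.g. Airflow's S3Hook.load_file
        with gzip.open(path, "rt", encoding="utf-8") as f:
            print(f.read(), end="")
```

Using a context manager ties the temp file's lifetime to the `with` block, so cleanup does not have to be re-implemented (or forgotten) in every operator.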
On Thu, Sep 27, 2018 at 1:48 PM Daniel Cohen <daniel.co...@cloudinary.com> wrote:

> Hi Michael, thank you for your comment.
> XCom is for sharing state between tasks, and you are right that it
> wouldn't be wise to pass datasets through it.
> I'm suggesting refactoring code that already exists in operators (logic
> that each operator currently implements separately). If we move some of
> that logic into hooks (or another construct), we can build more robust
> operators faster.
>
> Thanks,
> d
>
> On Thu, Sep 27, 2018 at 4:54 PM Michael Ghen <m...@mikeghen.com> wrote:
>
> > I see what you're looking for, and I think this is the purpose of XCom.
> > We've used XCom in some of our custom operators to get this type of
> > functionality.
> >
> > That said, we tend to avoid putting a lot of data into XCom; I believe
> > the docs describe that as an anti-pattern somewhere. The recommended
> > pattern is to lean on external systems for exchanging data.
> >
> > On Wed, Sep 26, 2018 at 4:26 PM Jeff Payne <jpa...@bombora.com> wrote:
> >
> > > Ah, OK. Thanks for the clarification.
> > >
> > > ________________________________
> > > From: Daniel Cohen <daniel.co...@cloudinary.com>
> > > Sent: Wednesday, September 26, 2018 1:15:16 PM
> > > To: dev@airflow.incubator.apache.org
> > > Subject: Re: hooks & operators improvement proposal
> > >
> > > Hi Jeff,
> > > It seems I was a bit unclear.
> > > The DAG ETL spans multiple tasks and usually looks like
> > > kickoff >> source_to_staging >> staging_to_warehouse >> warehouse_post_process.
> > > I'm not proposing changes to operators; they are great. What I am
> > > proposing is to borrow the same concept for the smaller building blocks.
> > >
> > > I'd argue that task anatomy (in ETL flows) is usually made up of
> > > 'mini' flows that look like read source > serialize > dump
> > > (example 1
> > > <https://github.com/apache/incubator-airflow/blob/7cd9a26418ce9cb120f1cacd9fdcfe43fe5c0254/airflow/operators/mysql_to_hive.py#L124>,
> > > example 2
> > > <https://github.com/apache/incubator-airflow/blob/7cd9a26418ce9cb120f1cacd9fdcfe43fe5c0254/airflow/contrib/hooks/salesforce_hook.py#L201>).
> > > You can see that sometimes it's written in the operator and sometimes
> > > in the hook; the code is not shared, and it handles the same cases
> > > each time.
> > >
> > > Thanks,
> > > d
> > >
> > > On Wed, Sep 26, 2018 at 10:43 PM Jeff Payne <jpa...@bombora.com> wrote:
> > >
> > > > So, in your scenario, the ETL pipeline happens inside a single
> > > > operator/task?
> > > >
> > > > If so, wouldn't it make sense for the pipeline to span multiple tasks
> > > > and provide a standard set of functions/decorators/etc. for defining
> > > > the input/output to/from each task? That way you would leverage the
> > > > ability to rerun the DAG from a particular step of the ETL pipeline in
> > > > case of a recoverable failure. Or am I misunderstanding...
> > > >
> > > > ________________________________
> > > > From: Daniel Cohen <daniel.co...@cloudinary.com>
> > > > Sent: Wednesday, September 26, 2018 12:27:29 PM
> > > > To: d...@airflow.apache.org
> > > > Subject: hooks & operators improvement proposal
> > > >
> > > > Some thoughts about operators / hooks:
> > > > Operators are composable; a typical ETL flow looks like `kickoff >>
> > > > source_to_staging >> staging_to_warehouse >> warehouse_post_process`,
> > > > where tasks use shared state (like S3) or naming conventions to pick up
> > > > work where the upstream task left off.
> > > >
> > > > Hooks, on the other hand, are not composable, and a lot of ETL logic
> > > > is written ad hoc in the operator each time.
> > > >
> > > > I propose a lightweight, in-process ETL framework that allows:
> > > > - hook composition
> > > > - shared general utilities (compression / file management /
> > > >   serialization)
> > > > - simpler operator building
> > > >
> > > > How it looks from the operator's side:
> > > >
> > > >     def execute(self, context):
> > > >         # initialize hooks
> > > >         self.s3 = S3Hook...
> > > >         self.mysql = MySqlHook...
> > > >
> > > >         # set up operator state
> > > >         query = 'select * from somewhere'
> > > >
> > > >         # declare your ETL process: each `>>` feeds the output
> > > >         # of one step into the next
> > > >         self.mysql.yield_query(query) >> \
> > > >             pipes.clear_keys(keys=self.scrubbed_columns) >> \
> > > >             pipes.ndjson_dumps() >> \
> > > >             pipes.batch(size=1024) >> \
> > > >             pipes.gzip() >> \
> > > >             pipes.tempfile() >> \
> > > >             self.s3.file_writer(s3_key=self.s3_key,
> > > >                                 bucket_name=self.s3_bucket,
> > > >                                 replace=True)
> > > >
> > > > How it looks from the hook's side:
> > > >
> > > >     @pipes.producer  # decorator: makes the generator a pipeline source
> > > >     def yield_query(self, query):
> > > >         # DbApiHook.get_conn() returns a DB-API connection
> > > >         cursor = self.get_conn().cursor()
> > > >         cursor.execute(query)
> > > >         for row in cursor:
> > > >             yield row
> > > >
> > > > * pipes is a module with a set of generic operations that can be
> > > > reused across hooks / operators.
> > > >
> > > > The idea is inspired by 'bonobo' and 'python-pipes' (lightweight ETL
> > > > packages), and the implementation is based on generators and
> > > > decorators.
> > > >
> > > > We (cloudinary.com) are planning to open source it. Is it something
> > > > that would be interesting to integrate into Airflow, or to keep as a
> > > > 3rd-party package? Or not at all? Any thoughts or suggestions?
> > > >
> > > > Thanks,
> > > > d
> > > >
> > > > --
> > > > daniel cohen
> > > > +972-(0)54-4799-147
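To make the generator-and-decorator idea in the proposal concrete, here is a minimal, self-contained sketch of how `>>` chaining and a `@producer` decorator could work. Everything in it (`Pipe`, `producer`, `clear_keys`, `ndjson_dumps`, `batch`, `gzip_batches`, `write_to`, `ExampleQueryHook`) is a hypothetical stand-in, not the API of the pipes module Cloudinary planned to release. Each stage is a plain function from an iterator to an iterator, so the same step can be shared between hooks and operators, and rows stream through one at a time.

```python
import functools
import gzip
import io
import json
from collections.abc import Iterator
from itertools import islice


class Pipe:
    """Lazy stream of items; `pipe >> stage` applies the next stage."""

    def __init__(self, iterable):
        self._it = iter(iterable)

    def __rshift__(self, stage):
        result = stage(self._it)
        # Generator stages keep the pipe lazy; a sink consumes the stream
        # and returns a plain value, which ends the chain.
        return Pipe(result) if isinstance(result, Iterator) else result

    def __iter__(self):
        return self._it


def producer(gen_func):
    """Decorator: calling the wrapped generator function returns a Pipe."""
    @functools.wraps(gen_func)
    def wrapper(*args, **kwargs):
        return Pipe(gen_func(*args, **kwargs))
    return wrapper


def clear_keys(keys):
    """Stage: drop the given keys from each dict row."""
    drop = set(keys)

    def stage(rows):
        for row in rows:
            yield {k: v for k, v in row.items() if k not in drop}
    return stage


def ndjson_dumps():
    """Stage: serialize each row to one line of newline-delimited JSON."""
    def stage(rows):
        for row in rows:
            yield json.dumps(row) + "\n"
    return stage


def batch(size):
    """Stage: group items into lists of at most `size` items."""
    def stage(items):
        it = iter(items)
        while True:
            chunk = list(islice(it, size))
            if not chunk:
                return
            yield chunk
    return stage


def gzip_batches():
    """Stage: compress each batch of lines into its own gzip member."""
    def stage(batches):
        for lines in batches:
            yield gzip.compress("".join(lines).encode("utf-8"))
    return stage


def write_to(fileobj):
    """Sink: write every chunk to a binary file object; return bytes written."""
    def sink(chunks):
        return sum(fileobj.write(chunk) for chunk in chunks)
    return sink


class ExampleQueryHook:
    """Hypothetical hook; a real DbApiHook would iterate a DB cursor instead."""

    def __init__(self, rows):
        self._rows = rows

    @producer
    def yield_query(self, query):
        # `query` is ignored here; the canned rows stand in for a result set
        for row in self._rows:
            yield row


if __name__ == "__main__":
    hook = ExampleQueryHook([{"id": i, "email": f"u{i}@example.com"} for i in range(5)])
    buf = io.BytesIO()
    written = (hook.yield_query("select * from somewhere")
               >> clear_keys(keys=["email"])
               >> ndjson_dumps()
               >> batch(size=2)
               >> gzip_batches()
               >> write_to(buf))
    print(written, gzip.decompress(buf.getvalue()).decode("utf-8"))
```

In the proposal the last steps are `pipes.tempfile()` and an S3 writer; here an in-memory `io.BytesIO` sink stands in for both so the sketch runs without AWS credentials. Concatenated gzip members form a valid gzip stream, which is why compressing batch by batch still yields one readable file.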