So, in your scenario, the whole ETL pipeline happens inside a single operator/task?
If so, would it not make sense for the pipeline to span multiple tasks, with a standard set of functions/decorators/etc. for defining the input/output to/from each task? That way you would keep the ability to rerun the DAG from a particular step of the ETL pipeline in case of a recoverable failure. Or am I misunderstanding...

________________________________
From: Daniel Cohen <daniel.co...@cloudinary.com>
Sent: Wednesday, September 26, 2018 12:27:29 PM
To: d...@airflow.apache.org
Subject: hooks & operators improvement proposal

Some thoughts about operators / hooks:

Operators are composable. A typical ETL flow looks like `kickoff >> source_to_staging >> staging_to_warehouse >> warehouse_post_process`, where tasks use shared state (like S3) or naming conventions to continue where the upstream task left off. Hooks, on the other hand, are not composable, and a lot of ETL logic is written ad hoc in the operator each time.

I propose a lightweight, in-process ETL framework that:

- allows hook composition
- shares general utilities (compression / file management / serialization)
- simplifies operator building

How it looks from the operator's side:

    def execute(self, context):
        # initialize hooks
        self.s3 = S3Hook(...)
        self.mysql = MySqlHook(...)

        # set up operator state
        query = 'select * from somewhere'

        # declare your ETL process
        self.mysql.yield_query(query) >> \
            pipes.clear_keys(keys=self.scrubbed_columns) >> \
            pipes.ndjson_dumps() >> \
            pipes.batch(size=1024) >> \
            pipes.gzip() >> \
            pipes.tempfile() >> \
            self.s3.file_writer(s3_key=self.s3_key,
                                bucket_name=self.s3_bucket,
                                replace=True)

How it looks from the hook's side:

    @pipes.producer  # the decorator turns the generator into a composable stage
    def yield_query(self, query):
        cursor = self.get_conn().cursor()  # DbApiHook-style connection
        cursor.execute(query)
        for row in cursor:
            yield row

* pipes is a module with a set of generic operations that can be reused across hooks / operators.

The idea is inspired by 'bonobo' and 'python-pipes' (lightweight ETL packages), and the implementation is based on generators and decorators.

We (cloudinary.com) are planning to open source it. Is it something that would be interesting to integrate into Airflow, or as a 3rd party? Or not at all? Any thoughts / suggestions?

thanks,
d

--
daniel cohen
+972-(0)54-4799-147
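A minimal sketch of the generator/decorator mechanism the proposal describes, for illustration only: the `Pipe` class, the `stage` decorator, and the explicit `run()` call are assumed names here, not the actual API of the proposed library.

    import functools

    class Pipe(object):
        """One pipeline stage: a callable mapping an input iterator to an
        output iterator. `>>` chains stages left to right."""

        def __init__(self, transform):
            self.transform = transform

        def __rshift__(self, other):
            # Compose: feed this stage's output iterator into the next stage.
            return Pipe(lambda upstream: other.transform(self.transform(upstream)))

        def run(self):
            # Drain the final iterator so every (lazy) stage actually executes.
            for _ in self.transform(iter(())):
                pass

    def producer(func):
        """Decorator for source stages: the generator ignores upstream input."""
        @functools.wraps(func)
        def factory(*args, **kwargs):
            return Pipe(lambda _upstream: func(*args, **kwargs))
        return factory

    def stage(func):
        """Decorator for transform stages: the generator consumes upstream."""
        @functools.wraps(func)
        def factory(*args, **kwargs):
            return Pipe(lambda upstream: func(upstream, *args, **kwargs))
        return factory

    @producer
    def yield_rows(n):
        for i in range(n):
            yield {'id': i}

    @stage
    def batch(upstream, size):
        # Group items into lists of `size`, flushing any remainder at the end.
        buf = []
        for item in upstream:
            buf.append(item)
            if len(buf) == size:
                yield buf
                buf = []
        if buf:
            yield buf

    @stage
    def sink(upstream):
        for item in upstream:
            print(item)
            yield item

    (yield_rows(10) >> batch(size=4) >> sink()).run()

With this shape, a hook method such as `yield_query` only needs to be a plain generator; decorating it with `producer` makes it composable with `>>`.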
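And a hypothetical sketch of the multi-task alternative raised in the reply at the top of the thread: each ETL step is its own Airflow task, so a recoverable failure can be cleared and rerun from that step. The DAG id, task ids, S3 key convention, and callables are illustrative assumptions.

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    def source_to_staging(**context):
        # Extract from the source and upload to a staging key derived from
        # the execution date; the key convention is the shared state that
        # lets the downstream task pick up where this one left off.
        staging_key = 'staging/%s/extract.ndjson.gz' % context['ds']
        # ... extract and upload to staging_key ...

    def staging_to_warehouse(**context):
        staging_key = 'staging/%s/extract.ndjson.gz' % context['ds']
        # ... download staging_key and load it into the warehouse ...

    with DAG('etl_multi_task',
             start_date=datetime(2018, 9, 1),
             schedule_interval='@daily') as dag:

        extract = PythonOperator(task_id='source_to_staging',
                                 python_callable=source_to_staging,
                                 provide_context=True)
        load = PythonOperator(task_id='staging_to_warehouse',
                              python_callable=staging_to_warehouse,
                              provide_context=True)

        # If staging_to_warehouse fails recoverably, clearing just that task
        # reruns the pipeline from this step without re-extracting.
        extract >> load

The two approaches are complementary: the pipes framework would structure the work inside a single task, while the task boundaries determine the granularity at which Airflow can retry.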