[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch
ephraimbuddy commented on a change in pull request #9556: URL: https://github.com/apache/airflow/pull/9556#discussion_r452434706 ## File path: airflow/api_connexion/endpoints/dag_run_endpoint.py ## @@ -87,41 +87,68 @@ def get_dag_runs( if dag_id != "~": query = query.filter(DagRun.dag_id == dag_id) +dag_run, total_entries = _fetch_dag_runs(query, session, end_date_gte, end_date_lte, execution_date_gte, + execution_date_lte, start_date_gte, start_date_lte, + limit, offset) + +return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run, + total_entries=total_entries)) + + +def _fetch_dag_runs(query, session, end_date_gte, end_date_lte, +execution_date_gte, execution_date_lte, +start_date_gte, start_date_lte, limit, offset): +query = _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte, + execution_date_lte, start_date_gte, start_date_lte) +# apply offset and limit +dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all() +total_entries = session.query(func.count(DagRun.id)).scalar() +return dag_run, total_entries + + +def _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte, + execution_date_lte, start_date_gte, start_date_lte): # filter start date if start_date_gte: query = query.filter(DagRun.start_date >= start_date_gte) - if start_date_lte: query = query.filter(DagRun.start_date <= start_date_lte) - # filter execution date if execution_date_gte: query = query.filter(DagRun.execution_date >= execution_date_gte) - if execution_date_lte: query = query.filter(DagRun.execution_date <= execution_date_lte) - # filter end date if end_date_gte: query = query.filter(DagRun.end_date >= end_date_gte) - if end_date_lte: query = query.filter(DagRun.end_date <= end_date_lte) +return query -# apply offset and limit -dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all() -total_entries = session.query(func.count(DagRun.id)).scalar() -return dagrun_collection_schema.dump( 
-DAGRunCollection(dag_runs=dag_run, total_entries=total_entries) -) - - -def get_dag_runs_batch(): +@provide_session +def get_dag_runs_batch(session): """ Get list of DAG Runs """ -raise NotImplementedError("Not implemented yet.") +body = request.get_json() +try: +data = dagruns_batch_form_schema.load(body) +except ValidationError as err: +raise BadRequest(detail=err.messages) Review comment: ```suggestion raise BadRequest(detail=str(err.messages)) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch
ephraimbuddy commented on a change in pull request #9556: URL: https://github.com/apache/airflow/pull/9556#discussion_r449756755 ## File path: airflow/api_connexion/endpoints/dag_run_endpoint.py ## @@ -68,40 +70,69 @@ def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None, if dag_id != '~': query = query.filter(DagRun.dag_id == dag_id) +dag_run, total_entries = _fetch_dag_runs(query, session, end_date_gte, end_date_lte, execution_date_gte, + execution_date_lte, start_date_gte, start_date_lte, + limit, offset) + +return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run, + total_entries=total_entries)) + + +def _fetch_dag_runs(query, session, end_date_gte, end_date_lte, +execution_date_gte, execution_date_lte, +start_date_gte, start_date_lte, limit, offset): + +query = _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte, + execution_date_lte, start_date_gte, start_date_lte) +# apply offset and limit +dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all() +total_entries = session.query(func.count(DagRun.id)).scalar() +return dag_run, total_entries + + +def _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte, + execution_date_lte, start_date_gte, start_date_lte): # filter start date if start_date_gte: query = query.filter(DagRun.start_date >= start_date_gte) - if start_date_lte: query = query.filter(DagRun.start_date <= start_date_lte) - # filter execution date if execution_date_gte: query = query.filter(DagRun.execution_date >= execution_date_gte) - if execution_date_lte: query = query.filter(DagRun.execution_date <= execution_date_lte) - # filter end date if end_date_gte: query = query.filter(DagRun.end_date >= end_date_gte) - if end_date_lte: query = query.filter(DagRun.end_date <= end_date_lte) +return query -# apply offset and limit -dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all() -total_entries = 
session.query(func.count(DagRun.id)).scalar() - -return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run, - total_entries=total_entries)) - -def get_dag_runs_batch(): +@provide_session +def get_dag_runs_batch(session): """ Get list of DAG Runs """ -raise NotImplementedError("Not implemented yet.") +body = request.get_json() +try: +data = dagruns_batch_form_schema.load(body).data +except ValidationError as err: +raise BadRequest(detail=err.messages) + +query = session.query(DagRun) + +if data["dag_ids"]: +query = query.filter(DagRun.dag_id.in_(data["dag_ids"])) + +dag_runs, total_entries = _fetch_dag_runs(query, session, data["end_date_gte"], data["end_date_lte"], + data["execution_date_gte"], data["execution_date_lte"], + data["start_date_gte"], data["start_date_lte"], + data["page_limit"], data["page_offset"]) Review comment: It no longer exists but can be added back. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch
ephraimbuddy commented on a change in pull request #9556: URL: https://github.com/apache/airflow/pull/9556#discussion_r449699768 ## File path: airflow/api_connexion/schemas/dag_run_schema.py ## @@ -80,15 +80,15 @@ class Meta: datetimeformat = 'iso' strict = True -page_offset = fields.Int(required=False, missing=0, min=0) -page_limit = fields.Int(required=False, missing=100, min=1) -dag_ids = fields.List(fields.Str(), required=False, missing=None) -execution_date_gte = fields.DateTime(required=False, missing=None) -execution_date_lte = fields.DateTime(required=False, missing=None) -start_date_gte = fields.DateTime(required=False, missing=None) -start_date_lte = fields.DateTime(required=False, missing=None) -end_date_gte = fields.DateTime(required=False, missing=None) -end_date_lte = fields.DateTime(required=False, missing=None) +page_offset = fields.Int(missing=0, min=0) +page_limit = fields.Int(missing=100, min=1) +dag_ids = fields.List(fields.Str(), missing=None) +execution_date_gte = fields.DateTime(missing=None) +execution_date_lte = fields.DateTime(missing=None) +start_date_gte = fields.DateTime(missing=None) +start_date_lte = fields.DateTime(missing=None) +end_date_gte = fields.DateTime(missing=None) +end_date_lte = fields.DateTime(missing=None) Review comment: Do you have any problem when you remove `missing=None`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch
ephraimbuddy commented on a change in pull request #9556: URL: https://github.com/apache/airflow/pull/9556#discussion_r448810142 ## File path: airflow/api_connexion/schemas/dag_run_schema.py ## @@ -72,5 +72,25 @@ class DAGRunCollectionSchema(Schema): total_entries = fields.Int() +class DagRunsBatchFormSchema(Schema): +""" Schema to validate and deserialize the Form(request payload) submitted to DagRun Batch endpoint""" + +class Meta: +""" Meta """ +datetimeformat = 'iso' +strict = True + +page_offset = fields.Int(required=False, missing=0, min=0) +page_limit = fields.Int(required=False, missing=100, min=1) Review comment: I think it's better left out. Some servers can handle page limits larger than 1000. I'm working on a configurable page limit, which we can apply here; then it won't be a hard limit but one that users can configure. You can see the commit here: https://github.com/apache/airflow/commit/a37ac22c179e5dccc95b05c862bffc542dad125e This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch
ephraimbuddy commented on a change in pull request #9556: URL: https://github.com/apache/airflow/pull/9556#discussion_r448256015 ## File path: airflow/api_connexion/schemas/dag_run_schema.py ## @@ -72,5 +72,25 @@ class DAGRunCollectionSchema(Schema): total_entries = fields.Int() +class DagRunsBatchFormSchema(Schema): +""" Schema to validate and deserialize the Form(request payload) submitted to DagRun Batch endpoint""" + +class Meta: +""" Meta """ +datetimeformat = 'iso' +strict = True + +page_offset = fields.Int(required=False, missing=0, min=0) +page_limit = fields.Int(required=False, missing=100, min=1) +dag_ids = fields.List(fields.Str(), required=False, missing=None) +execution_date_gte = fields.DateTime(required=False, missing=None) Review comment: ```suggestion execution_date_gte = fields.DateTime() ``` I suggest removing the `required` and `missing` arguments and only passing them when the value should be something other than `False` and `None`, respectively. These are already the defaults in marshmallow: https://marshmallow.readthedocs.io/en/2.x-line/api_reference.html#module-marshmallow.fields This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch
ephraimbuddy commented on a change in pull request #9556: URL: https://github.com/apache/airflow/pull/9556#discussion_r448256015 ## File path: airflow/api_connexion/schemas/dag_run_schema.py ## @@ -72,5 +72,25 @@ class DAGRunCollectionSchema(Schema): total_entries = fields.Int() +class DagRunsBatchFormSchema(Schema): +""" Schema to validate and deserialize the Form(request payload) submitted to DagRun Batch endpoint""" + +class Meta: +""" Meta """ +datetimeformat = 'iso' +strict = True + +page_offset = fields.Int(required=False, missing=0, min=0) +page_limit = fields.Int(required=False, missing=100, min=1) +dag_ids = fields.List(fields.Str(), required=False, missing=None) +execution_date_gte = fields.DateTime(required=False, missing=None) Review comment: ```suggestion execution_date_gte = fields.DateTime() ``` I suggest removing the `required` and `missing` arguments and only passing them when the value should be something other than `False` and `None`, respectively. These are already the defaults in marshmallow: https://marshmallow.readthedocs.io/en/2.x-line/api_reference.html#module-marshmallow.fields This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org