ephraimbuddy commented on a change in pull request #9556: URL: https://github.com/apache/airflow/pull/9556#discussion_r447160605
########## File path: airflow/api_connexion/endpoints/dag_run_endpoint.py ########## @@ -67,40 +67,72 @@ def get_dag_runs(session, dag_id, start_date_gte=None, start_date_lte=None, if dag_id != '~': query = query.filter(DagRun.dag_id == dag_id) + dag_run, total_entries = _fetch_dag_runs(query, session, end_date_gte, end_date_lte, execution_date_gte, + execution_date_lte, start_date_gte, start_date_lte, + limit, offset) + + return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run, + total_entries=total_entries)) + + +def _fetch_dag_runs(query, session, end_date_gte, end_date_lte, + execution_date_gte, execution_date_lte, + start_date_gte, start_date_lte, limit, offset): + + query = _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte, + execution_date_lte, start_date_gte, start_date_lte) + # apply offset and limit + dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all() + total_entries = session.query(func.count(DagRun.id)).scalar() + return dag_run, total_entries + + +def _apply_date_filters_to_query(query, end_date_gte, end_date_lte, execution_date_gte, + execution_date_lte, start_date_gte, start_date_lte): # filter start date if start_date_gte: query = query.filter(DagRun.start_date >= start_date_gte) - if start_date_lte: query = query.filter(DagRun.start_date <= start_date_lte) - # filter execution date if execution_date_gte: query = query.filter(DagRun.execution_date >= execution_date_gte) - if execution_date_lte: query = query.filter(DagRun.execution_date <= execution_date_lte) - # filter end date if end_date_gte: query = query.filter(DagRun.end_date >= end_date_gte) - if end_date_lte: query = query.filter(DagRun.end_date <= end_date_lte) + return query - # apply offset and limit - dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all() - total_entries = session.query(func.count(DagRun.id)).scalar() - - return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run, - total_entries=total_entries)) - -def get_dag_runs_batch(): +@format_parameters({ + 'start_date_gte': format_datetime, + 'start_date_lte': format_datetime, + 'execution_date_gte': format_datetime, + 'execution_date_lte': format_datetime, + 'end_date_gte': format_datetime, + 'end_date_lte': format_datetime, +}) +@provide_session +def get_dag_runs_batch(session, dag_ids, start_date_gte=None, start_date_lte=None, + execution_date_gte=None, execution_date_lte=None, + end_date_gte=None, end_date_lte=None, offset=None, limit=None): """ Get list of DAG Runs """ Review comment: ``` body = request.get_json() try: data = list_dag_runs_form_schema.load(body) except ValidationError as err: raise BadRequest(detail=err.messages) ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org