[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch

2020-07-09 Thread GitBox


ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r452434706



##
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##
@@ -87,41 +87,68 @@ def get_dag_runs(
 if dag_id != "~":
 query = query.filter(DagRun.dag_id == dag_id)
 
+dag_run, total_entries = _fetch_dag_runs(query, session, end_date_gte, 
end_date_lte, execution_date_gte,
+ execution_date_lte, 
start_date_gte, start_date_lte,
+ limit, offset)
+
+return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
+  
total_entries=total_entries))
+
+
+def _fetch_dag_runs(query, session, end_date_gte, end_date_lte,
+execution_date_gte, execution_date_lte,
+start_date_gte, start_date_lte, limit, offset):
+query = _apply_date_filters_to_query(query, end_date_gte, end_date_lte, 
execution_date_gte,
+ execution_date_lte, start_date_gte, 
start_date_lte)
+# apply offset and limit
+dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
+total_entries = session.query(func.count(DagRun.id)).scalar()
+return dag_run, total_entries
+
+
+def _apply_date_filters_to_query(query, end_date_gte, end_date_lte, 
execution_date_gte,
+ execution_date_lte, start_date_gte, 
start_date_lte):
 # filter start date
 if start_date_gte:
 query = query.filter(DagRun.start_date >= start_date_gte)
-
 if start_date_lte:
 query = query.filter(DagRun.start_date <= start_date_lte)
-
 # filter execution date
 if execution_date_gte:
 query = query.filter(DagRun.execution_date >= execution_date_gte)
-
 if execution_date_lte:
 query = query.filter(DagRun.execution_date <= execution_date_lte)
-
 # filter end date
 if end_date_gte:
 query = query.filter(DagRun.end_date >= end_date_gte)
-
 if end_date_lte:
 query = query.filter(DagRun.end_date <= end_date_lte)
+return query
 
-# apply offset and limit
-dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
-total_entries = session.query(func.count(DagRun.id)).scalar()
 
-return dagrun_collection_schema.dump(
-DAGRunCollection(dag_runs=dag_run, total_entries=total_entries)
-)
-
-
-def get_dag_runs_batch():
+@provide_session
+def get_dag_runs_batch(session):
 """
 Get list of DAG Runs
 """
-raise NotImplementedError("Not implemented yet.")
+body = request.get_json()
+try:
+data = dagruns_batch_form_schema.load(body)
+except ValidationError as err:
+raise BadRequest(detail=err.messages)

Review comment:
   ```suggestion
   raise BadRequest(detail=str(err.messages))
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch

2020-07-04 Thread GitBox


ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r449756755



##
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##
@@ -68,40 +70,69 @@ def get_dag_runs(session, dag_id, start_date_gte=None, 
start_date_lte=None,
 if dag_id != '~':
 query = query.filter(DagRun.dag_id == dag_id)
 
+dag_run, total_entries = _fetch_dag_runs(query, session, end_date_gte, 
end_date_lte, execution_date_gte,
+ execution_date_lte, 
start_date_gte, start_date_lte,
+ limit, offset)
+
+return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
+  
total_entries=total_entries))
+
+
+def _fetch_dag_runs(query, session, end_date_gte, end_date_lte,
+execution_date_gte, execution_date_lte,
+start_date_gte, start_date_lte, limit, offset):
+
+query = _apply_date_filters_to_query(query, end_date_gte, end_date_lte, 
execution_date_gte,
+ execution_date_lte, start_date_gte, 
start_date_lte)
+# apply offset and limit
+dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
+total_entries = session.query(func.count(DagRun.id)).scalar()
+return dag_run, total_entries
+
+
+def _apply_date_filters_to_query(query, end_date_gte, end_date_lte, 
execution_date_gte,
+ execution_date_lte, start_date_gte, 
start_date_lte):
 # filter start date
 if start_date_gte:
 query = query.filter(DagRun.start_date >= start_date_gte)
-
 if start_date_lte:
 query = query.filter(DagRun.start_date <= start_date_lte)
-
 # filter execution date
 if execution_date_gte:
 query = query.filter(DagRun.execution_date >= execution_date_gte)
-
 if execution_date_lte:
 query = query.filter(DagRun.execution_date <= execution_date_lte)
-
 # filter end date
 if end_date_gte:
 query = query.filter(DagRun.end_date >= end_date_gte)
-
 if end_date_lte:
 query = query.filter(DagRun.end_date <= end_date_lte)
+return query
 
-# apply offset and limit
-dag_run = query.order_by(DagRun.id).offset(offset).limit(limit).all()
-total_entries = session.query(func.count(DagRun.id)).scalar()
-
-return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_run,
-  
total_entries=total_entries))
 
-
-def get_dag_runs_batch():
+@provide_session
+def get_dag_runs_batch(session):
 """
 Get list of DAG Runs
 """
-raise NotImplementedError("Not implemented yet.")
+body = request.get_json()
+try:
+data = dagruns_batch_form_schema.load(body).data
+except ValidationError as err:
+raise BadRequest(detail=err.messages)
+
+query = session.query(DagRun)
+
+if data["dag_ids"]:
+query = query.filter(DagRun.dag_id.in_(data["dag_ids"]))
+
+dag_runs, total_entries = _fetch_dag_runs(query, session, 
data["end_date_gte"], data["end_date_lte"],
+  data["execution_date_gte"], 
data["execution_date_lte"],
+  data["start_date_gte"], 
data["start_date_lte"],
+  data["page_limit"], 
data["page_offset"])

Review comment:
   It no longer exists but can be added back.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch

2020-07-03 Thread GitBox


ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r449699768



##
File path: airflow/api_connexion/schemas/dag_run_schema.py
##
@@ -80,15 +80,15 @@ class Meta:
 datetimeformat = 'iso'
 strict = True
 
-page_offset = fields.Int(required=False, missing=0, min=0)
-page_limit = fields.Int(required=False, missing=100, min=1)
-dag_ids = fields.List(fields.Str(), required=False, missing=None)
-execution_date_gte = fields.DateTime(required=False, missing=None)
-execution_date_lte = fields.DateTime(required=False, missing=None)
-start_date_gte = fields.DateTime(required=False, missing=None)
-start_date_lte = fields.DateTime(required=False, missing=None)
-end_date_gte = fields.DateTime(required=False, missing=None)
-end_date_lte = fields.DateTime(required=False, missing=None)
+page_offset = fields.Int(missing=0, min=0)
+page_limit = fields.Int(missing=100, min=1)
+dag_ids = fields.List(fields.Str(), missing=None)
+execution_date_gte = fields.DateTime(missing=None)
+execution_date_lte = fields.DateTime(missing=None)
+start_date_gte = fields.DateTime(missing=None)
+start_date_lte = fields.DateTime(missing=None)
+end_date_gte = fields.DateTime(missing=None)
+end_date_lte = fields.DateTime(missing=None)

Review comment:
   Did you run into any problems when you removed `missing=None`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch

2020-07-02 Thread GitBox


ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r448810142



##
File path: airflow/api_connexion/schemas/dag_run_schema.py
##
@@ -72,5 +72,25 @@ class DAGRunCollectionSchema(Schema):
 total_entries = fields.Int()
 
 
+class DagRunsBatchFormSchema(Schema):
+""" Schema to validate and deserialize the Form(request payload) submitted 
to DagRun Batch endpoint"""
+
+class Meta:
+""" Meta """
+datetimeformat = 'iso'
+strict = True
+
+page_offset = fields.Int(required=False, missing=0, min=0)
+page_limit = fields.Int(required=False, missing=100, min=1)

Review comment:
   I think it's better left out. Some servers can handle a page limit larger 
than 1000. I'm working on a configurable page limit that we can apply here; then 
it won't be a hard limit but one that users can configure.
   You can see the commit here: 
https://github.com/apache/airflow/commit/a37ac22c179e5dccc95b05c862bffc542dad125e





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch

2020-07-01 Thread GitBox


ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r448256015



##
File path: airflow/api_connexion/schemas/dag_run_schema.py
##
@@ -72,5 +72,25 @@ class DAGRunCollectionSchema(Schema):
 total_entries = fields.Int()
 
 
+class DagRunsBatchFormSchema(Schema):
+""" Schema to validate and deserialize the Form(request payload) submitted 
to DagRun Batch endpoint"""
+
+class Meta:
+""" Meta """
+datetimeformat = 'iso'
+strict = True
+
+page_offset = fields.Int(required=False, missing=0, min=0)
+page_limit = fields.Int(required=False, missing=100, min=1)
+dag_ids = fields.List(fields.Str(), required=False, missing=None)
+execution_date_gte = fields.DateTime(required=False, missing=None)

Review comment:
   ```suggestion
   execution_date_gte = fields.DateTime()
   ```
   I suggest removing the `required` and `missing` args and only using them 
when the value should be something other than `False` and `None`, respectively. 
Those are already the marshmallow defaults. 
   
https://marshmallow.readthedocs.io/en/2.x-line/api_reference.html#module-marshmallow.fields





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9556: API Endpoint - DagRuns Batch

2020-07-01 Thread GitBox


ephraimbuddy commented on a change in pull request #9556:
URL: https://github.com/apache/airflow/pull/9556#discussion_r448256015



##
File path: airflow/api_connexion/schemas/dag_run_schema.py
##
@@ -72,5 +72,25 @@ class DAGRunCollectionSchema(Schema):
 total_entries = fields.Int()
 
 
+class DagRunsBatchFormSchema(Schema):
+""" Schema to validate and deserialize the Form(request payload) submitted 
to DagRun Batch endpoint"""
+
+class Meta:
+""" Meta """
+datetimeformat = 'iso'
+strict = True
+
+page_offset = fields.Int(required=False, missing=0, min=0)
+page_limit = fields.Int(required=False, missing=100, min=1)
+dag_ids = fields.List(fields.Str(), required=False, missing=None)
+execution_date_gte = fields.DateTime(required=False, missing=None)

Review comment:
   ```suggestion
   execution_date_gte = fields.DateTime()
   ```
   I suggest removing the `required` and `missing` args and only using them 
when the value should be something other than `False` and `None`, respectively. 
Those are already the marshmallow defaults. 
   
https://marshmallow.readthedocs.io/en/2.x-line/api_reference.html#module-marshmallow.fields





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org