[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-03-02 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r386480827
 
 

 ##
 File path: airflow/www/views.py
 ##
 @@ -570,25 +572,37 @@ def dag_details(self, session=None):
 @has_dag_access(can_dag_read=True)
 @has_access
 @action_logging
-def rendered(self):
+@provide_session
+def rendered(self, session=None):
 dag_id = request.args.get('dag_id')
 task_id = request.args.get('task_id')
 execution_date = request.args.get('execution_date')
 dttm = timezone.parse(execution_date)
 form = DateTimeForm(data={'execution_date': dttm})
 root = request.args.get('root', '')
-# Loads dag from file
-logging.info("Processing DAG file to render template.")
-dag = dagbag.get_dag(dag_id, from_file_only=True)
+
+logging.info("Retrieving rendered templates.")
+dag = dagbag.get_dag(dag_id)
+
 task = copy.copy(dag.get_task(task_id))
 ti = models.TaskInstance(task=task, execution_date=dttm)
 try:
-ti.render_templates()
+if STORE_SERIALIZED_DAGS:
+rtif = RenderedTaskInstanceFields.get_templated_fields(ti)
+if rtif:
+for field_name, rendered_value in rtif.items():
+setattr(task, field_name, rendered_value)
+else:
+# ToDo: Fetch raw strings from RenderedTaskInstanceFields 
table
+flash("Template field not found")
+else:
+ti.render_templates()
 
 Review comment:
   Done !


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-03-02 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r386480519
 
 

 ##
 File path: airflow/utils/operator_helpers.py
 ##
 @@ -84,3 +85,24 @@ def context_to_airflow_vars(context, 
in_env_var_format=False):
 params[AIRFLOW_VAR_NAME_FORMAT_MAPPING['AIRFLOW_CONTEXT_DAG_RUN_ID'][
 name_format]] = dag_run.run_id
 return params
+
+
+def serialize_template_field(template_field):
 
 Review comment:
   Moved


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-03-02 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r386480422
 
 

 ##
 File path: airflow/serialization/serialized_objects.py
 ##
 @@ -319,6 +320,9 @@ def serialize_operator(cls, op: BaseOperator) -> dict:
 if op.operator_extra_links:
 serialize_op['_operator_extra_links'] = \
 cls._serialize_operator_extra_links(op.operator_extra_links)
+serialize_op['_templated_fields'] = {
+field: serialize_template_field(getattr(op, field)) for field in 
op.template_fields
 
 Review comment:
   Removing Unrendered part from this PR. Will create a separate PR to add that


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-03-02 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r386387672
 
 

 ##
 File path: airflow/models/templatedfields.py
 ##
 @@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Save Rendered Template Fields """
+import json
+from typing import Optional
+
+from sqlalchemy import JSON, Column, String, and_, exists, tuple_
+from sqlalchemy.orm import Session
+
+from airflow.configuration import conf
+from airflow.models.base import ID_LEN, Base
+from airflow.models.taskinstance import TaskInstance
+from airflow.utils.operator_helpers import serialize_template_field
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+
+class RenderedTaskInstanceFields(Base):
+"""
+Save Rendered Template Fields
+"""
+
+__tablename__ = "rendered_task_instance_fields"
+
+dag_id = Column(String(ID_LEN), primary_key=True)
+task_id = Column(String(ID_LEN), primary_key=True)
+execution_date = Column(UtcDateTime, primary_key=True)
+rendered_fields = Column(JSON, nullable=False)
+
+def __init__(self, ti: TaskInstance):
+self.dag_id = ti.dag_id
+self.task_id = ti.task_id
+self.task = ti.task
+self.execution_date = ti.execution_date
+
+ti.render_templates()
+self.rendered_fields = {
+field: serialize_template_field(
+getattr(self.task, field)
+) for field in self.task.template_fields
+}
+
+@classmethod
+@provide_session
+def get_templated_fields(cls, ti: TaskInstance, session: Session = None) 
-> Optional[dict]:
+"""
+Get templated field for a TaskInstance from the 
RenderedTaskInstanceFields
+table.
+
+:param ti: Task Instance
+:param session: SqlAlchemy Session
+:return: Rendered Templated TI field
+"""
+result = session.query(cls.rendered_fields).filter(
+cls.dag_id == ti.dag_id,
+cls.task_id == ti.task_id,
+cls.execution_date == ti.execution_date
+).first()
+
+if result:
+rendered_fields = result.rendered_fields
+if isinstance(rendered_fields, str):
+rendered_fields = json.loads(rendered_fields)
 
 Review comment:
   This is a safeguard for DBs that don't support JSON


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-03-02 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r386336053
 
 

 ##
 File path: airflow/models/templatedfields.py
 ##
 @@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Save Rendered Template Fields """
+import json
+from typing import Optional
+
+from sqlalchemy import JSON, Column, String, and_, exists, tuple_
+from sqlalchemy.orm import Session
+
+from airflow.configuration import conf
+from airflow.models.base import ID_LEN, Base
+from airflow.models.taskinstance import TaskInstance
+from airflow.utils.operator_helpers import serialize_template_field
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+
+class RenderedTaskInstanceFields(Base):
+"""
+Save Rendered Template Fields
+"""
+
+__tablename__ = "rendered_task_instance_fields"
+
+dag_id = Column(String(ID_LEN), primary_key=True)
+task_id = Column(String(ID_LEN), primary_key=True)
+execution_date = Column(UtcDateTime, primary_key=True)
+rendered_fields = Column(JSON, nullable=False)
+
+def __init__(self, ti: TaskInstance):
+self.dag_id = ti.dag_id
+self.task_id = ti.task_id
+self.task = ti.task
+self.execution_date = ti.execution_date
+
+ti.render_templates()
+self.rendered_fields = {
+field: serialize_template_field(
+getattr(self.task, field)
+) for field in self.task.template_fields
+}
+
+@classmethod
+@provide_session
+def get_templated_fields(cls, ti: TaskInstance, session: Session = None) 
-> Optional[dict]:
+"""
+Get templated field for a TaskInstance from the 
RenderedTaskInstanceFields
+table.
+
+:param ti: Task Instance
+:param session: SqlAlchemy Session
+:return: Rendered Templated TI field
+"""
+result = session.query(cls.rendered_fields).filter(
+cls.dag_id == ti.dag_id,
+cls.task_id == ti.task_id,
+cls.execution_date == ti.execution_date
+).first()
+
+if result:
+rendered_fields = result.rendered_fields
+if isinstance(rendered_fields, str):
+rendered_fields = json.loads(rendered_fields)
+return rendered_fields
+else:
+return None
+
+@classmethod
+@provide_session
+def has_templated_fields(cls, ti: TaskInstance, session: Session = None) 
-> bool:
+"""Checks templated field exist for this ti.
+
+:param ti: Task Instance
+:param session: SqlAlchemy Session
+"""
+return session.query(exists().where(
+and_(cls.dag_id == ti.dag_id,
+ cls.task_id == ti.task_id,
+ cls.execution_date == ti.execution_date)
+)).scalar()
 
 Review comment:
   We are already using "exists":
   
   ```
   session.query(exists().where(
   and_(cls.dag_id == ti.dag_id,
cls.task_id == ti.task_id,
cls.execution_date == ti.execution_date)
   )).scalar()
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-02-27 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r385502185
 
 

 ##
 File path: airflow/models/templatedfields.py
 ##
 @@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Save Rendered Template Fields """
+import json
+
+from sqlalchemy import JSON, Column, String
+from sqlalchemy.orm import Session
+
+from airflow.models.base import ID_LEN, Base
+from airflow.models.taskinstance import TaskInstance
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+
+class RenderedTaskInstanceFields(Base):
+"""
+Save Rendered Template Fields
+"""
+
+__tablename__ = "rendered_task_instance_fields"
+
+dag_id = Column(String(ID_LEN), primary_key=True)
+task_id = Column(String(ID_LEN), primary_key=True)
+execution_date = Column(UtcDateTime, primary_key=True)
+rendered_fields = Column(JSON, nullable=True)
 
 Review comment:
   updated


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-02-27 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r38550
 
 

 ##
 File path: airflow/models/templatedfields.py
 ##
 @@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Save Rendered Template Fields """
+import json
+
+from sqlalchemy import JSON, Column, String
+from sqlalchemy.orm import Session
+
+from airflow.models.base import ID_LEN, Base
+from airflow.models.taskinstance import TaskInstance
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+
+class RenderedTaskInstanceFields(Base):
+"""
+Save Rendered Template Fields
+"""
+
+__tablename__ = "rendered_task_instance_fields"
+
+dag_id = Column(String(ID_LEN), primary_key=True)
+task_id = Column(String(ID_LEN), primary_key=True)
+execution_date = Column(UtcDateTime, primary_key=True)
+rendered_fields = Column(JSON, nullable=True)
+
+def __init__(self, ti: TaskInstance):
+self.dag_id = ti.dag_id
+self.task_id = ti.task_id
+self.task = ti.task
+self.execution_date = ti.execution_date
+
+ti.render_templates()
+self.rendered_fields = {
+field: self.serialize_rendered_field(
+getattr(self.task, field)
+) for field in self.task.template_fields
+}
+
+@staticmethod
+@provide_session
+def get_templated_fields(ti: TaskInstance, session: Session = None):
+"""
+Get templated field for a TaskInstance from the 
RenderedTaskInstanceFields
+table.
+
+:param ti: Task Instance
+:param session: SqlAlchemy Session
+:return: Rendered Templated TI field
+"""
+result = 
session.query(RenderedTaskInstanceFields.rendered_fields).filter(
+RenderedTaskInstanceFields.dag_id == ti.dag_id,
+RenderedTaskInstanceFields.task_id == ti.task_id,
+RenderedTaskInstanceFields.execution_date == ti.execution_date
+).first()
+
+if result:
+return result.rendered_fields
+else:
+return None
+
+@staticmethod
+def serialize_rendered_field(rendered_field):
 
 Review comment:
   done


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-02-27 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r385502167
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -419,6 +420,17 @@ def verify_integrity(self, session=None):
 1, 1)
 ti = TaskInstance(task, self.execution_date)
 session.add(ti)
+session.commit()
+
+# ToDo: Store only Last X number (maybe 10 or 100) TIs for a 
task
+rtif = session.query(RenderedTaskInstanceFields).filter(
+RenderedTaskInstanceFields.dag_id == ti.dag_id,
+RenderedTaskInstanceFields.task_id == ti.task_id,
+RenderedTaskInstanceFields.execution_date == 
ti.execution_date,
+).first()
 
 Review comment:
   Updated


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-02-26 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r384646459
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -419,6 +420,17 @@ def verify_integrity(self, session=None):
 1, 1)
 ti = TaskInstance(task, self.execution_date)
 session.add(ti)
+session.commit()
+
+# ToDo: Store only Last X number (maybe 10 or 100) TIs for a 
task
+rtif = session.query(RenderedTaskInstanceFields).filter(
+RenderedTaskInstanceFields.dag_id == ti.dag_id,
+RenderedTaskInstanceFields.task_id == ti.task_id,
+RenderedTaskInstanceFields.execution_date == 
ti.execution_date,
+).first()
 
 Review comment:
   I can use the following but according to 
https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.exists
 we will need to further modify it
   
   >Note that some databases such as SQL Server don’t allow an EXISTS 
expression to be present in the columns clause of a SELECT. To select a simple 
boolean value based on the exists as a WHERE, use literal():
   
   
   ```python
   rtif = session.query(session.query(RenderedTaskInstanceFields).filter(
   RenderedTaskInstanceFields.dag_id == ti.dag_id,
   RenderedTaskInstanceFields.task_id == ti.task_id,
   RenderedTaskInstanceFields.execution_date == 
ti.execution_date,
   ).exists()).scalar()
   ```
   
   We might need to change it as follows for it to work on SQL Server:
   
   ```python
   rtif = 
session.query(literal(True)).filter(session.query(RenderedTaskInstanceFields).filter(
   RenderedTaskInstanceFields.dag_id == ti.dag_id,
   RenderedTaskInstanceFields.task_id == ti.task_id,
   RenderedTaskInstanceFields.execution_date == 
ti.execution_date,
   ).exists()).scalar()
   ```
   
   WDYT?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-02-26 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r384646459
 
 

 ##
 File path: airflow/models/dagrun.py
 ##
 @@ -419,6 +420,17 @@ def verify_integrity(self, session=None):
 1, 1)
 ti = TaskInstance(task, self.execution_date)
 session.add(ti)
+session.commit()
+
+# ToDo: Store only Last X number (maybe 10 or 100) TIs for a 
task
+rtif = session.query(RenderedTaskInstanceFields).filter(
+RenderedTaskInstanceFields.dag_id == ti.dag_id,
+RenderedTaskInstanceFields.task_id == ti.task_id,
+RenderedTaskInstanceFields.execution_date == 
ti.execution_date,
+).first()
 
 Review comment:
   
   I can use the following but according to 
https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.exists
 we will need to further modify it
   
   >Note that some databases such as SQL Server don’t allow an EXISTS 
expression to be present in the columns clause of a SELECT. To select a simple 
boolean value based on the exists as a WHERE, use literal():
   
   
   ```
   rtif = session.query(session.query(RenderedTaskInstanceFields).filter(
   RenderedTaskInstanceFields.dag_id == ti.dag_id,
   RenderedTaskInstanceFields.task_id == ti.task_id,
   RenderedTaskInstanceFields.execution_date == 
ti.execution_date,
   ).exists()).scalar()
   ```
   
   We might need to change it as follows for it to work on SQL Server:
   
   ```
   rtif = 
session.query(literal(True)).filter(session.query(RenderedTaskInstanceFields).filter(
   RenderedTaskInstanceFields.dag_id == ti.dag_id,
   RenderedTaskInstanceFields.task_id == ti.task_id,
   RenderedTaskInstanceFields.execution_date == 
ti.execution_date,
   ).exists()).scalar()
   ```
   
   WDYT?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-02-03 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r374088960
 
 

 ##
 File path: airflow/www/views.py
 ##
 @@ -570,25 +572,39 @@ def dag_details(self, session=None):
 @has_dag_access(can_dag_read=True)
 @has_access
 @action_logging
-def rendered(self):
+@provide_session
+def rendered(self, session=None):
 dag_id = request.args.get('dag_id')
 task_id = request.args.get('task_id')
 execution_date = request.args.get('execution_date')
 dttm = timezone.parse(execution_date)
 form = DateTimeForm(data={'execution_date': dttm})
 root = request.args.get('root', '')
-# Loads dag from file
-logging.info("Processing DAG file to render template.")
-dag = dagbag.get_dag(dag_id, from_file_only=True)
+
+logging.info("Retrieving rendered templates.")
+dag = dagbag.get_dag(dag_id)
+
 task = copy.copy(dag.get_task(task_id))
 ti = models.TaskInstance(task=task, execution_date=dttm)
 try:
-ti.render_templates()
+if STORE_SERIALIZED_DAGS:
+rtif = RenderedTaskInstanceFields.get_templated_fields(ti)
+if rtif:
+if isinstance(rtif, str):
+rtif = json.loads(rtif)
+for field_name, rendered_value in rtif.items():
+setattr(task, field_name, rendered_value)
 
 Review comment:
   Ash's comment: 
   
   >make that a method on a Serialized* class


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-02-03 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r374086756
 
 

 ##
 File path: airflow/models/templatedfields.py
 ##
 @@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Save Rendered Template Fields """
+import json
+
+from sqlalchemy import JSON, Column, String
+from sqlalchemy.orm import Session
+
+from airflow.models.base import ID_LEN, Base
+from airflow.models.taskinstance import TaskInstance
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+
+class RenderedTaskInstanceFields(Base):
+"""
+Save Rendered Template Fields
+"""
+
+__tablename__ = "rendered_task_instance_fields"
+
+dag_id = Column(String(ID_LEN), primary_key=True)
+task_id = Column(String(ID_LEN), primary_key=True)
+execution_date = Column(UtcDateTime, primary_key=True)
+rendered_fields = Column(JSON, nullable=True)
+
+def __init__(self, ti: TaskInstance):
+self.dag_id = ti.dag_id
+self.task_id = ti.task_id
+self.task = ti.task
+self.execution_date = ti.execution_date
+
+ti.render_templates()
+self.rendered_fields = {
+field: self.serialize_rendered_field(
+getattr(self.task, field)
+) for field in self.task.template_fields
+}
+
+@staticmethod
+@provide_session
+def get_templated_fields(ti: TaskInstance, session: Session = None):
+"""
+Get templated field for a TaskInstance from the 
RenderedTaskInstanceFields
+table.
+
+:param ti: Task Instance
+:param session: SqlAlchemy Session
+:return: Rendered Templated TI field
+"""
+result = 
session.query(RenderedTaskInstanceFields.rendered_fields).filter(
+RenderedTaskInstanceFields.dag_id == ti.dag_id,
+RenderedTaskInstanceFields.task_id == ti.task_id,
+RenderedTaskInstanceFields.execution_date == ti.execution_date
+).first()
+
+if result:
+return result.rendered_fields
+else:
+return None
+
+@staticmethod
+def serialize_rendered_field(rendered_field):
 
 Review comment:
   Yup, planning to put it somewhere in "utils/" and import that in both 
of them to avoid cyclic imports.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-01-31 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r373608085
 
 

 ##
 File path: 
airflow/migrations/versions/852ae6c715af_add_templated_fields_rendered_table.py
 ##
 @@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TemplatedFieldsRendered table
+
+Revision ID: 852ae6c715af
+Revises: 1d3a23a719bf
+Create Date: 2019-12-10 22:19:18.034961
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mysql
+
+# revision identifiers, used by Alembic.
+revision = '852ae6c715af'
+down_revision = '1d3a23a719bf'
+branch_labels = None
+depends_on = None
+
+TABLE_NAME = 'templated_fields_rendered'
+INDEX_NAME = 'idx_tfr_dag_task_date'
+
+
+def upgrade():
+"""Apply Add TemplatedFieldsRendered table"""
+json_type = sa.JSON
+conn = op.get_bind()  # pylint: disable=no-member
+
+if conn.dialect.name != "postgresql":
+# Mysql 5.7+/MariaDB 10.2.3 has JSON support. Rather than checking for
+# versions, check for the function existing.
+try:
+conn.execute("SELECT JSON_VALID(1)").fetchone()
+except sa.exc.OperationalError:
+json_type = sa.Text
+
+op.create_table(
+TABLE_NAME,  # pylint: disable=no-member
+sa.Column('dag_id', sa.String(length=250), nullable=False),
+sa.Column('task_id', sa.String(length=250), nullable=False),
+sa.Column('execution_date', sa.DateTime(), nullable=False),
+sa.Column('rendered_fields', json_type(), nullable=True),
+sa.PrimaryKeyConstraint('dag_id', 'task_id', 'execution_date')
+)
+op.create_index(  # pylint: disable=no-member
+INDEX_NAME,
+TABLE_NAME,
+['dag_id', 'task_id', 'execution_date'],
+)
+
+if conn.dialect.name == "mysql":
+conn.execute("SET time_zone = '+00:00'")
 
 Review comment:
   From the same 
[article](https://www.eversql.com/mysql-datetime-vs-timestamp-column-types-which-one-i-should-use/)
 :
   
   >To summarize, if you want to serve your date and time data the same way 
regardless of timezones, you can use the DATETIME type (which will also allow 
you to use all the date & type functions built in MySQL). Otherwise, you can 
use TIMESTAMP and serve the data on a per-timezone basis.
   
   Also, from https://docs.sqlalchemy.org/en/13/core/type_basics.html#sqlalchemy.types.DateTime :
   >For timezone support, use at least the TIMESTAMP datatype, if not the 
dialect-specific datatype object.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-01-28 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r372224607
 
 

 ##
 File path: 
airflow/migrations/versions/852ae6c715af_add_templated_fields_rendered_table.py
 ##
 @@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TemplatedFieldsRendered table
+
+Revision ID: 852ae6c715af
+Revises: 1d3a23a719bf
+Create Date: 2019-12-10 22:19:18.034961
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mysql
+
+# revision identifiers, used by Alembic.
+revision = '852ae6c715af'
+down_revision = '1d3a23a719bf'
+branch_labels = None
+depends_on = None
+
+TABLE_NAME = 'templated_fields_rendered'
+INDEX_NAME = 'idx_tfr_dag_task_date'
+
+
+def upgrade():
+"""Apply Add TemplatedFieldsRendered table"""
+json_type = sa.JSON
+conn = op.get_bind()  # pylint: disable=no-member
+
+if conn.dialect.name != "postgresql":
+# Mysql 5.7+/MariaDB 10.2.3 has JSON support. Rather than checking for
+# versions, check for the function existing.
+try:
+conn.execute("SELECT JSON_VALID(1)").fetchone()
+except sa.exc.OperationalError:
+json_type = sa.Text
+
+op.create_table(
+TABLE_NAME,  # pylint: disable=no-member
+sa.Column('dag_id', sa.String(length=250), nullable=False),
+sa.Column('task_id', sa.String(length=250), nullable=False),
+sa.Column('execution_date', sa.DateTime(), nullable=False),
+sa.Column('rendered_fields', json_type(), nullable=True),
+sa.PrimaryKeyConstraint('dag_id', 'task_id', 'execution_date')
+)
+op.create_index(  # pylint: disable=no-member
+INDEX_NAME,
+TABLE_NAME,
+['dag_id', 'task_id', 'execution_date'],
+)
+
+if conn.dialect.name == "mysql":
+conn.execute("SET time_zone = '+00:00'")
 
 Review comment:
   If we just use DATETIME, the column won't contain timezone information, though!
That might have undesirable effects when comparing the current time with this
stored time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-01-28 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r372224848
 
 

 ##
 File path: airflow/migrations/versions/1d3a23a719bf_merge_heads.py
 ##
 @@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""empty message
+
+Revision ID: 1d3a23a719bf
+Revises: c1840b4bcf1a, fe461863935f
+Create Date: 2019-12-10 22:18:09.253540
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '1d3a23a719bf'
+down_revision = ('c1840b4bcf1a', 'fe461863935f')
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+"""Apply empty message"""
+pass
+
+
+def downgrade():
+"""Unapply empty message"""
 
 Review comment:
   These are outdated; I need to update them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-01-28 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r372224848
 
 

 ##
 File path: airflow/migrations/versions/1d3a23a719bf_merge_heads.py
 ##
 @@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""empty message
+
+Revision ID: 1d3a23a719bf
+Revises: c1840b4bcf1a, fe461863935f
+Create Date: 2019-12-10 22:18:09.253540
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = '1d3a23a719bf'
+down_revision = ('c1840b4bcf1a', 'fe461863935f')
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+"""Apply empty message"""
+pass
+
+
+def downgrade():
+"""Unapply empty message"""
 
 Review comment:
   These are outdated; I need to update them.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files

2020-01-28 Thread GitBox
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] 
Rendering templated_fields without accessing DAG files
URL: https://github.com/apache/airflow/pull/6788#discussion_r372224607
 
 

 ##
 File path: 
airflow/migrations/versions/852ae6c715af_add_templated_fields_rendered_table.py
 ##
 @@ -0,0 +1,103 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Add TemplatedFieldsRendered table
+
+Revision ID: 852ae6c715af
+Revises: 1d3a23a719bf
+Create Date: 2019-12-10 22:19:18.034961
+
+"""
+
+import sqlalchemy as sa
+from alembic import op
+from sqlalchemy.dialects import mysql
+
+# revision identifiers, used by Alembic.
+revision = '852ae6c715af'
+down_revision = '1d3a23a719bf'
+branch_labels = None
+depends_on = None
+
+TABLE_NAME = 'templated_fields_rendered'
+INDEX_NAME = 'idx_tfr_dag_task_date'
+
+
+def upgrade():
+"""Apply Add TemplatedFieldsRendered table"""
+json_type = sa.JSON
+conn = op.get_bind()  # pylint: disable=no-member
+
+if conn.dialect.name != "postgresql":
+# Mysql 5.7+/MariaDB 10.2.3 has JSON support. Rather than checking for
+# versions, check for the function existing.
+try:
+conn.execute("SELECT JSON_VALID(1)").fetchone()
+except sa.exc.OperationalError:
+json_type = sa.Text
+
+op.create_table(
+TABLE_NAME,  # pylint: disable=no-member
+sa.Column('dag_id', sa.String(length=250), nullable=False),
+sa.Column('task_id', sa.String(length=250), nullable=False),
+sa.Column('execution_date', sa.DateTime(), nullable=False),
+sa.Column('rendered_fields', json_type(), nullable=True),
+sa.PrimaryKeyConstraint('dag_id', 'task_id', 'execution_date')
+)
+op.create_index(  # pylint: disable=no-member
+INDEX_NAME,
+TABLE_NAME,
+['dag_id', 'task_id', 'execution_date'],
+)
+
+if conn.dialect.name == "mysql":
+conn.execute("SET time_zone = '+00:00'")
 
 Review comment:
   If we just use DATETIME, the column won't contain timezone information, though!
That might have undesirable effects when comparing the current time with this
stored time.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services