[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r386480827 ## File path: airflow/www/views.py ## @@ -570,25 +572,37 @@ def dag_details(self, session=None): @has_dag_access(can_dag_read=True) @has_access @action_logging -def rendered(self): +@provide_session +def rendered(self, session=None): dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date') dttm = timezone.parse(execution_date) form = DateTimeForm(data={'execution_date': dttm}) root = request.args.get('root', '') -# Loads dag from file -logging.info("Processing DAG file to render template.") -dag = dagbag.get_dag(dag_id, from_file_only=True) + +logging.info("Retrieving rendered templates.") +dag = dagbag.get_dag(dag_id) + task = copy.copy(dag.get_task(task_id)) ti = models.TaskInstance(task=task, execution_date=dttm) try: -ti.render_templates() +if STORE_SERIALIZED_DAGS: +rtif = RenderedTaskInstanceFields.get_templated_fields(ti) +if rtif: +for field_name, rendered_value in rtif.items(): +setattr(task, field_name, rendered_value) +else: +# ToDo: Fetch raw strings from RenderedTaskInstanceFields table +flash("Template field not found") +else: +ti.render_templates() Review comment: Done ! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r386480519 ## File path: airflow/utils/operator_helpers.py ## @@ -84,3 +85,24 @@ def context_to_airflow_vars(context, in_env_var_format=False): params[AIRFLOW_VAR_NAME_FORMAT_MAPPING['AIRFLOW_CONTEXT_DAG_RUN_ID'][ name_format]] = dag_run.run_id return params + + +def serialize_template_field(template_field): Review comment: Moved This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r386480422 ## File path: airflow/serialization/serialized_objects.py ## @@ -319,6 +320,9 @@ def serialize_operator(cls, op: BaseOperator) -> dict: if op.operator_extra_links: serialize_op['_operator_extra_links'] = \ cls._serialize_operator_extra_links(op.operator_extra_links) +serialize_op['_templated_fields'] = { +field: serialize_template_field(getattr(op, field)) for field in op.template_fields Review comment: Removing the unrendered part from this PR. Will create a separate PR to add it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r386387672 ## File path: airflow/models/templatedfields.py ## @@ -0,0 +1,126 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Save Rendered Template Fields """ +import json +from typing import Optional + +from sqlalchemy import JSON, Column, String, and_, exists, tuple_ +from sqlalchemy.orm import Session + +from airflow.configuration import conf +from airflow.models.base import ID_LEN, Base +from airflow.models.taskinstance import TaskInstance +from airflow.utils.operator_helpers import serialize_template_field +from airflow.utils.session import provide_session +from airflow.utils.sqlalchemy import UtcDateTime + + +class RenderedTaskInstanceFields(Base): +""" +Save Rendered Template Fields +""" + +__tablename__ = "rendered_task_instance_fields" + +dag_id = Column(String(ID_LEN), primary_key=True) +task_id = Column(String(ID_LEN), primary_key=True) +execution_date = Column(UtcDateTime, primary_key=True) +rendered_fields = Column(JSON, nullable=False) + +def __init__(self, ti: TaskInstance): +self.dag_id = ti.dag_id +self.task_id = ti.task_id +self.task = ti.task +self.execution_date = ti.execution_date + +ti.render_templates() +self.rendered_fields = { +field: serialize_template_field( +getattr(self.task, field) +) for field in self.task.template_fields +} + +@classmethod +@provide_session +def get_templated_fields(cls, ti: TaskInstance, session: Session = None) -> Optional[dict]: +""" +Get templated field for a TaskInstance from the RenderedTaskInstanceFields +table. + +:param ti: Task Instance +:param session: SqlAlchemy Session +:return: Rendered Templated TI field +""" +result = session.query(cls.rendered_fields).filter( +cls.dag_id == ti.dag_id, +cls.task_id == ti.task_id, +cls.execution_date == ti.execution_date +).first() + +if result: +rendered_fields = result.rendered_fields +if isinstance(rendered_fields, str): +rendered_fields = json.loads(rendered_fields) Review comment: This is a safeguard for DBs that don't support JSON. This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r386336053 ## File path: airflow/models/templatedfields.py ## @@ -0,0 +1,126 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Save Rendered Template Fields """ +import json +from typing import Optional + +from sqlalchemy import JSON, Column, String, and_, exists, tuple_ +from sqlalchemy.orm import Session + +from airflow.configuration import conf +from airflow.models.base import ID_LEN, Base +from airflow.models.taskinstance import TaskInstance +from airflow.utils.operator_helpers import serialize_template_field +from airflow.utils.session import provide_session +from airflow.utils.sqlalchemy import UtcDateTime + + +class RenderedTaskInstanceFields(Base): +""" +Save Rendered Template Fields +""" + +__tablename__ = "rendered_task_instance_fields" + +dag_id = Column(String(ID_LEN), primary_key=True) +task_id = Column(String(ID_LEN), primary_key=True) +execution_date = Column(UtcDateTime, primary_key=True) +rendered_fields = Column(JSON, nullable=False) + +def __init__(self, ti: TaskInstance): +self.dag_id = ti.dag_id +self.task_id = ti.task_id +self.task = ti.task +self.execution_date = ti.execution_date + +ti.render_templates() +self.rendered_fields = { +field: serialize_template_field( +getattr(self.task, field) +) for field in self.task.template_fields +} + +@classmethod +@provide_session +def get_templated_fields(cls, ti: TaskInstance, session: Session = None) -> Optional[dict]: +""" +Get templated field for a TaskInstance from the RenderedTaskInstanceFields +table. + +:param ti: Task Instance +:param session: SqlAlchemy Session +:return: Rendered Templated TI field +""" +result = session.query(cls.rendered_fields).filter( +cls.dag_id == ti.dag_id, +cls.task_id == ti.task_id, +cls.execution_date == ti.execution_date +).first() + +if result: +rendered_fields = result.rendered_fields +if isinstance(rendered_fields, str): +rendered_fields = json.loads(rendered_fields) +return rendered_fields +else: +return None + +@classmethod +@provide_session +def has_templated_fields(cls, ti: TaskInstance, session: Session = None) -> bool: +"""Checks templated field exist for this ti. 
+ +:param ti: Task Instance +:param session: SqlAlchemy Session +""" +return session.query(exists().where( +and_(cls.dag_id == ti.dag_id, + cls.task_id == ti.task_id, + cls.execution_date == ti.execution_date) +)).scalar() Review comment: We are already using "exists": ``` session.query(exists().where( and_(cls.dag_id == ti.dag_id, cls.task_id == ti.task_id, cls.execution_date == ti.execution_date) )).scalar() ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r385502185 ## File path: airflow/models/templatedfields.py ## @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Save Rendered Template Fields """ +import json + +from sqlalchemy import JSON, Column, String +from sqlalchemy.orm import Session + +from airflow.models.base import ID_LEN, Base +from airflow.models.taskinstance import TaskInstance +from airflow.utils.session import provide_session +from airflow.utils.sqlalchemy import UtcDateTime + + +class RenderedTaskInstanceFields(Base): +""" +Save Rendered Template Fields +""" + +__tablename__ = "rendered_task_instance_fields" + +dag_id = Column(String(ID_LEN), primary_key=True) +task_id = Column(String(ID_LEN), primary_key=True) +execution_date = Column(UtcDateTime, primary_key=True) +rendered_fields = Column(JSON, nullable=True) Review comment: updated This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r38550 ## File path: airflow/models/templatedfields.py ## @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Save Rendered Template Fields """ +import json + +from sqlalchemy import JSON, Column, String +from sqlalchemy.orm import Session + +from airflow.models.base import ID_LEN, Base +from airflow.models.taskinstance import TaskInstance +from airflow.utils.session import provide_session +from airflow.utils.sqlalchemy import UtcDateTime + + +class RenderedTaskInstanceFields(Base): +""" +Save Rendered Template Fields +""" + +__tablename__ = "rendered_task_instance_fields" + +dag_id = Column(String(ID_LEN), primary_key=True) +task_id = Column(String(ID_LEN), primary_key=True) +execution_date = Column(UtcDateTime, primary_key=True) +rendered_fields = Column(JSON, nullable=True) + +def __init__(self, ti: TaskInstance): +self.dag_id = ti.dag_id +self.task_id = ti.task_id +self.task = ti.task +self.execution_date = ti.execution_date + +ti.render_templates() +self.rendered_fields = { +field: self.serialize_rendered_field( +getattr(self.task, field) +) for field in self.task.template_fields +} + +@staticmethod +@provide_session +def get_templated_fields(ti: TaskInstance, session: Session = None): +""" +Get templated field for a TaskInstance from the RenderedTaskInstanceFields +table. + +:param ti: Task Instance +:param session: SqlAlchemy Session +:return: Rendered Templated TI field +""" +result = session.query(RenderedTaskInstanceFields.rendered_fields).filter( +RenderedTaskInstanceFields.dag_id == ti.dag_id, +RenderedTaskInstanceFields.task_id == ti.task_id, +RenderedTaskInstanceFields.execution_date == ti.execution_date +).first() + +if result: +return result.rendered_fields +else: +return None + +@staticmethod +def serialize_rendered_field(rendered_field): Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r385502167 ## File path: airflow/models/dagrun.py ## @@ -419,6 +420,17 @@ def verify_integrity(self, session=None): 1, 1) ti = TaskInstance(task, self.execution_date) session.add(ti) +session.commit() + +# ToDo: Store only Last X number (maybe 10 or 100) TIs for a task +rtif = session.query(RenderedTaskInstanceFields).filter( +RenderedTaskInstanceFields.dag_id == ti.dag_id, +RenderedTaskInstanceFields.task_id == ti.task_id, +RenderedTaskInstanceFields.execution_date == ti.execution_date, +).first() Review comment: Updated This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r384646459 ## File path: airflow/models/dagrun.py ## @@ -419,6 +420,17 @@ def verify_integrity(self, session=None): 1, 1) ti = TaskInstance(task, self.execution_date) session.add(ti) +session.commit() + +# ToDo: Store only Last X number (maybe 10 or 100) TIs for a task +rtif = session.query(RenderedTaskInstanceFields).filter( +RenderedTaskInstanceFields.dag_id == ti.dag_id, +RenderedTaskInstanceFields.task_id == ti.task_id, +RenderedTaskInstanceFields.execution_date == ti.execution_date, +).first() Review comment: I can use the following, but according to https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.exists, we will need to modify it further: >Note that some databases such as SQL Server don’t allow an EXISTS expression to be present in the columns clause of a SELECT. To select a simple boolean value based on the exists as a WHERE, use literal(): ```python rtif = session.query(session.query(RenderedTaskInstanceFields).filter( RenderedTaskInstanceFields.dag_id == ti.dag_id, RenderedTaskInstanceFields.task_id == ti.task_id, RenderedTaskInstanceFields.execution_date == ti.execution_date, ).exists()).scalar() ``` We might need to change it as follows for it to work on SQL Server: ```python rtif = session.query(literal(True)).filter(session.query(RenderedTaskInstanceFields).filter( RenderedTaskInstanceFields.dag_id == ti.dag_id, RenderedTaskInstanceFields.task_id == ti.task_id, RenderedTaskInstanceFields.execution_date == ti.execution_date, ).exists()).scalar() ``` WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r384646459 ## File path: airflow/models/dagrun.py ## @@ -419,6 +420,17 @@ def verify_integrity(self, session=None): 1, 1) ti = TaskInstance(task, self.execution_date) session.add(ti) +session.commit() + +# ToDo: Store only Last X number (maybe 10 or 100) TIs for a task +rtif = session.query(RenderedTaskInstanceFields).filter( +RenderedTaskInstanceFields.dag_id == ti.dag_id, +RenderedTaskInstanceFields.task_id == ti.task_id, +RenderedTaskInstanceFields.execution_date == ti.execution_date, +).first() Review comment: I can use the following, but according to https://docs.sqlalchemy.org/en/13/orm/query.html#sqlalchemy.orm.query.Query.exists, we will need to modify it further: >Note that some databases such as SQL Server don’t allow an EXISTS expression to be present in the columns clause of a SELECT. To select a simple boolean value based on the exists as a WHERE, use literal(): ``` rtif = session.query(session.query(RenderedTaskInstanceFields).filter( RenderedTaskInstanceFields.dag_id == ti.dag_id, RenderedTaskInstanceFields.task_id == ti.task_id, RenderedTaskInstanceFields.execution_date == ti.execution_date, ).exists()).scalar() ``` We might need to change it as follows for it to work on SQL Server: ``` rtif = session.query(literal(True)).filter(session.query(RenderedTaskInstanceFields).filter( RenderedTaskInstanceFields.dag_id == ti.dag_id, RenderedTaskInstanceFields.task_id == ti.task_id, RenderedTaskInstanceFields.execution_date == ti.execution_date, ).exists()).scalar() ``` WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r374088960 ## File path: airflow/www/views.py ## @@ -570,25 +572,39 @@ def dag_details(self, session=None): @has_dag_access(can_dag_read=True) @has_access @action_logging -def rendered(self): +@provide_session +def rendered(self, session=None): dag_id = request.args.get('dag_id') task_id = request.args.get('task_id') execution_date = request.args.get('execution_date') dttm = timezone.parse(execution_date) form = DateTimeForm(data={'execution_date': dttm}) root = request.args.get('root', '') -# Loads dag from file -logging.info("Processing DAG file to render template.") -dag = dagbag.get_dag(dag_id, from_file_only=True) + +logging.info("Retrieving rendered templates.") +dag = dagbag.get_dag(dag_id) + task = copy.copy(dag.get_task(task_id)) ti = models.TaskInstance(task=task, execution_date=dttm) try: -ti.render_templates() +if STORE_SERIALIZED_DAGS: +rtif = RenderedTaskInstanceFields.get_templated_fields(ti) +if rtif: +if isinstance(rtif, str): +rtif = json.loads(rtif) +for field_name, rendered_value in rtif.items(): +setattr(task, field_name, rendered_value) Review comment: Ash's comment: >make that a method on a Serialzied* class This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r374086756 ## File path: airflow/models/templatedfields.py ## @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+"""Save Rendered Template Fields """ +import json + +from sqlalchemy import JSON, Column, String +from sqlalchemy.orm import Session + +from airflow.models.base import ID_LEN, Base +from airflow.models.taskinstance import TaskInstance +from airflow.utils.session import provide_session +from airflow.utils.sqlalchemy import UtcDateTime + + +class RenderedTaskInstanceFields(Base): +""" +Save Rendered Template Fields +""" + +__tablename__ = "rendered_task_instance_fields" + +dag_id = Column(String(ID_LEN), primary_key=True) +task_id = Column(String(ID_LEN), primary_key=True) +execution_date = Column(UtcDateTime, primary_key=True) +rendered_fields = Column(JSON, nullable=True) + +def __init__(self, ti: TaskInstance): +self.dag_id = ti.dag_id +self.task_id = ti.task_id +self.task = ti.task +self.execution_date = ti.execution_date + +ti.render_templates() +self.rendered_fields = { +field: self.serialize_rendered_field( +getattr(self.task, field) +) for field in self.task.template_fields +} + +@staticmethod +@provide_session +def get_templated_fields(ti: TaskInstance, session: Session = None): +""" +Get templated field for a TaskInstance from the RenderedTaskInstanceFields +table. + +:param ti: Task Instance +:param session: SqlAlchemy Session +:return: Rendered Templated TI field +""" +result = session.query(RenderedTaskInstanceFields.rendered_fields).filter( +RenderedTaskInstanceFields.dag_id == ti.dag_id, +RenderedTaskInstanceFields.task_id == ti.task_id, +RenderedTaskInstanceFields.execution_date == ti.execution_date +).first() + +if result: +return result.rendered_fields +else: +return None + +@staticmethod +def serialize_rendered_field(rendered_field): Review comment: Yup, planning to put it somewhere in "utils/" and import it in both of them to avoid cyclic imports. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r373608085 ## File path: airflow/migrations/versions/852ae6c715af_add_templated_fields_rendered_table.py ## @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Add TemplatedFieldsRendered table + +Revision ID: 852ae6c715af +Revises: 1d3a23a719bf +Create Date: 2019-12-10 22:19:18.034961 + +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import mysql + +# revision identifiers, used by Alembic. +revision = '852ae6c715af' +down_revision = '1d3a23a719bf' +branch_labels = None +depends_on = None + +TABLE_NAME = 'templated_fields_rendered' +INDEX_NAME = 'idx_tfr_dag_task_date' + + +def upgrade(): +"""Apply Add TemplatedFieldsRendered table""" +json_type = sa.JSON +conn = op.get_bind() # pylint: disable=no-member + +if conn.dialect.name != "postgresql": +# Mysql 5.7+/MariaDB 10.2.3 has JSON support. Rather than checking for +# versions, check for the function existing. 
+try: +conn.execute("SELECT JSON_VALID(1)").fetchone() +except sa.exc.OperationalError: +json_type = sa.Text + +op.create_table( +TABLE_NAME, # pylint: disable=no-member +sa.Column('dag_id', sa.String(length=250), nullable=False), +sa.Column('task_id', sa.String(length=250), nullable=False), +sa.Column('execution_date', sa.DateTime(), nullable=False), +sa.Column('rendered_fields', json_type(), nullable=True), +sa.PrimaryKeyConstraint('dag_id', 'task_id', 'execution_date') +) +op.create_index( # pylint: disable=no-member +INDEX_NAME, +TABLE_NAME, +['dag_id', 'task_id', 'execution_date'], +) + +if conn.dialect.name == "mysql": +conn.execute("SET time_zone = '+00:00'") Review comment: From the same [article](https://www.eversql.com/mysql-datetime-vs-timestamp-column-types-which-one-i-should-use/) : >To summarize, if you want to serve your date and time data the same way regardless of timezones, you can use the DATETIME type (which will also allow you to use all the date & type functions built in MySQL). Otherwise, you can use TIMESTAMP and serve the data on a per-timezone basis. Also, from https://docs.sqlalchemy.org/en/13/core/type_basics.html#sqlalchemy.types.DateTime >For timezone support, use at least the TIMESTAMP datatype, if not the dialect-specific datatype object. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r372224607 ## File path: airflow/migrations/versions/852ae6c715af_add_templated_fields_rendered_table.py ## @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Add TemplatedFieldsRendered table + +Revision ID: 852ae6c715af +Revises: 1d3a23a719bf +Create Date: 2019-12-10 22:19:18.034961 + +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import mysql + +# revision identifiers, used by Alembic. +revision = '852ae6c715af' +down_revision = '1d3a23a719bf' +branch_labels = None +depends_on = None + +TABLE_NAME = 'templated_fields_rendered' +INDEX_NAME = 'idx_tfr_dag_task_date' + + +def upgrade(): +"""Apply Add TemplatedFieldsRendered table""" +json_type = sa.JSON +conn = op.get_bind() # pylint: disable=no-member + +if conn.dialect.name != "postgresql": +# Mysql 5.7+/MariaDB 10.2.3 has JSON support. Rather than checking for +# versions, check for the function existing. 
+try: +conn.execute("SELECT JSON_VALID(1)").fetchone() +except sa.exc.OperationalError: +json_type = sa.Text + +op.create_table( +TABLE_NAME, # pylint: disable=no-member +sa.Column('dag_id', sa.String(length=250), nullable=False), +sa.Column('task_id', sa.String(length=250), nullable=False), +sa.Column('execution_date', sa.DateTime(), nullable=False), +sa.Column('rendered_fields', json_type(), nullable=True), +sa.PrimaryKeyConstraint('dag_id', 'task_id', 'execution_date') +) +op.create_index( # pylint: disable=no-member +INDEX_NAME, +TABLE_NAME, +['dag_id', 'task_id', 'execution_date'], +) + +if conn.dialect.name == "mysql": +conn.execute("SET time_zone = '+00:00'") Review comment: It won't contain the timezones though if we just use DATETIME! It might have undesirable effects when comparing current time with this time! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r372224848 ## File path: airflow/migrations/versions/1d3a23a719bf_merge_heads.py ## @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""empty message + +Revision ID: 1d3a23a719bf +Revises: c1840b4bcf1a, fe461863935f +Create Date: 2019-12-10 22:18:09.253540 + +""" + +# revision identifiers, used by Alembic. +revision = '1d3a23a719bf' +down_revision = ('c1840b4bcf1a', 'fe461863935f') +branch_labels = None +depends_on = None + + +def upgrade(): +"""Apply empty message""" +pass + + +def downgrade(): +"""Unapply empty message""" Review comment: These are outdated; I need to update them. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r372224848 ## File path: airflow/migrations/versions/1d3a23a719bf_merge_heads.py ## @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""empty message + +Revision ID: 1d3a23a719bf +Revises: c1840b4bcf1a, fe461863935f +Create Date: 2019-12-10 22:18:09.253540 + +""" + +# revision identifiers, used by Alembic. +revision = '1d3a23a719bf' +down_revision = ('c1840b4bcf1a', 'fe461863935f') +branch_labels = None +depends_on = None + + +def upgrade(): +"""Apply empty message""" +pass + + +def downgrade(): +"""Unapply empty message""" Review comment: These are outdated; I need to update them. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files
kaxil commented on a change in pull request #6788: WIP: [AIRFLOW-5944] Rendering templated_fields without accessing DAG files URL: https://github.com/apache/airflow/pull/6788#discussion_r372224607 ## File path: airflow/migrations/versions/852ae6c715af_add_templated_fields_rendered_table.py ## @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Add TemplatedFieldsRendered table + +Revision ID: 852ae6c715af +Revises: 1d3a23a719bf +Create Date: 2019-12-10 22:19:18.034961 + +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import mysql + +# revision identifiers, used by Alembic. +revision = '852ae6c715af' +down_revision = '1d3a23a719bf' +branch_labels = None +depends_on = None + +TABLE_NAME = 'templated_fields_rendered' +INDEX_NAME = 'idx_tfr_dag_task_date' + + +def upgrade(): +"""Apply Add TemplatedFieldsRendered table""" +json_type = sa.JSON +conn = op.get_bind() # pylint: disable=no-member + +if conn.dialect.name != "postgresql": +# Mysql 5.7+/MariaDB 10.2.3 has JSON support. Rather than checking for +# versions, check for the function existing. 
+try: +conn.execute("SELECT JSON_VALID(1)").fetchone() +except sa.exc.OperationalError: +json_type = sa.Text + +op.create_table( +TABLE_NAME, # pylint: disable=no-member +sa.Column('dag_id', sa.String(length=250), nullable=False), +sa.Column('task_id', sa.String(length=250), nullable=False), +sa.Column('execution_date', sa.DateTime(), nullable=False), +sa.Column('rendered_fields', json_type(), nullable=True), +sa.PrimaryKeyConstraint('dag_id', 'task_id', 'execution_date') +) +op.create_index( # pylint: disable=no-member +INDEX_NAME, +TABLE_NAME, +['dag_id', 'task_id', 'execution_date'], +) + +if conn.dialect.name == "mysql": +conn.execute("SET time_zone = '+00:00'") Review comment: It won't contain the timezone, though, if we just use DATETIME! It might have undesirable effects when comparing the current time with this time! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services