[jira] [Created] (AIRFLOW-6916) Replace SimpleDag & SimpleDagBag representation with Serialized Dag
Kaxil Naik created AIRFLOW-6916: --- Summary: Replace SimpleDag & SimpleDagBag representation with Serialized Dag Key: AIRFLOW-6916 URL: https://issues.apache.org/jira/browse/AIRFLOW-6916 Project: Apache Airflow Issue Type: Improvement Components: serialization Affects Versions: 2.0.0 Reporter: Kaxil Naik Assignee: Kaxil Naik Fix For: 2.0.0 Currently, we have two serialized representations: * SimpleDags (created because full DAGs were not pickleable) * Serialized DAGs We should remove SimpleDags -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (AIRFLOW-6916) Replace SimpleDag & SimpleDagBag representation with Serialized Dag
[ https://issues.apache.org/jira/browse/AIRFLOW-6916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-6916: Labels: dag-serialization (was: ) > Replace SimpleDag & SimpleDagBag representation with Serialized Dag > --- > > Key: AIRFLOW-6916 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6916 > Project: Apache Airflow > Issue Type: Improvement > Components: serialization >Affects Versions: 2.0.0 >Reporter: Kaxil Naik >Assignee: Kaxil Naik >Priority: Major > Labels: dag-serialization > Fix For: 2.0.0 > > > Currently, we have two serialized representations: > * SimpleDags (created because full DAGs were not pickleable) > * Serialized DAGs > We should remove SimpleDags -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] kaxil commented on a change in pull request #7535: [AIRFLOW-6915] Adds AI Platform Console link for MLEngineStartTrainingOperator
kaxil commented on a change in pull request #7535: [AIRFLOW-6915] Adds AI Platform Console link for MLEngineStartTrainingOperator URL: https://github.com/apache/airflow/pull/7535#discussion_r384106760 ## File path: tests/providers/google/cloud/operators/test_mlengine.py ## @@ -404,6 +405,34 @@ def test_failed_job_error(self, mock_hook): project_id='test-project', job=self.TRAINING_INPUT, use_existing_job_fn=ANY) self.assertEqual('A failure message', str(context.exception)) +@patch('airflow.providers.google.cloud.operators.mlengine.MLEngineHook') +def test_console_extra_link(self, mock_hook): +training_op = MLEngineStartTrainingJobOperator( +**self.TRAINING_DEFAULT_ARGS) + +ti = TaskInstance( +task=training_op, +execution_date=DEFAULT_DATE, +) + +job_id = self.TRAINING_DEFAULT_ARGS['job_id'] +project_id = self.TRAINING_DEFAULT_ARGS['project_id'] +gcp_metadata = { +"job_id": job_id, +"project_id": project_id, +} +ti.xcom_push(key='gcp_metadata', value=gcp_metadata) + +self.assertEqual( + f"https://console.cloud.google.com/ai-platform/jobs/{job_id}?project={project_id}", +training_op.get_extra_links(DEFAULT_DATE, AIPlatformConsoleLink.name), +) + +self.assertEqual( +'', +training_op.get_extra_links(datetime.datetime(2019, 1, 1), AIPlatformConsoleLink.name), +) + Review comment: We need to add a test to check that this Operator link works well with serialization, like: https://github.com/apache/airflow/blob/746d8de2fcaa9554b4ce7dbf261e4ab148233222/tests/providers/google/cloud/operators/test_bigquery.py#L503-L538 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
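For reference, a serialization round-trip check along the lines of the linked BigQuery test might look roughly like the sketch below. Only `TRAINING_DEFAULT_ARGS`, `DEFAULT_DATE`, `TaskInstance`, `MLEngineStartTrainingJobOperator` and `AIPlatformConsoleLink` are taken from the diff above; the test name, the DAG id and the overall shape are illustrative assumptions, not the test that was eventually committed.

```python
from airflow.models import DAG
from airflow.serialization.serialized_objects import SerializedDAG


@patch('airflow.providers.google.cloud.operators.mlengine.MLEngineHook')
def test_console_extra_link_with_serialization(self, mock_hook):
    # Build the operator inside a DAG so the whole graph can be serialized.
    with DAG('test_mlengine_console_link', start_date=DEFAULT_DATE) as dag:
        training_op = MLEngineStartTrainingJobOperator(**self.TRAINING_DEFAULT_ARGS)

    # Round-trip through the JSON serializer and fetch the deserialized task.
    deserialized_dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
    deserialized_task = deserialized_dag.task_dict[training_op.task_id]

    # Push the same XCom metadata the operator would publish at runtime.
    ti = TaskInstance(task=training_op, execution_date=DEFAULT_DATE)
    ti.xcom_push(key='gcp_metadata', value={
        'job_id': self.TRAINING_DEFAULT_ARGS['job_id'],
        'project_id': self.TRAINING_DEFAULT_ARGS['project_id'],
    })

    # The extra link should resolve identically on the deserialized operator.
    self.assertEqual(
        training_op.get_extra_links(DEFAULT_DATE, AIPlatformConsoleLink.name),
        deserialized_task.get_extra_links(DEFAULT_DATE, AIPlatformConsoleLink.name),
    )
```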
[GitHub] [airflow] jj-ookla commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator
jj-ookla commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator URL: https://github.com/apache/airflow/pull/6670#discussion_r384067287 ## File path: airflow/operators/mysql_to_s3_operator.py ## @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Transfer data from MySQL into a S3 bucket +""" +from io import StringIO +from typing import Optional, Union + +import numpy as np +import pandas as pd + +from airflow.hooks.mysql_hook import MySqlHook +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.utils.decorators import apply_defaults + + +class MySQLToS3Operator(BaseOperator): +""" +Saves data from an specific MySQL query into a file in S3. + +:param query: the sql query to be executed. +:type query: str +:param s3_bucket: bucket where the data will be stored +:type s3_bucket: str +:param s3_key: desired key for the file. It includes the name of the file. +If a csv file is wanted, the param must end with ".csv". +:type s3_key: str +:param mysql_conn_id: reference to a specific mysql database +:type mysql_conn_id: str + :param aws_conn_id: reference to a specific S3 connection +:type aws_conn_id: str +:param verify: Whether or not to verify SSL certificates for S3 connection. +By default SSL certificates are verified. +You can provide the following values: + +- False: do not validate SSL certificates. SSL will still be used + (unless use_ssl is False), but SSL certificates will not be + verified. +- path/to/cert/bundle.pem: A filename of the CA cert bundle to uses. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. +:type verify: bool or str +""" + +@apply_defaults +def __init__( +self, +query: str, +s3_bucket: str, +s3_key: str, +mysql_conn_id: str = 'mysql_default', +aws_conn_id: str = 'aws_default', +verify: Optional[Union[bool, str]] = None, +header: Optional[bool] = False, +index: Optional[bool] = False, +*args, **kwargs) -> None: +super().__init__(*args, **kwargs) +self.query = query +self.s3_bucket = s3_bucket +self.s3_key = s3_key +self.mysql_conn_id = mysql_conn_id +self.aws_conn_id = aws_conn_id +self.verify = verify +self.header = header +self.index = index + +def fill_na_with_none(self, series): +"""Replace NaN values with None""" +return np.where(series.isnull(), None, series) + +def fix_int_dtypes(self, df): +""" +Mutate DataFrame to set dtypes for int columns containing NaN values." 
+""" +for col in df: +if "float" in df[col].dtype.name and df[col].hasnans: +# inspect values to determine if dtype of non-null values is int or float +notna_series = df[col].dropna().values +if np.isclose(notna_series, notna_series.astype(int)).all(): +# set to dtype that retains integers and supports NaNs +df[col] = self.fill_na_with_none(df[col]).astype(pd.Int64Dtype) + +def execute(self, context, **kwargs): +self.hook = MySqlHook(mysql_conn_id=self.mysql_conn_id) +self.s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) +data_df = self.hook.get_pandas_df(self.query) +self.log.info("Data from MySQL obtained") + +self.fix_int_dtypes(data_df) +file_obj = StringIO() Review comment: I think including the option is a good idea. The implementation is tricky to support index / header kwargs and any other supported by the pandas method. I have a custom operator that works similar to this and settled on an approach similar to what is below. The overall goal being to set `index=False` by default. ```Python def __init__( self, ..., index=False, pd
[GitHub] [airflow] JavierLopezT commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator
JavierLopezT commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator URL: https://github.com/apache/airflow/pull/6670#discussion_r384056558 ## File path: airflow/operators/mysql_to_s3_operator.py ## @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Transfer data from MySQL into a S3 bucket +""" +from io import StringIO +from typing import Optional, Union + +import numpy as np +import pandas as pd + +from airflow.hooks.mysql_hook import MySqlHook +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.utils.decorators import apply_defaults + + +class MySQLToS3Operator(BaseOperator): +""" +Saves data from an specific MySQL query into a file in S3. + +:param query: the sql query to be executed. +:type query: str +:param s3_bucket: bucket where the data will be stored +:type s3_bucket: str +:param s3_key: desired key for the file. It includes the name of the file. +If a csv file is wanted, the param must end with ".csv". +:type s3_key: str +:param mysql_conn_id: reference to a specific mysql database +:type mysql_conn_id: str + :param aws_conn_id: reference to a specific S3 connection +:type aws_conn_id: str +:param verify: Whether or not to verify SSL certificates for S3 connection. +By default SSL certificates are verified. +You can provide the following values: + +- False: do not validate SSL certificates. SSL will still be used + (unless use_ssl is False), but SSL certificates will not be + verified. +- path/to/cert/bundle.pem: A filename of the CA cert bundle to uses. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. +:type verify: bool or str +""" + +@apply_defaults +def __init__( +self, +query: str, +s3_bucket: str, +s3_key: str, +mysql_conn_id: str = 'mysql_default', +aws_conn_id: str = 'aws_default', +verify: Optional[Union[bool, str]] = None, +header: Optional[bool] = False, +index: Optional[bool] = False, +*args, **kwargs) -> None: +super().__init__(*args, **kwargs) +self.query = query +self.s3_bucket = s3_bucket +self.s3_key = s3_key +self.mysql_conn_id = mysql_conn_id +self.aws_conn_id = aws_conn_id +self.verify = verify +self.header = header +self.index = index + +def fill_na_with_none(self, series): +"""Replace NaN values with None""" +return np.where(series.isnull(), None, series) + +def fix_int_dtypes(self, df): +""" +Mutate DataFrame to set dtypes for int columns containing NaN values." 
+""" +for col in df: +if "float" in df[col].dtype.name and df[col].hasnans: +# inspect values to determine if dtype of non-null values is int or float +notna_series = df[col].dropna().values +if np.isclose(notna_series, notna_series.astype(int)).all(): +# set to dtype that retains integers and supports NaNs +df[col] = self.fill_na_with_none(df[col]).astype(pd.Int64Dtype) + +def execute(self, context, **kwargs): +self.hook = MySqlHook(mysql_conn_id=self.mysql_conn_id) +self.s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) +data_df = self.hook.get_pandas_df(self.query) +self.log.info("Data from MySQL obtained") + +self.fix_int_dtypes(data_df) +file_obj = StringIO() Review comment: Thank you very much for your help. Regarding the kwargs, I already had included both index and header in the __init__ as ``` header: Optional[bool] = False, index: Optional[bool] = False ``` But I indeed forgot to include them in the code. Do you recommend to change that to your pd_csv_kwargs? This is an automated me
[GitHub] [airflow] jj-ookla commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator
jj-ookla commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator URL: https://github.com/apache/airflow/pull/6670#discussion_r384052041 ## File path: airflow/operators/mysql_to_s3_operator.py ## @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Transfer data from MySQL into a S3 bucket +""" +from io import StringIO +from typing import Optional, Union + +import numpy as np +import pandas as pd + +from airflow.hooks.mysql_hook import MySqlHook +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.utils.decorators import apply_defaults + + +class MySQLToS3Operator(BaseOperator): +""" +Saves data from an specific MySQL query into a file in S3. + +:param query: the sql query to be executed. +:type query: str +:param s3_bucket: bucket where the data will be stored +:type s3_bucket: str +:param s3_key: desired key for the file. It includes the name of the file. +If a csv file is wanted, the param must end with ".csv". +:type s3_key: str +:param mysql_conn_id: reference to a specific mysql database +:type mysql_conn_id: str + :param aws_conn_id: reference to a specific S3 connection +:type aws_conn_id: str +:param verify: Whether or not to verify SSL certificates for S3 connection. +By default SSL certificates are verified. +You can provide the following values: + +- False: do not validate SSL certificates. SSL will still be used + (unless use_ssl is False), but SSL certificates will not be + verified. +- path/to/cert/bundle.pem: A filename of the CA cert bundle to uses. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. +:type verify: bool or str +""" + +@apply_defaults +def __init__( +self, +query: str, +s3_bucket: str, +s3_key: str, +mysql_conn_id: str = 'mysql_default', +aws_conn_id: str = 'aws_default', +verify: Optional[Union[bool, str]] = None, +header: Optional[bool] = False, +index: Optional[bool] = False, +*args, **kwargs) -> None: +super().__init__(*args, **kwargs) +self.query = query +self.s3_bucket = s3_bucket +self.s3_key = s3_key +self.mysql_conn_id = mysql_conn_id +self.aws_conn_id = aws_conn_id +self.verify = verify +self.header = header +self.index = index + +def fill_na_with_none(self, series): +"""Replace NaN values with None""" +return np.where(series.isnull(), None, series) + +def fix_int_dtypes(self, df): +""" +Mutate DataFrame to set dtypes for int columns containing NaN values." 
+""" +for col in df: +if "float" in df[col].dtype.name and df[col].hasnans: +# inspect values to determine if dtype of non-null values is int or float +notna_series = df[col].dropna().values +if np.isclose(notna_series, notna_series.astype(int)).all(): +# set to dtype that retains integers and supports NaNs +df[col] = self.fill_na_with_none(df[col]).astype(pd.Int64Dtype) + +def execute(self, context, **kwargs): +self.hook = MySqlHook(mysql_conn_id=self.mysql_conn_id) +self.s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) +data_df = self.hook.get_pandas_df(self.query) +self.log.info("Data from MySQL obtained") + +self.fix_int_dtypes(data_df) +file_obj = StringIO() Review comment: Gotcha. This has been an issue for me in the past, as well. `NamedTemporaryFile` needs to be open in `r+` to allow for writing / reading strings to support: - `df.to_csv` string output - s3_hook reading from file One caveat is that the to_csv kwarg `index` should be set to False to enforce compatibility when loading from S3 into Redshift / Athena / MySQL. I would recommend inclu
[GitHub] [airflow] jj-ookla commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator
jj-ookla commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator URL: https://github.com/apache/airflow/pull/6670#discussion_r384052041 ## File path: airflow/operators/mysql_to_s3_operator.py ## @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Transfer data from MySQL into a S3 bucket +""" +from io import StringIO +from typing import Optional, Union + +import numpy as np +import pandas as pd + +from airflow.hooks.mysql_hook import MySqlHook +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.utils.decorators import apply_defaults + + +class MySQLToS3Operator(BaseOperator): +""" +Saves data from an specific MySQL query into a file in S3. + +:param query: the sql query to be executed. +:type query: str +:param s3_bucket: bucket where the data will be stored +:type s3_bucket: str +:param s3_key: desired key for the file. It includes the name of the file. +If a csv file is wanted, the param must end with ".csv". +:type s3_key: str +:param mysql_conn_id: reference to a specific mysql database +:type mysql_conn_id: str + :param aws_conn_id: reference to a specific S3 connection +:type aws_conn_id: str +:param verify: Whether or not to verify SSL certificates for S3 connection. +By default SSL certificates are verified. +You can provide the following values: + +- False: do not validate SSL certificates. SSL will still be used + (unless use_ssl is False), but SSL certificates will not be + verified. +- path/to/cert/bundle.pem: A filename of the CA cert bundle to uses. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. +:type verify: bool or str +""" + +@apply_defaults +def __init__( +self, +query: str, +s3_bucket: str, +s3_key: str, +mysql_conn_id: str = 'mysql_default', +aws_conn_id: str = 'aws_default', +verify: Optional[Union[bool, str]] = None, +header: Optional[bool] = False, +index: Optional[bool] = False, +*args, **kwargs) -> None: +super().__init__(*args, **kwargs) +self.query = query +self.s3_bucket = s3_bucket +self.s3_key = s3_key +self.mysql_conn_id = mysql_conn_id +self.aws_conn_id = aws_conn_id +self.verify = verify +self.header = header +self.index = index + +def fill_na_with_none(self, series): +"""Replace NaN values with None""" +return np.where(series.isnull(), None, series) + +def fix_int_dtypes(self, df): +""" +Mutate DataFrame to set dtypes for int columns containing NaN values." 
+""" +for col in df: +if "float" in df[col].dtype.name and df[col].hasnans: +# inspect values to determine if dtype of non-null values is int or float +notna_series = df[col].dropna().values +if np.isclose(notna_series, notna_series.astype(int)).all(): +# set to dtype that retains integers and supports NaNs +df[col] = self.fill_na_with_none(df[col]).astype(pd.Int64Dtype) + +def execute(self, context, **kwargs): +self.hook = MySqlHook(mysql_conn_id=self.mysql_conn_id) +self.s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) +data_df = self.hook.get_pandas_df(self.query) +self.log.info("Data from MySQL obtained") + +self.fix_int_dtypes(data_df) +file_obj = StringIO() Review comment: Gotcha. This has been an issue for me in the past, as well. One caveat is that the to_csv kwarg `index` should be set to False to enforce compatibility when loading from S3 into Redshift / Athena / MySQL. I would recommend including a `pd_csv_kwargs` in the operator to allow for users to set these options as needed. ```Python # in operator pd_csv_kwargs = {"index": False}
[GitHub] [airflow] brandonwillard edited a comment on issue #7423: [AIRFLOW-3126] Add option to specify additional K8s volumes
brandonwillard edited a comment on issue #7423: [AIRFLOW-3126] Add option to specify additional K8s volumes URL: https://github.com/apache/airflow/pull/7423#issuecomment-590958733 @davlum, I've tried it, and it did not appear to help, specifically for default worker pod creation under the K8s Executor. It required too much specificity in the template and seemed to deactivate most other configuration options. Can you give an example of how it can be used to add volumes in exactly this way without sacrificing any of the existing functionality/configurability of worker pod creation? Also, I don't see how it could possibly help with https://github.com/apache/airflow/pull/7405. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] konpap94 edited a comment on issue #6337: [AIRFLOW-5659] - Add support for ephemeral storage on KubernetesPodOp…
konpap94 edited a comment on issue #6337: [AIRFLOW-5659] - Add support for ephemeral storage on KubernetesPodOp… URL: https://github.com/apache/airflow/pull/6337#issuecomment-587121417 Is there any update on this being merged in? @potiuk This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] JavierLopezT commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator
JavierLopezT commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator URL: https://github.com/apache/airflow/pull/6670#discussion_r384031703 ## File path: airflow/operators/mysql_to_s3_operator.py ## @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Transfer data from MySQL into a S3 bucket +""" +from io import StringIO +from typing import Optional, Union + +import numpy as np +import pandas as pd + +from airflow.hooks.mysql_hook import MySqlHook +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.utils.decorators import apply_defaults + + +class MySQLToS3Operator(BaseOperator): +""" +Saves data from an specific MySQL query into a file in S3. + +:param query: the sql query to be executed. +:type query: str +:param s3_bucket: bucket where the data will be stored +:type s3_bucket: str +:param s3_key: desired key for the file. It includes the name of the file. +If a csv file is wanted, the param must end with ".csv". +:type s3_key: str +:param mysql_conn_id: reference to a specific mysql database +:type mysql_conn_id: str + :param aws_conn_id: reference to a specific S3 connection +:type aws_conn_id: str +:param verify: Whether or not to verify SSL certificates for S3 connection. +By default SSL certificates are verified. +You can provide the following values: + +- False: do not validate SSL certificates. SSL will still be used + (unless use_ssl is False), but SSL certificates will not be + verified. +- path/to/cert/bundle.pem: A filename of the CA cert bundle to uses. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. +:type verify: bool or str +""" + +@apply_defaults +def __init__( +self, +query: str, +s3_bucket: str, +s3_key: str, +mysql_conn_id: str = 'mysql_default', +aws_conn_id: str = 'aws_default', +verify: Optional[Union[bool, str]] = None, +header: Optional[bool] = False, +index: Optional[bool] = False, +*args, **kwargs) -> None: +super().__init__(*args, **kwargs) +self.query = query +self.s3_bucket = s3_bucket +self.s3_key = s3_key +self.mysql_conn_id = mysql_conn_id +self.aws_conn_id = aws_conn_id +self.verify = verify +self.header = header +self.index = index + +def fill_na_with_none(self, series): +"""Replace NaN values with None""" +return np.where(series.isnull(), None, series) + +def fix_int_dtypes(self, df): +""" +Mutate DataFrame to set dtypes for int columns containing NaN values." 
+""" +for col in df: +if "float" in df[col].dtype.name and df[col].hasnans: +# inspect values to determine if dtype of non-null values is int or float +notna_series = df[col].dropna().values +if np.isclose(notna_series, notna_series.astype(int)).all(): +# set to dtype that retains integers and supports NaNs +df[col] = self.fill_na_with_none(df[col]).astype(pd.Int64Dtype) + +def execute(self, context, **kwargs): +self.hook = MySqlHook(mysql_conn_id=self.mysql_conn_id) +self.s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) +data_df = self.hook.get_pandas_df(self.query) +self.log.info("Data from MySQL obtained") + +self.fix_int_dtypes(data_df) +file_obj = StringIO() Review comment: > Why is the DataFrame pickled then written to a file with a csv extension? This would definitely explain why the csv is unreadable. > > https://github.com/apache/airflow/blob/69ff9f3dbf25ecefc627f95bbb75e3b9b9835b7c/airflow/operators/mysql_to_s3_operator.py#L108 Because if you put the code as @nuclearpinguin proposed originally you get the error: `AttributeError: 'DataFrame' obje
[GitHub] [airflow] codecov-io edited a comment on issue #6652: [AIRFLOW-5548] [AIRFLOW-5550] REST API enhancement - dag info, task …
codecov-io edited a comment on issue #6652: [AIRFLOW-5548] [AIRFLOW-5550] REST API enhancement - dag info, task … URL: https://github.com/apache/airflow/pull/6652#issuecomment-558277657 # [Codecov](https://codecov.io/gh/apache/airflow/pull/6652?src=pr&el=h1) Report > Merging [#6652](https://codecov.io/gh/apache/airflow/pull/6652?src=pr&el=desc) into [master](https://codecov.io/gh/apache/airflow/commit/746d8de2fcaa9554b4ce7dbf261e4ab148233222?src=pr&el=desc) will **increase** coverage by `<.01%`. > The diff coverage is `88.18%`. [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/6652/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/6652?src=pr&el=tree) ```diff @@Coverage Diff @@ ## master#6652 +/- ## == + Coverage 86.85% 86.86% +<.01% == Files 896 899 +3 Lines 4263442740 +106 == + Hits3703137124 +93 - Misses 5603 5616 +13 ``` | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/6652?src=pr&el=tree) | Coverage Δ | | |---|---|---| | [airflow/api/common/experimental/get\_dag.py](https://codecov.io/gh/apache/airflow/pull/6652/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfZGFnLnB5) | `100% <100%> (ø)` | | | [airflow/api/common/experimental/get\_dags.py](https://codecov.io/gh/apache/airflow/pull/6652/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfZGFncy5weQ==) | `100% <100%> (ø)` | | | [airflow/api/common/experimental/get\_tasks.py](https://codecov.io/gh/apache/airflow/pull/6652/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfdGFza3MucHk=) | `100% <100%> (ø)` | | | [airflow/www/api/experimental/endpoints.py](https://codecov.io/gh/apache/airflow/pull/6652/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvYXBpL2V4cGVyaW1lbnRhbC9lbmRwb2ludHMucHk=) | `88.32% <83.05%> (-1.5%)` | :arrow_down: | | [airflow/api/common/experimental/get\_task.py](https://codecov.io/gh/apache/airflow/pull/6652/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfdGFzay5weQ==) | `88.46% <86.95%> (-11.54%)` | :arrow_down: | -- [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/6652?src=pr&el=continue). > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta) > `Δ = absolute (impact)`, `ø = not affected`, `? = missing data` > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/6652?src=pr&el=footer). Last update [746d8de...189a2a8](https://codecov.io/gh/apache/airflow/pull/6652?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] jj-ookla commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator
jj-ookla commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator URL: https://github.com/apache/airflow/pull/6670#discussion_r384017782 ## File path: airflow/operators/mysql_to_s3_operator.py ## @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Transfer data from MySQL into a S3 bucket +""" +from io import StringIO +from typing import Optional, Union + +import numpy as np +import pandas as pd + +from airflow.hooks.mysql_hook import MySqlHook +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.utils.decorators import apply_defaults + + +class MySQLToS3Operator(BaseOperator): +""" +Saves data from an specific MySQL query into a file in S3. + +:param query: the sql query to be executed. +:type query: str +:param s3_bucket: bucket where the data will be stored +:type s3_bucket: str +:param s3_key: desired key for the file. It includes the name of the file. +If a csv file is wanted, the param must end with ".csv". +:type s3_key: str +:param mysql_conn_id: reference to a specific mysql database +:type mysql_conn_id: str + :param aws_conn_id: reference to a specific S3 connection +:type aws_conn_id: str +:param verify: Whether or not to verify SSL certificates for S3 connection. +By default SSL certificates are verified. +You can provide the following values: + +- False: do not validate SSL certificates. SSL will still be used + (unless use_ssl is False), but SSL certificates will not be + verified. +- path/to/cert/bundle.pem: A filename of the CA cert bundle to uses. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. +:type verify: bool or str +""" + +@apply_defaults +def __init__( +self, +query: str, +s3_bucket: str, +s3_key: str, +mysql_conn_id: str = 'mysql_default', +aws_conn_id: str = 'aws_default', +verify: Optional[Union[bool, str]] = None, +header: Optional[bool] = False, +index: Optional[bool] = False, +*args, **kwargs) -> None: +super().__init__(*args, **kwargs) +self.query = query +self.s3_bucket = s3_bucket +self.s3_key = s3_key +self.mysql_conn_id = mysql_conn_id +self.aws_conn_id = aws_conn_id +self.verify = verify +self.header = header +self.index = index + +def fill_na_with_none(self, series): +"""Replace NaN values with None""" +return np.where(series.isnull(), None, series) + +def fix_int_dtypes(self, df): +""" +Mutate DataFrame to set dtypes for int columns containing NaN values." 
+""" +for col in df: +if "float" in df[col].dtype.name and df[col].hasnans: +# inspect values to determine if dtype of non-null values is int or float +notna_series = df[col].dropna().values +if np.isclose(notna_series, notna_series.astype(int)).all(): +# set to dtype that retains integers and supports NaNs +df[col] = self.fill_na_with_none(df[col]).astype(pd.Int64Dtype) + +def execute(self, context, **kwargs): +self.hook = MySqlHook(mysql_conn_id=self.mysql_conn_id) +self.s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) +data_df = self.hook.get_pandas_df(self.query) +self.log.info("Data from MySQL obtained") + +self.fix_int_dtypes(data_df) +file_obj = StringIO() Review comment: Why is the DataFrame pickled then written to a file with a csv extension? This would definitely explain why the csv is unreadable. https://github.com/apache/airflow/blob/69ff9f3dbf25ecefc627f95bbb75e3b9b9835b7c/airflow/operators/mysql_to_s3_operator.py#L108 This is an automated message from the Apache Git Service. To respond to t
[jira] [Updated] (AIRFLOW-5869) Creating DagRuns fails for Deserialized tasks with no start_date
[ https://issues.apache.org/jira/browse/AIRFLOW-5869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-5869: Labels: dag-serialization (was: ) > Creating DagRuns fails for Deserialized tasks with no start_date > > > Key: AIRFLOW-5869 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5869 > Project: Apache Airflow > Issue Type: Bug > Components: core >Affects Versions: 2.0.0 >Reporter: Kaxil Naik >Assignee: Kaxil Naik >Priority: Major > Labels: dag-serialization > Fix For: 1.10.7 > > > Deserialized operators do not always have start_date set. > That, for instance, breaks triggering DAGs. > See the code from DAG.create_dagrun(): > {code:python} > run = DagRun(...) > session.add(run) > session.commit() > run.dag = self > run.verify_integrity(session=session) # this validation fails because > run assumes that all operators have start_date set > run.refresh_from_db() > {code} > One of the optimisations > (https://github.com/coufon/airflow/commit/b5ee858f44f55818c589cf2c8bf3866fa5d50e30) > we made as part of DAG serialization was to not store dates in tasks if they > have a matching date (start_date or end_date) in DAG. Unfortunately, when > triggering a DAG containing such tasks, it fails in DagRun.verify_integrity. > The fix is to add the start_date when deserializing the operator. -- This message was sent by Atlassian Jira (v8.3.4#803005)
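As a rough illustration of the proposed fix, the deserialization step can fall back to the DAG-level dates when a task carries none of its own. This is a sketch only; the function name and exact hook point are assumptions, not the committed change.

```python
from airflow.models import DAG
from airflow.models.baseoperator import BaseOperator


def attach_task_to_dag(task: BaseOperator, dag: DAG) -> None:
    """Re-attach a deserialized task and restore dates that were stripped
    during serialization because they matched the DAG's own dates."""
    task.dag = dag
    for date_attr in ("start_date", "end_date"):
        if getattr(task, date_attr, None) is None:
            setattr(task, date_attr, getattr(dag, date_attr))
```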
[jira] [Updated] (AIRFLOW-5943) DAG Serialization & DB Persistence
[ https://issues.apache.org/jira/browse/AIRFLOW-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-5943: Labels: dag-serialization (was: ) > DAG Serialization & DB Persistence > -- > > Key: AIRFLOW-5943 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5943 > Project: Apache Airflow > Issue Type: Epic > Components: core >Affects Versions: 1.10.5, 1.10.6 >Reporter: Kaxil Naik >Assignee: Kaxil Naik >Priority: Major > Labels: dag-serialization > > This Epic aims to decouple Webserver from having to parse the DAG files. > Short-term Scheduler will parse the DAG files and persist it in DB. Longer > term we would have a new airflow component "Serializer"/"Parser" that would > be the component parsing DAG files, serializing DAG objects and persisting it > to DB. This would be the Webserver & Scheduler would not need access to DAG > files. > More details Implementing AIP-24: > https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-24+DAG+Persistence+in+DB+using+JSON+for+Airflow+Webserver+and+%28optional%29+Scheduler -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (AIRFLOW-5946) Store & Read code from DB for Code View
[ https://issues.apache.org/jira/browse/AIRFLOW-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-5946: Labels: dag-serialization (was: ) > Store & Read code from DB for Code View > --- > > Key: AIRFLOW-5946 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5946 > Project: Apache Airflow > Issue Type: Improvement > Components: webserver >Affects Versions: 2.0.0, 1.10.7 >Reporter: Kaxil Naik >Assignee: Kaxil Naik >Priority: Major > Labels: dag-serialization > > To make Webserver not need DAG Files we need to find a way to get Code to > display in *Code View*. > - Store in lazy-loaded column in SerializedDag table > - Save in a new table with DAG_id and store versions as well. Add a limit of > last 10 versions. This is just needed by Code View so not a problem if we > store in New table > OR - Just keep as reading from file? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (AIRFLOW-5947) Make the json backend pluggable for DAG Serialization
[ https://issues.apache.org/jira/browse/AIRFLOW-5947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-5947: Labels: dag-serialization (was: ) > Make the json backend pluggable for DAG Serialization > - > > Key: AIRFLOW-5947 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5947 > Project: Apache Airflow > Issue Type: Improvement > Components: core, scheduler >Affects Versions: 2.0.0, 1.10.7 >Reporter: Kaxil Naik >Assignee: Kaxil Naik >Priority: Major > Labels: dag-serialization > Fix For: 1.10.7 > > > Allow users the option to choose the JSON library of their choice for DAG > Serialization. -- This message was sent by Atlassian Jira (v8.3.4#803005)
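As a rough illustration of what "pluggable" could mean here, the serializer might resolve the JSON module from a config option at import time. Note that the option name `dag_serialization_json_module` below is purely hypothetical and not an existing Airflow setting.

```python
import importlib

from airflow.configuration import conf

# Hypothetical config key, for illustration only.
_json_module_name = conf.get("core", "dag_serialization_json_module", fallback="json")
json = importlib.import_module(_json_module_name)  # e.g. "json", "ujson", "rapidjson"

serialized = json.dumps({"dag_id": "example_dag", "tasks": []})
deserialized = json.loads(serialized)
```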
[jira] [Updated] (AIRFLOW-5948) Replace SimpleDag with serialized version
[ https://issues.apache.org/jira/browse/AIRFLOW-5948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-5948: Labels: dag-serialization (was: ) > Replace SimpleDag with serialized version > - > > Key: AIRFLOW-5948 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5948 > Project: Apache Airflow > Issue Type: Improvement > Components: core, scheduler >Affects Versions: 2.0.0, 1.10.7 >Reporter: Kaxil Naik >Assignee: Kaxil Naik >Priority: Critical > Labels: dag-serialization > > Replace SimpleDag with serialized version (json over multiprocessing) in > SchedulerJob etc., no other change in scheduler behaviour. (This doesn't make > sense long term, but does tidy up the code) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (AIRFLOW-5945) Fix limitation with OperatorLinks when using DAG Serialization
[ https://issues.apache.org/jira/browse/AIRFLOW-5945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-5945: Labels: dag-serialization (was: ) > Fix limitation with OperatorLinks when using DAG Serialization > -- > > Key: AIRFLOW-5945 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5945 > Project: Apache Airflow > Issue Type: Improvement > Components: core, webserver >Affects Versions: 2.0.0, 1.10.7 >Reporter: Kaxil Naik >Assignee: Kaxil Naik >Priority: Major > Labels: dag-serialization > Fix For: 1.10.7 > > > Operator links (Re-write the Quoble link to not need an operator of the right > class and add this to the docs) > or store the info in a new column in Task Instance or Serialized DAG table. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (AIRFLOW-5944) Rendering templated_fields without accessing DAG files
[ https://issues.apache.org/jira/browse/AIRFLOW-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-5944: Labels: dag-serialization (was: ) > Rendering templated_fields without accessing DAG files > -- > > Key: AIRFLOW-5944 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5944 > Project: Apache Airflow > Issue Type: Improvement > Components: core >Affects Versions: 2.0.0, 1.10.7 >Reporter: Kaxil Naik >Assignee: Kaxil Naik >Priority: Major > Labels: dag-serialization > > One of the limitations with the current implementation of DAG s10n is that > Templated fields still need access to DAG Files. > *Potential solution 1*: Store rendered version in the DB somewhere. > *Limitation*: Tasks that are not yet run won't be able to render the > templated fields (this is a useful debugging aid, especially for those > without CLI access to run `airflow tasks render`) . > *Potential solution 2*: Store both rendered & unrendered version in the DB > somewhere. This will allow getting past the Limitation in Solution (1). We > should store only the current unrendered version and store the last X number > of rendered versions. This will also have an advantage that old task would > have the rendered version that was correct at that time (which is not > possible currently). > *Limitation/Issue*: Maintaining two separate tables without much benefit > *Potential solution 3*: Store rendered version in TI table and unrendered > version in a separate column in DAG serialization table. This means that each > TI would have an associated rendered field without creating an extra table > and duplicating dag_id, task_id & execution_date. Once we start storing > different version of serialized DAGs we can have each DAG row have it's > unrendered version. > ** -- This message was sent by Atlassian Jira (v8.3.4#803005)
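To make option 3 concrete, a table keyed by the task instance's identity with the rendered fields stored as JSON text might look roughly like the model below. This is an illustrative sketch only, including the table name; it is not the schema that was eventually adopted.

```python
from sqlalchemy import Column, String, Text

from airflow.models import Base
from airflow.utils.sqlalchemy import UtcDateTime


class RenderedTaskFieldsSketch(Base):
    """One row of rendered templated fields per task instance (sketch)."""

    __tablename__ = "rendered_task_fields_sketch"

    dag_id = Column(String(250), primary_key=True)
    task_id = Column(String(250), primary_key=True)
    execution_date = Column(UtcDateTime, primary_key=True)
    # Rendered templated fields serialized to a JSON string at run time.
    rendered_fields = Column(Text(), nullable=False)
```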
[jira] [Updated] (AIRFLOW-5088) To implement DAG JSON serialization and DB persistence for webserver scalability improvement
[ https://issues.apache.org/jira/browse/AIRFLOW-5088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-5088: Labels: dag-serialization (was: ) > To implement DAG JSON serialization and DB persistence for webserver > scalability improvement > > > Key: AIRFLOW-5088 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5088 > Project: Apache Airflow > Issue Type: Improvement > Components: DAG, webserver >Affects Versions: 1.10.5 >Reporter: Zhou Fang >Assignee: Zhou Fang >Priority: Major > Labels: dag-serialization > Fix For: 1.10.7 > > > Created this issue for starting to implement DAG serialization using JSON and > persistence in DB. Serialized DAG will be used in webserver for solving the > webserver scalability issue. > > The implementation is based on AIP-24: > [https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-24+DAG+Persistence+in+DB+using+JSON+for+Airflow+Webserver+and+%28optional%29+Scheduler] > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] brandonwillard commented on issue #7423: [AIRFLOW-3126] Add option to specify additional K8s volumes
brandonwillard commented on issue #7423: [AIRFLOW-3126] Add option to specify additional K8s volumes URL: https://github.com/apache/airflow/pull/7423#issuecomment-590958733 @davlum, I've tried it and it did not appear to help. Can you give an example of how it can be used to add volumes in this way without sacrificing any of the existing functionality/configurability of worker pods? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] brandonwillard removed a comment on issue #7423: [AIRFLOW-3126] Add option to specify additional K8s volumes
brandonwillard removed a comment on issue #7423: [AIRFLOW-3126] Add option to specify additional K8s volumes URL: https://github.com/apache/airflow/pull/7423#issuecomment-590955205 It does not appear to help. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] codecov-io commented on issue #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod
codecov-io commented on issue #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod URL: https://github.com/apache/airflow/pull/7523#issuecomment-590956652 # [Codecov](https://codecov.io/gh/apache/airflow/pull/7523?src=pr&el=h1) Report > :exclamation: No coverage uploaded for pull request base (`master@b6a11c1`). [Click here to learn what that means](https://docs.codecov.io/docs/error-reference#section-missing-base-commit). > The diff coverage is `93.18%`. [![Impacted file tree graph](https://codecov.io/gh/apache/airflow/pull/7523/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7523?src=pr&el=tree) ```diff @@Coverage Diff@@ ## master#7523 +/- ## = Coverage ? 86.85% = Files ? 896 Lines ?42638 Branches ?0 = Hits ?37035 Misses? 5603 Partials ?0 ``` | [Impacted Files](https://codecov.io/gh/apache/airflow/pull/7523?src=pr&el=tree) | Coverage Δ | | |---|---|---| | [airflow/utils/helpers.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9oZWxwZXJzLnB5) | `82.6% <100%> (ø)` | | | [airflow/settings.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZXR0aW5ncy5weQ==) | `93.66% <100%> (ø)` | | | [airflow/models/dagbag.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnYmFnLnB5) | `89.61% <100%> (ø)` | | | [...s/google/cloud/example\_dags/example\_stackdriver.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL2V4YW1wbGVfZGFncy9leGFtcGxlX3N0YWNrZHJpdmVyLnB5) | `100% <100%> (ø)` | | | [airflow/models/taskinstance.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvdGFza2luc3RhbmNlLnB5) | `95.17% <100%> (ø)` | | | [...ow/providers/microsoft/azure/hooks/azure\_cosmos.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvbWljcm9zb2Z0L2F6dXJlL2hvb2tzL2F6dXJlX2Nvc21vcy5weQ==) | `76.72% <100%> (ø)` | | | [airflow/providers/amazon/aws/hooks/s3.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYW1hem9uL2F3cy9ob29rcy9zMy5weQ==) | `96.6% <100%> (ø)` | | | [...ow/providers/google/cloud/operators/stackdriver.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL29wZXJhdG9ycy9zdGFja2RyaXZlci5weQ==) | `100% <100%> (ø)` | | | [airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5) | `91.37% <100%> (ø)` | | | [airflow/utils/log/logging\_mixin.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9sb2cvbG9nZ2luZ19taXhpbi5weQ==) | `95.38% <100%> (ø)` | | | ... and [7 more](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree-more) | | -- [Continue to review full report at Codecov](https://codecov.io/gh/apache/airflow/pull/7523?src=pr&el=continue). > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta) > `Δ = absolute (impact)`, `ø = not affected`, `? = missing data` > Powered by [Codecov](https://codecov.io/gh/apache/airflow/pull/7523?src=pr&el=footer). Last update [b6a11c1...e13f6c1](https://codecov.io/gh/apache/airflow/pull/7523?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments). This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] brandonwillard commented on issue #7423: [AIRFLOW-3126] Add option to specify additional K8s volumes
brandonwillard commented on issue #7423: [AIRFLOW-3126] Add option to specify additional K8s volumes URL: https://github.com/apache/airflow/pull/7423#issuecomment-590955205 It does not appear to help. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (AIRFLOW-5191) SubDag is marked failed
[ https://issues.apache.org/jira/browse/AIRFLOW-5191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044639#comment-17044639 ] nexoriv commented on AIRFLOW-5191: -- thanks [~puhrez] that was useful! > SubDag is marked failed > > > Key: AIRFLOW-5191 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5191 > Project: Apache Airflow > Issue Type: Bug > Components: DAG, DagRun >Affects Versions: 1.10.4 > Environment: CentOS 7, Maria-DB, python 3.6.7, Airflow 1.10.4 >Reporter: Oliver Ricken >Priority: Blocker > > Dear all, > after having upgraded from Airflow version 1.10.2 to 1.10.4, we experience > strange and very problematic behaviour of SubDags (which are crucial for our > environment and used frequently). > Tasks inside the SubDag failing and awaiting retry ("up-for-retry") mark the > SubDag "failed" (while in 1.10.2, the SubDag was still in "running"-state). > This is particularly problematic for downstream tasks depending on the state > of the SubDag. Since we have downstream tasks triggered on "all_done", the > downstream task is triggered by the "failed" SubDag although a > SubDag-internal task is awaiting retry and might (in our case: most likely) > yield successfully processed data. This data is thus not available to the > prematurely triggered task downstream of the SubDag. > This is a severe problem for us and worth rolling back to 1.10.2 if there is > no quick solution or work-around to this issue! > We urgently need help on this matter. > Thanks allot in advance, any suggestions and input is highly appreciated! > Cheers > Oliver -- This message was sent by Atlassian Jira (v8.3.4#803005)
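For readers unfamiliar with the dependency pattern described in the report above, here is a minimal sketch of a parent DAG whose downstream task uses `trigger_rule="all_done"` behind a SubDag (hypothetical DAG and task names, Airflow 1.10.x-style imports; not taken from the reporter's environment):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

default_args = {"start_date": datetime(2019, 8, 1), "retries": 3}


def build_subdag(parent_dag_id, child_id):
    # The SubDag's dag_id must be "<parent>.<task_id>" by Airflow convention.
    return DAG(dag_id="{}.{}".format(parent_dag_id, child_id),
               default_args=default_args, schedule_interval=None)


with DAG("subdag_retry_example", default_args=default_args,
         schedule_interval=None) as dag:
    process = SubDagOperator(
        task_id="process",
        subdag=build_subdag(dag.dag_id, "process"),
    )
    # "all_done" fires once every upstream task is in a terminal state, so if
    # the SubDag is marked failed while an inner task is still up-for-retry,
    # this task starts before the retried data exists.
    consume = DummyOperator(task_id="consume", trigger_rule="all_done")
    process >> consume
```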
[GitHub] [airflow] anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db URL: https://github.com/apache/airflow/pull/7217#discussion_r383986936 ## File path: airflow/models/dagcode.py ## @@ -0,0 +1,108 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import logging +from typing import List + +from sqlalchemy import Column, Index, Integer, String, Text, and_ + +from airflow.models import Base +from airflow.utils import timezone +from airflow.utils.session import provide_session +from airflow.utils.sqlalchemy import UtcDateTime + +log = logging.getLogger(__name__) + + +class DagCodeModel(Base): +"""A table for DAGs code. + +dag_code table contains code of DAG files synchronized by scheduler. +This feature is controlled by: + +* ``[core] store_serialized_dags = True``: enable this feature +* ``[core] store_code = True``: enable this feature + +For details on dag serialization see SerializedDagModel +""" +__tablename__ = 'dag_code' + +fileloc = Column(String(2000), primary_key=True) +# The max length of fileloc exceeds the limit of indexing. +fileloc_hash = Column(Integer, primary_key=True) +last_updated = Column(UtcDateTime, nullable=False) +source_code = Column(Text(), nullable=False) + +__table_args__ = ( +Index('idx_fileloc_code_hash', fileloc_hash, unique=False), +) + +def __init__(self, full_filepath: str): +self.fileloc = full_filepath +self.fileloc_hash = DagCodeModel.dag_fileloc_hash(self.fileloc) +self.last_updated = timezone.utcnow() +self.source_code = DagCodeModel._read_code(self.fileloc) + +@classmethod +def _read_code(cls, fileloc: str): +try: +with open(fileloc, 'r') as source: +source_code = source.read() +except IOError: +source_code = "Couldn't read source file {}.".format(fileloc) +return source_code + +@provide_session +def write_code(self, session=None): +"""Writes code into database. + +:param session: ORM Session +""" +session.merge(self) + +@classmethod +@provide_session +def remove_deleted_code(cls, alive_dag_filelocs: List[str], session=None): +"""Deletes code not included in alive_dag_filelocs. + +:param alive_dag_filelocs: file paths of alive DAGs +:param session: ORM Session +""" +alive_fileloc_hashes = [ +cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs] + +log.debug("Deleting code from %s table ", cls.__tablename__) + +session.execute( +cls.__table__.delete().where( +and_(cls.fileloc_hash.notin_(alive_fileloc_hashes), + cls.fileloc.notin_(alive_dag_filelocs + +@classmethod +def dag_fileloc_hash(cls, full_filepath: str) -> int: +Hashing file location for indexing. + +:param full_filepath: full filepath of DAG file +:return: hashed full_filepath +""" +# hashing is needed because the length of fileloc is 2000 as an Airflow convention, +# which is over the limit of indexing. 
If we can reduce the length of fileloc, then +# hashing is not needed. +import hashlib +return int.from_bytes( +hashlib.sha1( +full_filepath.encode('utf-8')).digest()[-2:], +byteorder='big', signed=False) Review comment: I agree. I shall create this migration This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
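Note that the quoting above mangled the tail of the diff (the closing parentheses and the opening docstring quotes were dropped). For readability, here is a self-contained version of the hashing helper under discussion, reconstructed from the patch:

```python
import hashlib


def dag_fileloc_hash(full_filepath: str) -> int:
    """Hash a DAG file location for indexing.

    fileloc is 2000 characters by Airflow convention, which exceeds the index
    key limit, so only the last two bytes of a SHA-1 digest are indexed.
    """
    return int.from_bytes(
        hashlib.sha1(full_filepath.encode('utf-8')).digest()[-2:],
        byteorder='big', signed=False)


print(dag_fileloc_hash("/usr/local/airflow/dags/example_dag.py"))  # integer in [0, 65535]
```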
[GitHub] [airflow] anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db URL: https://github.com/apache/airflow/pull/7217#discussion_r383986117 ## File path: airflow/models/dagcode.py ## @@ -0,0 +1,108 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import logging +from typing import List + +from sqlalchemy import Column, Index, Integer, String, Text, and_ + +from airflow.models import Base +from airflow.utils import timezone +from airflow.utils.session import provide_session +from airflow.utils.sqlalchemy import UtcDateTime + +log = logging.getLogger(__name__) + + +class DagCodeModel(Base): +"""A table for DAGs code. + +dag_code table contains code of DAG files synchronized by scheduler. +This feature is controlled by: + +* ``[core] store_serialized_dags = True``: enable this feature +* ``[core] store_code = True``: enable this feature + +For details on dag serialization see SerializedDagModel +""" +__tablename__ = 'dag_code' + +fileloc = Column(String(2000), primary_key=True) +# The max length of fileloc exceeds the limit of indexing. +fileloc_hash = Column(Integer, primary_key=True) +last_updated = Column(UtcDateTime, nullable=False) +source_code = Column(Text()) + +__table_args__ = ( +Index('idx_fileloc_code_hash', fileloc_hash, unique=False), +) + +def __init__(self, full_filepath: str): +self.fileloc = full_filepath +self.fileloc_hash = DagCodeModel.dag_fileloc_hash(self.fileloc) +self.last_updated = timezone.utcnow() +self.source_code = DagCodeModel._read_code(self.fileloc) + +@classmethod +def _read_code(cls, fileloc: str): +try: +with open(fileloc, 'r') as source: +source_code = source.read() +except IOError: +source_code = "Couldn't read source file {}.".format(fileloc) Review comment: I agree This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] ashb commented on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint compatible
ashb commented on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint compatible URL: https://github.com/apache/airflow/pull/7484#issuecomment-590940199 Ah, because we have ignored that file (or mostly ignored it), it's not yet showing up in Pylint. SQLA DeclarativeModels do _funny_ things with the classes/instances that pylint can't detect. I guess it's unavoidable, and we will have to ignore those few lines. I think https://github.com/PyCQA/pylint/issues/3334 is the same issue we have (open, no feedback). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (AIRFLOW-6904) Airflow 1.10.9 suppresses Operator logs
[ https://issues.apache.org/jira/browse/AIRFLOW-6904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] RAHUL JAIN updated AIRFLOW-6904: Affects Version/s: 1.10.7 > Airflow 1.10.9 suppresses Operator logs > --- > > Key: AIRFLOW-6904 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6904 > Project: Apache Airflow > Issue Type: Bug > Components: core >Affects Versions: 1.10.7, 1.10.8, 1.10.9 >Reporter: RAHUL JAIN >Priority: Critical > Attachments: 1.10.2.png, 1.10.9.png > > > After upgrading from 1.10.2 to 1.10.9, we noticed that the Operator logs are > no longer printed. See the attachments for comparison. There is also a slack > channel discussion pointing to a recen change that may have broken this - > [https://apache-airflow.slack.com/archives/CCQ7EGB1P/p1582548602014200] > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (AIRFLOW-6904) Airflow 1.10.9 suppresses Operator logs
[ https://issues.apache.org/jira/browse/AIRFLOW-6904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044604#comment-17044604 ] Kaxil Naik commented on AIRFLOW-6904: - This worked fine with Airflow <=1.10.6 and is caused by https://github.com/apache/airflow/pull/6627 > Airflow 1.10.9 suppresses Operator logs > --- > > Key: AIRFLOW-6904 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6904 > Project: Apache Airflow > Issue Type: Bug > Components: core >Affects Versions: 1.10.8, 1.10.9 >Reporter: RAHUL JAIN >Priority: Critical > Attachments: 1.10.2.png, 1.10.9.png > > > After upgrading from 1.10.2 to 1.10.9, we noticed that the Operator logs are > no longer printed. See the attachments for comparison. There is also a slack > channel discussion pointing to a recen change that may have broken this - > [https://apache-airflow.slack.com/archives/CCQ7EGB1P/p1582548602014200] > > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] davlum commented on issue #7423: [AIRFLOW-3126] Add option to specify additional K8s volumes
davlum commented on issue #7423: [AIRFLOW-3126] Add option to specify additional K8s volumes URL: https://github.com/apache/airflow/pull/7423#issuecomment-590936689 I've noticed you've added several PRs recently to add missing functionality to the `airflow.cfg` for configuring the KubernetesExecutor. There is also a [new field](https://github.com/apache/airflow/blob/746d8de2fcaa9554b4ce7dbf261e4ab148233222/airflow/config_templates/default_airflow.cfg#L756) which allows you to pass the YAML directly if that helps at all. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
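For context, a hedged sketch of the config approach being referenced. The option is assumed here to be `pod_template_file` in the `[kubernetes]` section (the linked default_airflow.cfg line on master is authoritative; the name and path below are illustrative only):

```ini
[kubernetes]
# Base the KubernetesExecutor worker pod on a full YAML pod spec instead of
# assembling it from many individual airflow.cfg keys. (Option name assumed.)
pod_template_file = /opt/airflow/pod_templates/worker_pod.yaml
```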
[GitHub] [airflow] kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod
kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod URL: https://github.com/apache/airflow/pull/7523#discussion_r383966365 ## File path: airflow/config_templates/config.yml ## @@ -2035,6 +2035,17 @@ type: string example: ~ default: "" +- name: delete_option_kwargs + description: | +Optional keyword arguments to pass to the ``delete_namespaced_pod`` kubernetes client +``core_v1_api`` method when using the Kubernetes Executor. +This should be an object and can contain any of the options listed in the ``v1DeleteOptions`` +class defined here: + https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19 + version_added: ~ + type: string + example: "{{\"grace_period_seconds\": 10}}" Review comment: Updated ! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod
kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod URL: https://github.com/apache/airflow/pull/7523#discussion_r383964107 ## File path: setup.py ## @@ -309,7 +309,7 @@ def write_version(filename: str = os.path.join(*[dirname(__file__), "airflow", " 'pinotdb==0.1.1', ] postgres = [ -'psycopg2-binary>=2.7.4', Review comment: Whoops this was not meant for this PR, removing it This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] potiuk commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod
potiuk commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod URL: https://github.com/apache/airflow/pull/7523#discussion_r383963718 ## File path: setup.py ## @@ -309,7 +309,7 @@ def write_version(filename: str = os.path.join(*[dirname(__file__), "airflow", " 'pinotdb==0.1.1', ] postgres = [ -'psycopg2-binary>=2.7.4', Review comment: Why this ? Just wondering? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] Fokko commented on a change in pull request #6577: [AIRFLOW-5908] Add download_file to S3 Hook
Fokko commented on a change in pull request #6577: [AIRFLOW-5908] Add download_file to S3 Hook URL: https://github.com/apache/airflow/pull/6577#discussion_r383956654 ## File path: airflow/providers/amazon/aws/hooks/s3.py ## @@ -652,3 +654,30 @@ def delete_objects(self, bucket, keys): if "Errors" in response: errors_keys = [x['Key'] for x in response.get("Errors", [])] raise AirflowException("Errors when deleting: {}".format(errors_keys)) + +@provide_bucket_name +@unify_bucket_name_and_key +def download_file(self, key: str, bucket_name: Optional[str] = None, local_path: str = '/tmp') -> str: Review comment: Instead of hardcoding the `/tmp` dir, we could ask for one using a Python function, which is system-independent since it asks the OS for a temp dir: https://docs.python.org/3/library/tempfile.html#tempfile.mkdtemp This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
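A small sketch of the suggestion, assuming the default directory is deferred to the standard library rather than hardcoded (the helper name is illustrative, not the actual hook code):

```python
import os
import tempfile


def pick_download_dir(local_path=None):
    """Return a directory to download into.

    Falling back to tempfile.mkdtemp() asks the operating system for an
    appropriate temporary location, so the hook also works on platforms
    where '/tmp' does not exist.
    """
    return local_path if local_path is not None else tempfile.mkdtemp(prefix='airflow_s3_')


# Example: join the chosen directory with the key's basename before handing
# it to boto3's download_file (sketch only).
target = os.path.join(pick_download_dir(), os.path.basename('data/some_key.csv'))
print(target)
```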
[GitHub] [airflow] nuclearpinguin edited a comment on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint compatible
nuclearpinguin edited a comment on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint compatible URL: https://github.com/apache/airflow/pull/7484#issuecomment-590924614 ``` * Module airflow.jobs.scheduler_job airflow/jobs/scheduler_job.py:1031:16: E1101: Method 'state' has no 'is_' member (no-member) ``` @ashb I assume that the difference is that in `DM.dag_id.is_(None)` we are referencing the `dag_id` attribute of an sqla model, while `models.DagRun.state.is_(None)` references a method of `DagRun`: ```python @declared_attr def state(self): return synonym('_state', descriptor=property(self.get_state, self.set_state)) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] nuclearpinguin edited a comment on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint compatible
nuclearpinguin edited a comment on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint compatible URL: https://github.com/apache/airflow/pull/7484#issuecomment-590924614 ``` * Module airflow.jobs.scheduler_job airflow/jobs/scheduler_job.py:1031:16: E1101: Method 'state' has no 'is_' member (no-member) ``` @ashb I assume that the difference is that in `models.TaskInstance.state.in_(old_states)` we are referencing the `state` attribute of an sqla model, while `models.DagRun.state.is_(None)` references a method of `DagRun`: ```python @declared_attr def state(self): return synonym('_state', descriptor=property(self.get_state, self.set_state)) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
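A self-contained illustration of the difference described above, using a throwaway `Demo` model rather than the real Airflow classes: comparison operators on a plain `Column` are visible to static analysis, whereas a `synonym()` built inside a `@declared_attr` method is only resolved by SQLAlchemy at mapping time, so pylint reports `no-member` even though the query expression works:

```python
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.orm import synonym

Base = declarative_base()


class Demo(Base):
    __tablename__ = 'demo'

    id = Column(Integer, primary_key=True)
    _state = Column('state', String(50))

    def get_state(self):
        return self._state

    def set_state(self, value):
        self._state = value

    @declared_attr
    def state(self):
        # Mirrors the DagRun pattern quoted above: a synonym wrapping a property.
        return synonym('_state', descriptor=property(self.get_state, self.set_state))


def build_filters():
    # Pylint can infer .is_() on the plain Column attribute...
    id_filter = Demo.id.is_(None)
    # ...but for the synonym it only sees the method above, so the working
    # SQLAlchemy expression needs an inline suppression instead of a rewrite.
    state_filter = Demo.state.is_(None)  # pylint: disable=no-member
    return id_filter, state_filter
```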
[GitHub] [airflow] nuclearpinguin commented on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint compatible
nuclearpinguin commented on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint compatible URL: https://github.com/apache/airflow/pull/7484#issuecomment-590924614 ``` * Module airflow.jobs.scheduler_job airflow/jobs/scheduler_job.py:1031:16: E1101: Method 'state' has no 'is_' member (no-member) ``` @ashb This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (AIRFLOW-6915) Adds AI Platform Console link for MLEngineStartTrainingJobOperator
[ https://issues.apache.org/jira/browse/AIRFLOW-6915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044571#comment-17044571 ] ASF GitHub Bot commented on AIRFLOW-6915: - MarkYHZhang commented on pull request #7535: [AIRFLOW-6915] Adds AI Platform Console link for MLEngineStartTrainingOperator URL: https://github.com/apache/airflow/pull/7535 Adds AI Platform Console link for MLEngineStartTrainingOperator for the RBAC UI --- Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg) Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [x] Commit message/PR title starts with `[AIRFLOW-]`. AIRFLOW- = JIRA ID* - [x] Unit tests coverage for changes (not needed for documentation changes) - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [x] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). * For document-only changes commit message can start with `[AIRFLOW-]`. --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Adds AI Platform Console link for MLEngineStartTrainingJobOperator > -- > > Key: AIRFLOW-6915 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6915 > Project: Apache Airflow > Issue Type: New Feature > Components: gcp, operators >Affects Versions: 2.0.0 >Reporter: Mark Zhang >Assignee: Mark Zhang >Priority: Major > > Adds AI Platform Console link for MLEngineStartTrainingJobOperator -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] MarkYHZhang opened a new pull request #7535: [AIRFLOW-6915] Adds AI Platform Console link for MLEngineStartTrainingOperator
MarkYHZhang opened a new pull request #7535: [AIRFLOW-6915] Adds AI Platform Console link for MLEngineStartTrainingOperator URL: https://github.com/apache/airflow/pull/7535 Adds AI Platform Console link for MLEngineStartTrainingOperator for the RBAC UI --- Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg) Make sure to mark the boxes below before creating PR: [x] - [x] Description above provides context of the change - [x] Commit message/PR title starts with `[AIRFLOW-]`. AIRFLOW- = JIRA ID* - [x] Unit tests coverage for changes (not needed for documentation changes) - [x] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [x] Relevant documentation is updated including usage instructions. - [x] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). * For document-only changes commit message can start with `[AIRFLOW-]`. --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (AIRFLOW-5354) Scheduler - constant CPU usage of 25% with nothing running and scheduling loop running too frequently
[ https://issues.apache.org/jira/browse/AIRFLOW-5354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ash Berlin-Taylor reassigned AIRFLOW-5354: -- Assignee: Ash Berlin-Taylor > Scheduler - constant CPU usage of 25% with nothing running and scheduling > loop running too frequently > - > > Key: AIRFLOW-5354 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5354 > Project: Apache Airflow > Issue Type: Improvement > Components: scheduler >Affects Versions: 1.10.4 >Reporter: t oo >Assignee: Ash Berlin-Taylor >Priority: Major > > I've put logging level to debug, cpu utilisation is constant 25% but no dags > are running (they have none schedule since i only externally trigger). Why is > the scheduling loop running every 2 seconds? can I make it every 30 seconds? > > here is some of the scheduler log > > [2019-08-30 09:28:57,538] \{scheduler_job.py:1363} DEBUG - Starting Loop... > [2019-08-30 09:28:57,539] \{scheduler_job.py:1374} DEBUG - Harvesting DAG > parsing results > [2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message > of type DagParsingStat > [2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message > of type DagParsingStat > [2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message > of type DagParsingStat > [2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message > of type DagParsingStat > [2019-08-30 09:28:57,540] \{scheduler_job.py:1376} DEBUG - Harvested 0 > SimpleDAGs > [2019-08-30 09:28:57,540] \{scheduler_job.py:1411} DEBUG - Heartbeating the > executor > [2019-08-30 09:28:57,540] \{base_executor.py:124} DEBUG - 0 running task > instances > [2019-08-30 09:28:57,540] \{base_executor.py:125} DEBUG - 0 in queue > [2019-08-30 09:28:57,540] \{base_executor.py:126} DEBUG - 3 open slots > [2019-08-30 09:28:57,540] \{base_executor.py:135} DEBUG - Calling the 'airflow.executors.local_executor.LocalExecutor'> sync method > [2019-08-30 09:28:57,541] \{scheduler_job.py:1432} DEBUG - Ran scheduling > loop in 0.00 seconds > [2019-08-30 09:28:57,541] \{scheduler_job.py:1435} DEBUG - Sleeping for 1.00 > seconds > [2019-08-30 09:28:58,543] \{scheduler_job.py:1447} DEBUG - Sleeping for 1.00 > seconds to prevent excessive logging > [2019-08-30 09:28:59,541] \{scheduler_job.py:1363} DEBUG - Starting Loop... 
> [2019-08-30 09:28:59,541] \{scheduler_job.py:1374} DEBUG - Harvesting DAG > parsing results > [2019-08-30 09:28:59,541] \{dag_processing.py:635} DEBUG - Received message > of type DagParsingStat > [2019-08-30 09:28:59,542] \{dag_processing.py:635} DEBUG - Received message > of type DagParsingStat > [2019-08-30 09:28:59,542] \{dag_processing.py:635} DEBUG - Received message > of type DagParsingStat > [2019-08-30 09:28:59,542] \{dag_processing.py:635} DEBUG - Received message > of type DagParsingStat > [2019-08-30 09:28:59,542] \{scheduler_job.py:1376} DEBUG - Harvested 0 > SimpleDAGs > [2019-08-30 09:28:59,542] \{scheduler_job.py:1411} DEBUG - Heartbeating the > executor > [2019-08-30 09:28:59,542] \{base_executor.py:124} DEBUG - 0 running task > instances > [2019-08-30 09:28:59,543] \{base_executor.py:125} DEBUG - 0 in queue > [2019-08-30 09:28:59,543] \{base_executor.py:126} DEBUG - 3 open slots > [2019-08-30 09:28:59,543] \{base_executor.py:135} DEBUG - Calling the 'airflow.executors.local_executor.LocalExecutor'> sync method > [2019-08-30 09:28:59,544] \{scheduler_job.py:1432} DEBUG - Ran scheduling > loop in 0.00 seconds > [2019-08-30 09:28:59,544] \{scheduler_job.py:1435} DEBUG - Sleeping for 1.00 > seconds > [2019-08-30 09:29:00,545] \{scheduler_job.py:1447} DEBUG - Sleeping for 1.00 > seconds to prevent excessive logging > [2019-08-30 09:29:01,544] \{scheduler_job.py:1363} DEBUG - Starting Loop... > [2019-08-30 09:29:01,545] \{scheduler_job.py:1374} DEBUG - Harvesting DAG > parsing results > [2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message > of type DagParsingStat > [2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message > of type DagParsingStat > [2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message > of type DagParsingStat > [2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message > of type DagParsingStat > [2019-08-30 09:29:01,546] \{scheduler_job.py:1376} DEBUG - Harvested 0 > SimpleDAGs > [2019-08-30 09:29:01,546] \{scheduler_job.py:1411} DEBUG - Heartbeating the > executor > [2019-08-30 09:29:01,546] \{base_executor.py:124} DEBUG - 0 running task > instances > [2019-08-30 09:29:01,546] \{base_executor.py:125} DEBUG - 0 in queue > [2019-08-30 09:29:01,547] \{base_executor.py:126} DEBUG - 3 open slots > [2019-08-30 09:29:01,
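On the "can I make it every 30 seconds?" question in the report: in 1.10.x the sleep between scheduler loop iterations is controlled by a `[scheduler]` option, believed to be `processor_poll_interval` (verify the exact name against your version's default_airflow.cfg):

```ini
[scheduler]
# Seconds the scheduler sleeps between scheduling loops when idle; a larger
# value lowers idle CPU usage at the cost of slower pickup of new runs.
# (Option name assumed for 1.10.x; check default_airflow.cfg.)
processor_poll_interval = 30
```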
[GitHub] [airflow] kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod
kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod URL: https://github.com/apache/airflow/pull/7523#discussion_r383944608 ## File path: airflow/config_templates/config.yml ## @@ -2035,6 +2035,17 @@ type: string example: ~ default: "" +- name: delete_option_kwargs + description: | +Optional keyword arguments to pass to the ``delete_namespaced_pod`` kubernetes client +``core_v1_api`` method when using the Kubernetes Executor. +This should be an object and can contain any of the options listed in the ``v1DeleteOptions`` +class defined here: + https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19 + version_added: ~ + type: string + example: "{{\"grace_period_seconds\": 10}}" Review comment: It is tricky because of `task_log_prefix_template = {{ti.dag_id}}-{{ti.task_id}}-{{execution_date}}-{{try_number}}`. It would replace that too, turning it into `task_log_prefix_template = ti.dag_id-ti.task_id-execution_date-try_number`. > Maybe we need to update the pre-commit script to replace `{` with `{{` and `}` with `}}` so that this and the default_airflow.cfg are correct. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] potiuk commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies
potiuk commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies URL: https://github.com/apache/airflow/pull/7534#discussion_r383942394 ## File path: airflow/jobs/scheduler_job.py ## @@ -877,7 +877,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None): except Exception: self.log.exception("Error logging import errors!") try: -self.kill_zombies(zombies) +self.kill_zombies(dagbag, zombies) Review comment: Tomek has the PR #7484 in progress for that. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] potiuk commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies
potiuk commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies URL: https://github.com/apache/airflow/pull/7534#discussion_r383942031 ## File path: airflow/jobs/scheduler_job.py ## @@ -877,7 +877,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None): except Exception: self.log.exception("Error logging import errors!") try: -self.kill_zombies(zombies) +self.kill_zombies(dagbag, zombies) Review comment: Indeed . This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] feluelle commented on issue #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
feluelle commented on issue #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration URL: https://github.com/apache/airflow/pull/6007#issuecomment-590915162 Please also move the tests from contrib to providers This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] feluelle commented on issue #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
feluelle commented on issue #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration URL: https://github.com/apache/airflow/pull/6007#issuecomment-590914376 If you check the Files Changed tab, you can see what is included in your PR: the docs/integration and docs/code files and the contrib files (hook, sensor and operator) are still in it. Please remove those; you have the operators-and-hooks and the providers files instead. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod
kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod URL: https://github.com/apache/airflow/pull/7523#discussion_r383936665 ## File path: airflow/config_templates/config.yml ## @@ -2035,6 +2035,17 @@ type: string example: ~ default: "" +- name: delete_option_kwargs + description: | +Optional keyword arguments to pass to the ``delete_namespaced_pod`` kubernetes client +``core_v1_api`` method when using the Kubernetes Executor. +This should be an object and can contain any of the options listed in the ``v1DeleteOptions`` +class defined here: + https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19 + version_added: ~ + type: string + example: "{{\"grace_period_seconds\": 10}}" Review comment: We need `{{"grace_period_seconds": 10}}` in airflow.cfg because ConfigParser does not parse `{` properly This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Created] (AIRFLOW-6915) Adds AI Platform Console link for MLEngineStartTrainingJobOperator
Mark Zhang created AIRFLOW-6915: --- Summary: Adds AI Platform Console link for MLEngineStartTrainingJobOperator Key: AIRFLOW-6915 URL: https://issues.apache.org/jira/browse/AIRFLOW-6915 Project: Apache Airflow Issue Type: New Feature Components: gcp, operators Affects Versions: 2.0.0 Reporter: Mark Zhang Assignee: Mark Zhang Adds AI Platform Console link for MLEngineStartTrainingJobOperator -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] ashb commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod
ashb commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod URL: https://github.com/apache/airflow/pull/7523#discussion_r383935977 ## File path: airflow/config_templates/config.yml ## @@ -2035,6 +2035,17 @@ type: string example: ~ default: "" +- name: delete_option_kwargs + description: | +Optional keyword arguments to pass to the ``delete_namespaced_pod`` kubernetes client +``core_v1_api`` method when using the Kubernetes Executor. +This should be an object and can contain any of the options listed in the ``v1DeleteOptions`` +class defined here: + https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19 + version_added: ~ + type: string + example: "{{\"grace_period_seconds\": 10}}" Review comment: Maybe we need to update the pre-commit script to replace `{` with `{{` and `}` with `}}` so that this and the default_airflow.cfg are correct. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] ashb commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod
ashb commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod URL: https://github.com/apache/airflow/pull/7523#discussion_r383935377 ## File path: airflow/config_templates/config.yml ## @@ -2035,6 +2035,17 @@ type: string example: ~ default: "" +- name: delete_option_kwargs + description: | +Optional keyword arguments to pass to the ``delete_namespaced_pod`` kubernetes client +``core_v1_api`` method when using the Kubernetes Executor. +This should be an object and can contain any of the options listed in the ``v1DeleteOptions`` +class defined here: + https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19 + version_added: ~ + type: string + example: "{{\"grace_period_seconds\": 10}}" Review comment: Doesn't affect the output, but could be done as: ```suggestion example: '{{"grace_period_seconds": 10}}' ``` Also: OH GOD. Do we show this as `{` or `{{` when we render the docs? Because it needs to be `{` -- `{{` is only needed in default_airflow.cfg because that is a template, but what we show in our docs is not templated/formatted. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
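A small demonstration of why the doubled braces belong only in the templated default_airflow.cfg and not in the rendered output: when the template is rendered with Python's `str.format`, `{{` and `}}` collapse to literal single braces (generic illustration, not the actual generation script):

```python
template_line = 'delete_option_kwargs = {{"grace_period_seconds": 10}}'

# str.format() treats {{ and }} as escaped literal braces, so the rendered
# airflow.cfg (and anything shown to users) ends up with single braces.
rendered = template_line.format()
print(rendered)  # delete_option_kwargs = {"grace_period_seconds": 10}
```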
[jira] [Commented] (AIRFLOW-5485) scheduler_job: replace == None with is_() comparison
[ https://issues.apache.org/jira/browse/AIRFLOW-5485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044549#comment-17044549 ] ASF GitHub Bot commented on AIRFLOW-5485: - ashb commented on pull request #7533: [AIRFLOW-5485] - scheduler_job: replace == None with is URL: https://github.com/apache/airflow/pull/7533 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > scheduler_job: replace == None with is_() comparison > > > Key: AIRFLOW-5485 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5485 > Project: Apache Airflow > Issue Type: Bug > Components: hooks >Affects Versions: 1.10.6 >Reporter: Jakob Homan >Assignee: Saurabh Dhupar >Priority: Minor > Labels: ccoss2019, newbie > > Note: This ticket's being created to facilitate a new contributor's workshop > for Airflow. After the workshop has completed, I'll mark these all available > for anyone that might like to take them on. > airflow/jobs/scheduler_job.py:855 > {code:java} > .filter(or_(DR.run_id == None, # noqa: E711 pylint: > disable=singleton-comparison > not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%' > .outerjoin(DM, DM.dag_id == TI.dag_id) > .filter(or_(DM.dag_id == None, # noqa: E711 pylint: > disable=singleton-comparison > not_(DM.is_paused))) {code} > We're using {{col == None}} here where we should use {{col.is_(None)}} to > avoid confusion as and lint warnings. -- This message was sent by Atlassian Jira (v8.3.4#803005)
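For reference, a minimal before/after of the comparison style the ticket asks for, using a throwaway model instead of the real DagRun/DagModel classes:

```python
from sqlalchemy import Column, Integer, String, not_, or_
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class Run(Base):
    __tablename__ = 'run'

    id = Column(Integer, primary_key=True)
    run_id = Column(String(250))


# Flagged style: comparing a column to None with == requires noqa/pylint pragmas.
flagged = or_(Run.run_id == None, not_(Run.run_id.like('backfill_%')))  # noqa: E711

# Preferred style: is_() builds the same IS NULL test without lint warnings.
preferred = or_(Run.run_id.is_(None), not_(Run.run_id.like('backfill_%')))
```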
[jira] [Resolved] (AIRFLOW-5485) scheduler_job: replace == None with is_() comparison
[ https://issues.apache.org/jira/browse/AIRFLOW-5485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ash Berlin-Taylor resolved AIRFLOW-5485. Resolution: Fixed (Resolved without fix for as it was fixed in the linked parent issue) > scheduler_job: replace == None with is_() comparison > > > Key: AIRFLOW-5485 > URL: https://issues.apache.org/jira/browse/AIRFLOW-5485 > Project: Apache Airflow > Issue Type: Bug > Components: hooks >Affects Versions: 1.10.6 >Reporter: Jakob Homan >Assignee: Saurabh Dhupar >Priority: Minor > Labels: ccoss2019, newbie > > Note: This ticket's being created to facilitate a new contributor's workshop > for Airflow. After the workshop has completed, I'll mark these all available > for anyone that might like to take them on. > airflow/jobs/scheduler_job.py:855 > {code:java} > .filter(or_(DR.run_id == None, # noqa: E711 pylint: > disable=singleton-comparison > not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%' > .outerjoin(DM, DM.dag_id == TI.dag_id) > .filter(or_(DM.dag_id == None, # noqa: E711 pylint: > disable=singleton-comparison > not_(DM.is_paused))) {code} > We're using {{col == None}} here where we should use {{col.is_(None)}} to > avoid confusion as and lint warnings. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (AIRFLOW-6907) Simplify SchedulerJob
[ https://issues.apache.org/jira/browse/AIRFLOW-6907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ash Berlin-Taylor resolved AIRFLOW-6907. Fix Version/s: 2.0.0 Resolution: Fixed > Simplify SchedulerJob > - > > Key: AIRFLOW-6907 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6907 > Project: Apache Airflow > Issue Type: New Feature > Components: scheduler >Affects Versions: 1.10.9 >Reporter: Kamil Bregula >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] ashb closed pull request #7533: [AIRFLOW-5485] - scheduler_job: replace == None with is
ashb closed pull request #7533: [AIRFLOW-5485] - scheduler_job: replace == None with is URL: https://github.com/apache/airflow/pull/7533 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] ashb commented on issue #7533: [AIRFLOW-5485] - scheduler_job: replace == None with is
ashb commented on issue #7533: [AIRFLOW-5485] - scheduler_job: replace == None with is URL: https://github.com/apache/airflow/pull/7533#issuecomment-590909998 The other PR has been merged, so I'm closing this. Thanks for the PR though! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] nuclearpinguin commented on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint compatible
nuclearpinguin commented on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint compatible URL: https://github.com/apache/airflow/pull/7484#issuecomment-590904410 > @nuclearpinguin See https://github.com/apache/airflow/pull/7527/files#r383812108 -- this is my only outstanding query about this PR. This PR runs pylint over `scheduler_job.py`; the one you mention does not. I suppose that once I rebase I will have to fix it in more places. @ashb This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy
kaxil commented on a change in pull request #7532: [AIRFLOW-] Fix outdated doc on settings.policy URL: https://github.com/apache/airflow/pull/7532#discussion_r383925599 ## File path: airflow/settings.py ## @@ -79,28 +79,26 @@ json = json -def policy(task_instance): +def policy(task): Review comment: No worries :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] Fokko commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy
Fokko commented on a change in pull request #7532: [AIRFLOW-] Fix outdated doc on settings.policy URL: https://github.com/apache/airflow/pull/7532#discussion_r383924936 ## File path: airflow/settings.py ## @@ -79,28 +79,26 @@ json = json -def policy(task_instance): +def policy(task): Review comment: Much better, sorry for the fuss. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob
nuclearpinguin commented on a change in pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob URL: https://github.com/apache/airflow/pull/7527#discussion_r383921545 ## File path: airflow/jobs/scheduler_job.py ## @@ -1098,37 +1094,20 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None): TI = models.TaskInstance DR = models.DagRun DM = models.DagModel -ti_query = ( +task_instances_to_examine = ( session .query(TI) .filter(TI.dag_id.in_(simple_dag_bag.dag_ids)) .outerjoin( -DR, -and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date) +DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date) ) -.filter(or_(DR.run_id == None, # noqa: E711 pylint: disable=singleton-comparison -not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%' +.filter(or_(DR.run_id.is_(None), not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%' .outerjoin(DM, DM.dag_id == TI.dag_id) -.filter(or_(DM.dag_id == None, # noqa: E711 pylint: disable=singleton-comparison -not_(DM.is_paused))) +.filter(or_(DM.dag_id.is_(None), not_(DM.is_paused))) Review comment: It's still in TODO list: https://github.com/PolideaInternal/airflow/blob/37c630da636d1ef352273e33fbdb417727914386/scripts/ci/pylint_todo.txt#L11 And I suppose that once I rebase onto new master I will have to fix it in more places... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy
kaxil commented on a change in pull request #7532: [AIRFLOW-] Fix outdated doc on settings.policy URL: https://github.com/apache/airflow/pull/7532#discussion_r383921528 ## File path: airflow/settings.py ## @@ -79,28 +79,26 @@ json = json -def policy(task_instance): +def policy(task): Review comment: Removed Type Annotation This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil merged pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy
kaxil merged pull request #7532: [AIRFLOW-] Fix outdated doc on settings.policy URL: https://github.com/apache/airflow/pull/7532 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob
nuclearpinguin commented on a change in pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob URL: https://github.com/apache/airflow/pull/7527#discussion_r383920198 ## File path: airflow/jobs/scheduler_job.py ## @@ -1098,37 +1094,20 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None): TI = models.TaskInstance DR = models.DagRun DM = models.DagModel -ti_query = ( +task_instances_to_examine = ( session .query(TI) .filter(TI.dag_id.in_(simple_dag_bag.dag_ids)) .outerjoin( -DR, -and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date) +DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date) ) -.filter(or_(DR.run_id == None, # noqa: E711 pylint: disable=singleton-comparison -not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%' +.filter(or_(DR.run_id.is_(None), not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%' .outerjoin(DM, DM.dag_id == TI.dag_id) -.filter(or_(DM.dag_id == None, # noqa: E711 pylint: disable=singleton-comparison -not_(DM.is_paused))) +.filter(or_(DM.dag_id.is_(None), not_(DM.is_paused))) Review comment: I don't know, but here we do not run pylint over this file, do we? @ashb This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy
kaxil commented on a change in pull request #7532: [AIRFLOW-] Fix outdated doc on settings.policy URL: https://github.com/apache/airflow/pull/7532#discussion_r383920439 ## File path: airflow/settings.py ## @@ -79,28 +79,26 @@ json = json -def policy(task_instance): +def policy(task): Review comment: Ya it is causing issues: https://travis-ci.org/apache/airflow/jobs/654914183 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob
nuclearpinguin commented on a change in pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob URL: https://github.com/apache/airflow/pull/7527#discussion_r383920198 ## File path: airflow/jobs/scheduler_job.py ## @@ -1098,37 +1094,20 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None): TI = models.TaskInstance DR = models.DagRun DM = models.DagModel -ti_query = ( +task_instances_to_examine = ( session .query(TI) .filter(TI.dag_id.in_(simple_dag_bag.dag_ids)) .outerjoin( -DR, -and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date) +DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date) ) -.filter(or_(DR.run_id == None, # noqa: E711 pylint: disable=singleton-comparison -not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%' +.filter(or_(DR.run_id.is_(None), not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%' .outerjoin(DM, DM.dag_id == TI.dag_id) -.filter(or_(DM.dag_id == None, # noqa: E711 pylint: disable=singleton-comparison -not_(DM.is_paused))) +.filter(or_(DM.dag_id.is_(None), not_(DM.is_paused))) Review comment: I don't know, but here we do not run pylint over this file, do we? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (AIRFLOW-6907) Simplify SchedulerJob
[ https://issues.apache.org/jira/browse/AIRFLOW-6907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044481#comment-17044481 ] ASF subversion and git services commented on AIRFLOW-6907: -- Commit 77b532062cc3fb4407f7e4a99626c31b29133854 in airflow's branch refs/heads/master from Kamil Breguła [ https://gitbox.apache.org/repos/asf?p=airflow.git;h=77b5320 ] [AIRFLOW-6907] Simplify SchedulerJob (#7527) > Simplify SchedulerJob > - > > Key: AIRFLOW-6907 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6907 > Project: Apache Airflow > Issue Type: New Feature > Components: scheduler >Affects Versions: 1.10.9 >Reporter: Kamil Bregula >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (AIRFLOW-6907) Simplify SchedulerJob
[ https://issues.apache.org/jira/browse/AIRFLOW-6907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044482#comment-17044482 ] ASF subversion and git services commented on AIRFLOW-6907: -- Commit 77b532062cc3fb4407f7e4a99626c31b29133854 in airflow's branch refs/heads/master from Kamil Breguła [ https://gitbox.apache.org/repos/asf?p=airflow.git;h=77b5320 ] [AIRFLOW-6907] Simplify SchedulerJob (#7527) > Simplify SchedulerJob > - > > Key: AIRFLOW-6907 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6907 > Project: Apache Airflow > Issue Type: New Feature > Components: scheduler >Affects Versions: 1.10.9 >Reporter: Kamil Bregula >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (AIRFLOW-6907) Simplify SchedulerJob
[ https://issues.apache.org/jira/browse/AIRFLOW-6907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044478#comment-17044478 ] ASF GitHub Bot commented on AIRFLOW-6907: - mik-laj commented on pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob URL: https://github.com/apache/airflow/pull/7527 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Simplify SchedulerJob > - > > Key: AIRFLOW-6907 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6907 > Project: Apache Airflow > Issue Type: New Feature > Components: scheduler >Affects Versions: 1.10.9 >Reporter: Kamil Bregula >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] mik-laj merged pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob
mik-laj merged pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob URL: https://github.com/apache/airflow/pull/7527 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] Fokko commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy
Fokko commented on a change in pull request #7532: [AIRFLOW-] Fix outdated doc on settings.policy URL: https://github.com/apache/airflow/pull/7532#discussion_r383905306 ## File path: airflow/settings.py ## @@ -79,28 +79,26 @@ json = json -def policy(task_instance): +def policy(task): Review comment: I prefer 2, but if there is any chance of having cyclic dependencies we should just leave it out. The first one is a bit awkward. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy
kaxil commented on a change in pull request #7532: [AIRFLOW-] Fix outdated doc on settings.policy URL: https://github.com/apache/airflow/pull/7532#discussion_r383902307 ## File path: airflow/settings.py ## @@ -79,28 +79,26 @@ json = json -def policy(task_instance): +def policy(task): Review comment: What version would you like/recommend: 1. With TYPE_CHECKING - https://github.com/apache/airflow/pull/7532/commits/fb4ede5523cb1e0aea8bff95766feffb54d5f0a5 2. Without TYPE_CHECKING - https://github.com/apache/airflow/pull/7532/commits/7776f8178d3b61cf003b10eccb7caa2be3f504a2 With (2) I am just a bit worried that there might be a chance for cyclic dependencies. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
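For reference, option 1 is the standard `typing.TYPE_CHECKING` pattern: the annotation-only import is evaluated by mypy and IDEs but never at runtime, so it cannot create an import cycle. A minimal sketch (illustrative only — the import path and the example body of `policy` are assumptions, not the actual commit):

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only evaluated by type checkers, never at runtime, so no import cycle.
    from airflow.models.baseoperator import BaseOperator


def policy(task: "BaseOperator") -> None:
    """Cluster-wide mutation hook; users override this in airflow_local_settings.py."""
    # Example mutation: route a particular operator type to a dedicated queue.
    if task.task_type == "HivePartitionSensor":
        task.queue = "sensor_queue"
```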
[GitHub] [airflow] abdulbasitds commented on issue #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration
abdulbasitds commented on issue #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration URL: https://github.com/apache/airflow/pull/6007#issuecomment-590883802 @feluelle I am not sure about the old files in the contrib and docs, can you please elaborate more on which files need to be deleted? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Issue Comment Deleted] (AIRFLOW-6867) Decouple DagBag and TaskInstance
[ https://issues.apache.org/jira/browse/AIRFLOW-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ash Berlin-Taylor updated AIRFLOW-6867: --- Comment: was deleted (was: ashb commented on pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies URL: https://github.com/apache/airflow/pull/7534 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org ) > Decouple DagBag and TaskInstance > > > Key: AIRFLOW-6867 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6867 > Project: Apache Airflow > Issue Type: Bug > Components: DAG >Affects Versions: 1.10.9 >Reporter: Kamil Bregula >Assignee: Kamil Bregula >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (AIRFLOW-6867) Decouple DagBag and TaskInstance
[ https://issues.apache.org/jira/browse/AIRFLOW-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ash Berlin-Taylor updated AIRFLOW-6867: --- Comment: was deleted (was: ashb commented on pull request #7534: [AIRFLOW-6867] Fix bug in kill zombies URL: https://github.com/apache/airflow/pull/7534 The refactor in #7488 introduced a scheduler-breaking bug. I am not happy about merging this without a unit test to stop it breaking again, but given how much process_file does it's hard to test. Thoughts? --- Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg) Make sure to mark the boxes below before creating PR: [x] - [ ] Description above provides context of the change - [ ] Commit message/PR title starts with `[AIRFLOW-]`. AIRFLOW- = JIRA ID* - [ ] Unit tests coverage for changes (not needed for documentation changes) - [ ] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [ ] Relevant documentation is updated including usage instructions. - [ ] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). * For document-only changes commit message can start with `[AIRFLOW-]`. --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org ) > Decouple DagBag and TaskInstance > > > Key: AIRFLOW-6867 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6867 > Project: Apache Airflow > Issue Type: Bug > Components: DAG >Affects Versions: 1.10.9 >Reporter: Kamil Bregula >Assignee: Kamil Bregula >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (AIRFLOW-6910) Fix kill_zombie method call
[ https://issues.apache.org/jira/browse/AIRFLOW-6910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ash Berlin-Taylor updated AIRFLOW-6910: --- Affects Version/s: (was: 1.10.9) 2.0.0 > Fix kill_zombie method call > --- > > Key: AIRFLOW-6910 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6910 > Project: Apache Airflow > Issue Type: New Feature > Components: scheduler >Affects Versions: 2.0.0 >Reporter: Kamil Bregula >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (AIRFLOW-6910) Fix kill_zombie method call
[ https://issues.apache.org/jira/browse/AIRFLOW-6910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ash Berlin-Taylor resolved AIRFLOW-6910. Resolution: Fixed > Fix kill_zombie method call > --- > > Key: AIRFLOW-6910 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6910 > Project: Apache Airflow > Issue Type: New Feature > Components: scheduler >Affects Versions: 1.10.9 >Reporter: Kamil Bregula >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] Fokko commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy
Fokko commented on a change in pull request #7532: [AIRFLOW-] Fix outdated doc on settings.policy URL: https://github.com/apache/airflow/pull/7532#discussion_r383895918 ## File path: airflow/settings.py ## @@ -79,28 +79,26 @@ json = json -def policy(task_instance): +def policy(task): Review comment: When the user copies the function, they'll also copy the annotation, makes the code more robust :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] ashb commented on issue #7531: [AIRFLOW-6910] Fix kill_zombie method call
ashb commented on issue #7531: [AIRFLOW-6910] Fix kill_zombie method call URL: https://github.com/apache/airflow/pull/7531#issuecomment-590880959 @mik-laj @Fokko In situations like this can we please get into the habit of targeting the bug fix at the same Jira issue as the original breaking change -- that way if we ever need to backport it we get the change _and_ the fix. Two JIRAs mean the fix is likely to get missed. (Plus right now one PR = one Jira is just busy work; if we're going to do this, let's just stop using Jira) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (AIRFLOW-6867) Decouple DagBag and TaskInstance
[ https://issues.apache.org/jira/browse/AIRFLOW-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044460#comment-17044460 ] ASF GitHub Bot commented on AIRFLOW-6867: - ashb commented on pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies URL: https://github.com/apache/airflow/pull/7534 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Decouple DagBag and TaskInstance > > > Key: AIRFLOW-6867 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6867 > Project: Apache Airflow > Issue Type: Bug > Components: DAG >Affects Versions: 1.10.9 >Reporter: Kamil Bregula >Assignee: Kamil Bregula >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] ashb closed pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies
ashb closed pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies URL: https://github.com/apache/airflow/pull/7534 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy
kaxil commented on a change in pull request #7532: [AIRFLOW-] Fix outdated doc on settings.policy URL: https://github.com/apache/airflow/pull/7532#discussion_r383887968 ## File path: airflow/settings.py ## @@ -79,28 +79,26 @@ json = json -def policy(task_instance): +def policy(task): Review comment: Can add but this function needs to be overriden by users in their `airflow_local_settings.py` - so adding type annotation here won't have any impact I feel. WDYT? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] Fokko commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy
Fokko commented on a change in pull request #7532: [AIRFLOW-] Fix outdated doc on settings.policy URL: https://github.com/apache/airflow/pull/7532#discussion_r383884871 ## File path: airflow/settings.py ## @@ -79,28 +79,26 @@ json = json -def policy(task_instance): +def policy(task): Review comment: Maybe add a type annotation here? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Assigned] (AIRFLOW-6867) Decouple DagBag and TaskInstance
[ https://issues.apache.org/jira/browse/AIRFLOW-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong reassigned AIRFLOW-6867: - Assignee: Kamil Bregula > Decouple DagBag and TaskInstance > > > Key: AIRFLOW-6867 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6867 > Project: Apache Airflow > Issue Type: Bug > Components: DAG >Affects Versions: 1.10.9 >Reporter: Kamil Bregula >Assignee: Kamil Bregula >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (AIRFLOW-6910) Fix kill_zombie method call
[ https://issues.apache.org/jira/browse/AIRFLOW-6910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1703#comment-1703 ] ASF subversion and git services commented on AIRFLOW-6910: -- Commit a62bb619320544404271dd1a41eaf398f00aeef9 in airflow's branch refs/heads/master from Kamil Breguła [ https://gitbox.apache.org/repos/asf?p=airflow.git;h=a62bb61 ] [AIRFLOW-6910] Fix kill_zombie method call (#7531) > Fix kill_zombie method call > --- > > Key: AIRFLOW-6910 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6910 > Project: Apache Airflow > Issue Type: New Feature > Components: scheduler >Affects Versions: 1.10.9 >Reporter: Kamil Bregula >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (AIRFLOW-6910) Fix kill_zombie method call
[ https://issues.apache.org/jira/browse/AIRFLOW-6910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1702#comment-1702 ] ASF GitHub Bot commented on AIRFLOW-6910: - Fokko commented on pull request #7531: [AIRFLOW-6910] Fix kill_zombie method call URL: https://github.com/apache/airflow/pull/7531 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix kill_zombie method call > --- > > Key: AIRFLOW-6910 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6910 > Project: Apache Airflow > Issue Type: New Feature > Components: scheduler >Affects Versions: 1.10.9 >Reporter: Kamil Bregula >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] Fokko commented on issue #7531: [AIRFLOW-6910] Fix kill_zombie method call
Fokko commented on issue #7531: [AIRFLOW-6910] Fix kill_zombie method call URL: https://github.com/apache/airflow/pull/7531#issuecomment-590871285 Thanks @mik-laj This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] Fokko commented on issue #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies
Fokko commented on issue #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies URL: https://github.com/apache/airflow/pull/7534#issuecomment-590871188 Duplicate of https://github.com/apache/airflow/pull/7531/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] Fokko merged pull request #7531: [AIRFLOW-6910] Fix kill_zombie method call
Fokko merged pull request #7531: [AIRFLOW-6910] Fix kill_zombie method call URL: https://github.com/apache/airflow/pull/7531 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Updated] (AIRFLOW-6914) Add a default robots.txt to deny all search engines
[ https://issues.apache.org/jira/browse/AIRFLOW-6914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik updated AIRFLOW-6914: Description: If the Airflow UI is public, Google can index it and if the Authentication has not been enabled it is a serious security threat if it is a prod cluster. Something like this probably should work {code:python} @app.route('/robots.txt', methods=['GET']) def robotstxt(): return send_from_directory(os.path.join(app.root_path, 'static', 'txt'), 'robots.txt') {code} was:If the Airflow UI is public, Google can index it and if the Authentication has not been enabled it is a serious security threat if it is a prod cluster > Add a default robots.txt to deny all search engines > --- > > Key: AIRFLOW-6914 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6914 > Project: Apache Airflow > Issue Type: Improvement > Components: security, ui >Affects Versions: 1.10.6, 1.10.7, 1.10.8, 1.10.9 >Reporter: Kaxil Naik >Assignee: Kaxil Naik >Priority: Major > > If the Airflow UI is public, Google can index it and if the Authentication > has not been enabled it is a serious security threat if it is a prod cluster. > Something like this probably should work > {code:python} > @app.route('/robots.txt', methods=['GET']) > def robotstxt(): > return send_from_directory(os.path.join(app.root_path, 'static', 'txt'), >'robots.txt') > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
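A deny-all policy only needs two directives (`User-agent: *` and `Disallow: /`). A self-contained sketch of the idea, written as a standalone Flask app for illustration rather than the actual Airflow webserver wiring:

{code:python}
from flask import Flask, Response

app = Flask(__name__)

# Deny-all robots policy: every crawler ("*") is disallowed from every path ("/").
ROBOTS_TXT = "User-agent: *\nDisallow: /\n"


@app.route('/robots.txt', methods=['GET'])
def robotstxt():
    return Response(ROBOTS_TXT, mimetype='text/plain')


if __name__ == '__main__':
    app.run(port=8080)
{code}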
[jira] [Created] (AIRFLOW-6914) Add a default robots.txt to deny all search engines
Kaxil Naik created AIRFLOW-6914: --- Summary: Add a default robots.txt to deny all search engines Key: AIRFLOW-6914 URL: https://issues.apache.org/jira/browse/AIRFLOW-6914 Project: Apache Airflow Issue Type: Improvement Components: security, ui Affects Versions: 1.10.9, 1.10.8, 1.10.7, 1.10.6 Reporter: Kaxil Naik Assignee: Kaxil Naik If the Airflow UI is public, Google can index it and if the Authentication has not been enabled it is a serious security threat if it is a prod cluster -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [airflow] kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies
kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies URL: https://github.com/apache/airflow/pull/7534#discussion_r383878207 ## File path: airflow/jobs/scheduler_job.py ## @@ -877,7 +877,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None): except Exception: self.log.exception("Error logging import errors!") try: -self.kill_zombies(zombies) +self.kill_zombies(dagbag, zombies) Review comment: Maybe pylint would have caught it but it is currently in **pylint_todo**, so might have missed: https://github.com/apache/airflow/blob/6eaa7e3b1845644d5ec65a00a997f4029bec9628/scripts/ci/pylint_todo.txt#L11 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
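For illustration, the class of error being discussed, reduced to a toy example: on a file that pylint actually checks (i.e. one not listed in pylint_todo.txt), calling a function without a required positional argument is reported as E1120 (no-value-for-parameter). The functions below only mirror the signature shape; they are not the scheduler code.

```python
def kill_zombies(dagbag, zombies, session=None):
    """Toy stand-in with the same argument shape as SchedulerJob.kill_zombies."""
    return len(zombies)


def process_file(dagbag, zombies):
    # kill_zombies(zombies)               # broken call: pylint E1120 (no-value-for-parameter)
    return kill_zombies(dagbag, zombies)  # fixed call, as in this PR
```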
[GitHub] [airflow] kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies
kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies URL: https://github.com/apache/airflow/pull/7534#discussion_r383878207 ## File path: airflow/jobs/scheduler_job.py ## @@ -877,7 +877,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None): except Exception: self.log.exception("Error logging import errors!") try: -self.kill_zombies(zombies) +self.kill_zombies(dagbag, zombies) Review comment: Maybe pylint would have caught it but it is currently in **ToDo**: https://github.com/apache/airflow/blob/6eaa7e3b1845644d5ec65a00a997f4029bec9628/scripts/ci/pylint_todo.txt#L11 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies
kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies URL: https://github.com/apache/airflow/pull/7534#discussion_r383875096 ## File path: airflow/jobs/scheduler_job.py ## @@ -877,7 +877,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None): except Exception: self.log.exception("Error logging import errors!") try: -self.kill_zombies(zombies) +self.kill_zombies(dagbag, zombies) Review comment: Surprised that our static checkers didn't catch this! Required arg was not passed ! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] ashb commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies
ashb commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies URL: https://github.com/apache/airflow/pull/7534#discussion_r383876717 ## File path: airflow/jobs/scheduler_job.py ## @@ -877,7 +877,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None): except Exception: self.log.exception("Error logging import errors!") try: -self.kill_zombies(zombies) +self.kill_zombies(dagbag, zombies) Review comment: Yeah quite! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies
kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies URL: https://github.com/apache/airflow/pull/7534#discussion_r383875096 ## File path: airflow/jobs/scheduler_job.py ## @@ -877,7 +877,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None): except Exception: self.log.exception("Error logging import errors!") try: -self.kill_zombies(zombies) +self.kill_zombies(dagbag, zombies) Review comment: Surprised that our static checkers didn't catch this? Required arg was not passed ! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] ashb commented on a change in pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob
ashb commented on a change in pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob URL: https://github.com/apache/airflow/pull/7527#discussion_r383874965 ## File path: airflow/jobs/scheduler_job.py ## @@ -1098,37 +1094,20 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None): TI = models.TaskInstance DR = models.DagRun DM = models.DagModel -ti_query = ( +task_instances_to_examine = ( session .query(TI) .filter(TI.dag_id.in_(simple_dag_bag.dag_ids)) .outerjoin( -DR, -and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date) +DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date) ) -.filter(or_(DR.run_id == None, # noqa: E711 pylint: disable=singleton-comparison -not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%' +.filter(or_(DR.run_id.is_(None), not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%' .outerjoin(DM, DM.dag_id == TI.dag_id) -.filter(or_(DM.dag_id == None, # noqa: E711 pylint: disable=singleton-comparison -not_(DM.is_paused))) +.filter(or_(DM.dag_id.is_(None), not_(DM.is_paused))) Review comment: (Sorry for not being clear. This comment was not about this PR, but about the other one) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] JavierLopezT commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator
JavierLopezT commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator URL: https://github.com/apache/airflow/pull/6670#discussion_r383873800 ## File path: airflow/operators/mysql_to_s3_operator.py ## @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Transfer data from MySQL into a S3 bucket +""" +from io import StringIO +from typing import Optional, Union + +import numpy as np +import pandas as pd + +from airflow.hooks.mysql_hook import MySqlHook +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook +from airflow.utils.decorators import apply_defaults + + +class MySQLToS3Operator(BaseOperator): +""" +Saves data from an specific MySQL query into a file in S3. + +:param query: the sql query to be executed. +:type query: str +:param s3_bucket: bucket where the data will be stored +:type s3_bucket: str +:param s3_key: desired key for the file. It includes the name of the file. +If a csv file is wanted, the param must end with ".csv". +:type s3_key: str +:param mysql_conn_id: reference to a specific mysql database +:type mysql_conn_id: str + :param aws_conn_id: reference to a specific S3 connection +:type aws_conn_id: str +:param verify: Whether or not to verify SSL certificates for S3 connection. +By default SSL certificates are verified. +You can provide the following values: + +- False: do not validate SSL certificates. SSL will still be used + (unless use_ssl is False), but SSL certificates will not be + verified. +- path/to/cert/bundle.pem: A filename of the CA cert bundle to uses. + You can specify this argument if you want to use a different + CA cert bundle than the one used by botocore. +:type verify: bool or str +""" + +@apply_defaults +def __init__( +self, +query: str, +s3_bucket: str, +s3_key: str, +mysql_conn_id: str = 'mysql_default', +aws_conn_id: str = 'aws_default', +verify: Optional[Union[bool, str]] = None, +header: Optional[bool] = False, +index: Optional[bool] = False, +*args, **kwargs) -> None: +super().__init__(*args, **kwargs) +self.query = query +self.s3_bucket = s3_bucket +self.s3_key = s3_key +self.mysql_conn_id = mysql_conn_id +self.aws_conn_id = aws_conn_id +self.verify = verify +self.header = header +self.index = index + +def fill_na_with_none(self, series): +"""Replace NaN values with None""" +return np.where(series.isnull(), None, series) + +def fix_int_dtypes(self, df): +""" +Mutate DataFrame to set dtypes for int columns containing NaN values." 
+""" +for col in df: +if "float" in df[col].dtype.name and df[col].hasnans: +# inspect values to determine if dtype of non-null values is int or float +notna_series = df[col].dropna().values +if np.isclose(notna_series, notna_series.astype(int)).all(): +# set to dtype that retains integers and supports NaNs +df[col] = self.fill_na_with_none(df[col]).astype(pd.Int64Dtype) + +def execute(self, context, **kwargs): +self.hook = MySqlHook(mysql_conn_id=self.mysql_conn_id) +self.s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify) +data_df = self.hook.get_pandas_df(self.query) +self.log.info("Data from MySQL obtained") + +self.fix_int_dtypes(data_df) +file_obj = StringIO() Review comment: I have been testing this. Unfortunately (with the changes I had to make with pickle) this doesn't generate a readable csv. I have been trying to imitate the functionality of NamedTemporaryFile using: ``` with open('tmp_file.csv', 'w+') as tmp_csv: df.to_csv(tmp_csv) temporary_file_path = os.path.realpath(tmp_csv.name) os.remove(temporary_file_path) ``` However, Ubuntu s
[jira] [Commented] (AIRFLOW-6867) Decouple DagBag and TaskInstance
[ https://issues.apache.org/jira/browse/AIRFLOW-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044426#comment-17044426 ] ASF GitHub Bot commented on AIRFLOW-6867: - ashb commented on pull request #7534: [AIRFLOW-6867] Fix bug in kill zombies URL: https://github.com/apache/airflow/pull/7534 The refactor in #7488 introduced a scheduler-breaking bug. I am not happy about merging this without a unit test to stop it breaking again, but given how much process_file does it's hard to test. Thoughts? --- Issue link: WILL BE INSERTED BY [boring-cyborg](https://github.com/kaxil/boring-cyborg) Make sure to mark the boxes below before creating PR: [x] - [ ] Description above provides context of the change - [ ] Commit message/PR title starts with `[AIRFLOW-]`. AIRFLOW- = JIRA ID* - [ ] Unit tests coverage for changes (not needed for documentation changes) - [ ] Commits follow "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)" - [ ] Relevant documentation is updated including usage instructions. - [ ] I will engage committers as explained in [Contribution Workflow Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example). * For document-only changes commit message can start with `[AIRFLOW-]`. --- In case of fundamental code change, Airflow Improvement Proposal ([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals)) is needed. In case of a new dependency, check compliance with the [ASF 3rd Party License Policy](https://www.apache.org/legal/resolved.html#category-x). In case of backwards incompatible changes please leave a note in [UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md). Read the [Pull Request Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines) for more information. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Decouple DagBag and TaskInstance > > > Key: AIRFLOW-6867 > URL: https://issues.apache.org/jira/browse/AIRFLOW-6867 > Project: Apache Airflow > Issue Type: Bug > Components: DAG >Affects Versions: 1.10.9 >Reporter: Kamil Bregula >Priority: Major > Fix For: 2.0.0 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)