[jira] [Created] (AIRFLOW-6916) Replace SimpleDag & SimpleDagBag representation with Serialized Dag

2020-02-25 Thread Kaxil Naik (Jira)
Kaxil Naik created AIRFLOW-6916:
---

 Summary: Replace SimpleDag & SimpleDagBag representation with 
Serialized Dag
 Key: AIRFLOW-6916
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6916
 Project: Apache Airflow
  Issue Type: Improvement
  Components: serialization
Affects Versions: 2.0.0
Reporter: Kaxil Naik
Assignee: Kaxil Naik
 Fix For: 2.0.0


Currently, we have two serialized representations:
* SimpleDag (created because DAG objects were not pickleable)
* Serialized DAG


We should remove SimpleDag and use the Serialized DAG representation instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-6916) Replace SimpleDag & SimpleDagBag representation with Serialized Dag

2020-02-25 Thread Kaxil Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kaxil Naik updated AIRFLOW-6916:

Labels: dag-serialization  (was: )

> Replace SimpleDag & SimpleDagBag representation with Serialized Dag
> ---
>
> Key: AIRFLOW-6916
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6916
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: serialization
>Affects Versions: 2.0.0
>Reporter: Kaxil Naik
>Assignee: Kaxil Naik
>Priority: Major
>  Labels: dag-serialization
> Fix For: 2.0.0
>
>
> Currently, we have two serialized representations:
> * SimpleDag (created because DAG objects were not pickleable)
> * Serialized DAG
> We should remove SimpleDag and use the Serialized DAG representation instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] kaxil commented on a change in pull request #7535: [AIRFLOW-6915] Adds AI Platform Console link for MLEngineStartTrainingOperator

2020-02-25 Thread GitBox
kaxil commented on a change in pull request #7535: [AIRFLOW-6915] Adds AI 
Platform Console link for MLEngineStartTrainingOperator
URL: https://github.com/apache/airflow/pull/7535#discussion_r384106760
 
 

 ##
 File path: tests/providers/google/cloud/operators/test_mlengine.py
 ##
 @@ -404,6 +405,34 @@ def test_failed_job_error(self, mock_hook):
 project_id='test-project', job=self.TRAINING_INPUT, 
use_existing_job_fn=ANY)
 self.assertEqual('A failure message', str(context.exception))
 
+@patch('airflow.providers.google.cloud.operators.mlengine.MLEngineHook')
+def test_console_extra_link(self, mock_hook):
+training_op = MLEngineStartTrainingJobOperator(
+**self.TRAINING_DEFAULT_ARGS)
+
+ti = TaskInstance(
+task=training_op,
+execution_date=DEFAULT_DATE,
+)
+
+job_id = self.TRAINING_DEFAULT_ARGS['job_id']
+project_id = self.TRAINING_DEFAULT_ARGS['project_id']
+gcp_metadata = {
+"job_id": job_id,
+"project_id": project_id,
+}
+ti.xcom_push(key='gcp_metadata', value=gcp_metadata)
+
+self.assertEqual(
+
f"https://console.cloud.google.com/ai-platform/jobs/{job_id}?project={project_id}";,
+training_op.get_extra_links(DEFAULT_DATE, 
AIPlatformConsoleLink.name),
+)
+
+self.assertEqual(
+'',
+training_op.get_extra_links(datetime.datetime(2019, 1, 1), 
AIPlatformConsoleLink.name),
+)
+
 
 Review comment:
   We need to add a test to check that this Operator link works well with 
Serialization, like:
   
   
https://github.com/apache/airflow/blob/746d8de2fcaa9554b4ce7dbf261e4ab148233222/tests/providers/google/cloud/operators/test_bigquery.py#L503-L538
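   
   For reference, a rough sketch of such a check, modelled on the linked 
bigquery test (a sketch only: it assumes `SerializedDAG` from 
`airflow.serialization.serialized_objects` and that `DAG` and the fixtures 
used above are available in this test file):
   
   ```Python
   from airflow.models import DAG, TaskInstance
   from airflow.serialization.serialized_objects import SerializedDAG
   
   @patch('airflow.providers.google.cloud.operators.mlengine.MLEngineHook')
   def test_console_extra_link_serialized_field(self, mock_hook):
       # Build the operator inside a DAG so it can be serialized and deserialized.
       with DAG(dag_id="test_mlengine_dag", start_date=DEFAULT_DATE) as dag:
           training_op = MLEngineStartTrainingJobOperator(**self.TRAINING_DEFAULT_ARGS)
   
       serialized_dag = SerializedDAG.to_dict(dag)
       deserialized_dag = SerializedDAG.from_dict(serialized_dag)
       deserialized_task = deserialized_dag.task_dict[training_op.task_id]
   
       # Same xcom-driven check as above, but against the deserialized task.
       ti = TaskInstance(task=deserialized_task, execution_date=DEFAULT_DATE)
       job_id = self.TRAINING_DEFAULT_ARGS['job_id']
       project_id = self.TRAINING_DEFAULT_ARGS['project_id']
       ti.xcom_push(key='gcp_metadata', value={"job_id": job_id, "project_id": project_id})
   
       self.assertEqual(
           f"https://console.cloud.google.com/ai-platform/jobs/{job_id}?project={project_id}",
           deserialized_task.get_extra_links(DEFAULT_DATE, AIPlatformConsoleLink.name),
       )
   ```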


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] jj-ookla commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator

2020-02-25 Thread GitBox
jj-ookla commented on a change in pull request #6670: 
[AIRFLOW-4816]MySqlToS3Operator
URL: https://github.com/apache/airflow/pull/6670#discussion_r384067287
 
 

 ##
 File path: airflow/operators/mysql_to_s3_operator.py
 ##
 @@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Transfer data from MySQL into a S3 bucket
+"""
+from io import StringIO
+from typing import Optional, Union
+
+import numpy as np
+import pandas as pd
+
+from airflow.hooks.mysql_hook import MySqlHook
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.utils.decorators import apply_defaults
+
+
+class MySQLToS3Operator(BaseOperator):
+"""
+Saves data from an specific MySQL query into a file in S3.
+
+:param query: the sql query to be executed.
+:type query: str
+:param s3_bucket: bucket where the data will be stored
+:type s3_bucket: str
+:param s3_key: desired key for the file. It includes the name of the file.
+If a csv file is wanted, the param must end with ".csv".
+:type s3_key: str
+:param mysql_conn_id: reference to a specific mysql database
+:type mysql_conn_id: str
+ :param aws_conn_id: reference to a specific S3 connection
+:type aws_conn_id: str
+:param verify: Whether or not to verify SSL certificates for S3 connection.
+By default SSL certificates are verified.
+You can provide the following values:
+
+- False: do not validate SSL certificates. SSL will still be used
+ (unless use_ssl is False), but SSL certificates will not be
+ verified.
+- path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
+ You can specify this argument if you want to use a different
+ CA cert bundle than the one used by botocore.
+:type verify: bool or str
+"""
+
+@apply_defaults
+def __init__(
+self,
+query: str,
+s3_bucket: str,
+s3_key: str,
+mysql_conn_id: str = 'mysql_default',
+aws_conn_id: str = 'aws_default',
+verify: Optional[Union[bool, str]] = None,
+header: Optional[bool] = False,
+index: Optional[bool] = False,
+*args, **kwargs) -> None:
+super().__init__(*args, **kwargs)
+self.query = query
+self.s3_bucket = s3_bucket
+self.s3_key = s3_key
+self.mysql_conn_id = mysql_conn_id
+self.aws_conn_id = aws_conn_id
+self.verify = verify
+self.header = header
+self.index = index
+
+def fill_na_with_none(self, series):
+"""Replace NaN values with None"""
+return np.where(series.isnull(), None, series)
+
+def fix_int_dtypes(self, df):
+"""
+Mutate DataFrame to set dtypes for int columns containing NaN values."
+"""
+for col in df:
+if "float" in df[col].dtype.name and df[col].hasnans:
+# inspect values to determine if dtype of non-null values is 
int or float
+notna_series = df[col].dropna().values
+if np.isclose(notna_series, notna_series.astype(int)).all():
+# set to dtype that retains integers and supports NaNs
+df[col] = 
self.fill_na_with_none(df[col]).astype(pd.Int64Dtype)
+
+def execute(self, context, **kwargs):
+self.hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
+self.s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+data_df = self.hook.get_pandas_df(self.query)
+self.log.info("Data from MySQL obtained")
+
+self.fix_int_dtypes(data_df)
+file_obj = StringIO()
 
 Review comment:
   I think including the option is a good idea. The tricky part is supporting 
the index / header kwargs along with any others accepted by the pandas method. I 
have a custom operator that works similarly to this one and settled on an 
approach like the one below. The overall goal is to set `index=False` by 
default. 
   
   ```Python
   def __init__(
   self,
   ...,
   index=False,
   pd
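   # (The block above is truncated in the archive. What follows is a hedged
   # sketch of where it appears to be going, using the `pd_csv_kwargs` name
   # suggested later in this thread; illustrative only, not the commenter's
   # actual code.)
   def __init__(
           self,
           query: str,
           s3_bucket: str,
           s3_key: str,
           pd_csv_kwargs: Optional[dict] = None,
           *args, **kwargs) -> None:
       super().__init__(*args, **kwargs)
       self.query = query
       self.s3_bucket = s3_bucket
       self.s3_key = s3_key
       # Forwarded verbatim to DataFrame.to_csv; default index=False so the
       # output loads cleanly into Redshift / Athena / MySQL.
       self.pd_csv_kwargs = pd_csv_kwargs or {}
       if "index" not in self.pd_csv_kwargs:
           self.pd_csv_kwargs["index"] = False
   ```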

[GitHub] [airflow] JavierLopezT commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator

2020-02-25 Thread GitBox
JavierLopezT commented on a change in pull request #6670: 
[AIRFLOW-4816]MySqlToS3Operator
URL: https://github.com/apache/airflow/pull/6670#discussion_r384056558
 
 

 ##
 File path: airflow/operators/mysql_to_s3_operator.py
 ##
 @@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Transfer data from MySQL into a S3 bucket
+"""
+from io import StringIO
+from typing import Optional, Union
+
+import numpy as np
+import pandas as pd
+
+from airflow.hooks.mysql_hook import MySqlHook
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.utils.decorators import apply_defaults
+
+
+class MySQLToS3Operator(BaseOperator):
+"""
+Saves data from an specific MySQL query into a file in S3.
+
+:param query: the sql query to be executed.
+:type query: str
+:param s3_bucket: bucket where the data will be stored
+:type s3_bucket: str
+:param s3_key: desired key for the file. It includes the name of the file.
+If a csv file is wanted, the param must end with ".csv".
+:type s3_key: str
+:param mysql_conn_id: reference to a specific mysql database
+:type mysql_conn_id: str
+ :param aws_conn_id: reference to a specific S3 connection
+:type aws_conn_id: str
+:param verify: Whether or not to verify SSL certificates for S3 connection.
+By default SSL certificates are verified.
+You can provide the following values:
+
+- False: do not validate SSL certificates. SSL will still be used
+ (unless use_ssl is False), but SSL certificates will not be
+ verified.
+- path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
+ You can specify this argument if you want to use a different
+ CA cert bundle than the one used by botocore.
+:type verify: bool or str
+"""
+
+@apply_defaults
+def __init__(
+self,
+query: str,
+s3_bucket: str,
+s3_key: str,
+mysql_conn_id: str = 'mysql_default',
+aws_conn_id: str = 'aws_default',
+verify: Optional[Union[bool, str]] = None,
+header: Optional[bool] = False,
+index: Optional[bool] = False,
+*args, **kwargs) -> None:
+super().__init__(*args, **kwargs)
+self.query = query
+self.s3_bucket = s3_bucket
+self.s3_key = s3_key
+self.mysql_conn_id = mysql_conn_id
+self.aws_conn_id = aws_conn_id
+self.verify = verify
+self.header = header
+self.index = index
+
+def fill_na_with_none(self, series):
+"""Replace NaN values with None"""
+return np.where(series.isnull(), None, series)
+
+def fix_int_dtypes(self, df):
+"""
+Mutate DataFrame to set dtypes for int columns containing NaN values."
+"""
+for col in df:
+if "float" in df[col].dtype.name and df[col].hasnans:
+# inspect values to determine if dtype of non-null values is 
int or float
+notna_series = df[col].dropna().values
+if np.isclose(notna_series, notna_series.astype(int)).all():
+# set to dtype that retains integers and supports NaNs
+df[col] = 
self.fill_na_with_none(df[col]).astype(pd.Int64Dtype)
+
+def execute(self, context, **kwargs):
+self.hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
+self.s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+data_df = self.hook.get_pandas_df(self.query)
+self.log.info("Data from MySQL obtained")
+
+self.fix_int_dtypes(data_df)
+file_obj = StringIO()
 
 Review comment:
   Thank you very much for your help. Regarding the kwargs, I had already 
included both index and header in the __init__ as 
   ```
   header: Optional[bool] = False,
   index: Optional[bool] = False
   ```
   But I indeed forgot to include them in the code. Do you recommend changing 
that to your pd_csv_kwargs? 


This is an automated message from the Apache Git Service.


[GitHub] [airflow] jj-ookla commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator

2020-02-25 Thread GitBox
jj-ookla commented on a change in pull request #6670: 
[AIRFLOW-4816]MySqlToS3Operator
URL: https://github.com/apache/airflow/pull/6670#discussion_r384052041
 
 

 ##
 File path: airflow/operators/mysql_to_s3_operator.py
 ##
 @@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Transfer data from MySQL into a S3 bucket
+"""
+from io import StringIO
+from typing import Optional, Union
+
+import numpy as np
+import pandas as pd
+
+from airflow.hooks.mysql_hook import MySqlHook
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.utils.decorators import apply_defaults
+
+
+class MySQLToS3Operator(BaseOperator):
+"""
+Saves data from an specific MySQL query into a file in S3.
+
+:param query: the sql query to be executed.
+:type query: str
+:param s3_bucket: bucket where the data will be stored
+:type s3_bucket: str
+:param s3_key: desired key for the file. It includes the name of the file.
+If a csv file is wanted, the param must end with ".csv".
+:type s3_key: str
+:param mysql_conn_id: reference to a specific mysql database
+:type mysql_conn_id: str
+ :param aws_conn_id: reference to a specific S3 connection
+:type aws_conn_id: str
+:param verify: Whether or not to verify SSL certificates for S3 connection.
+By default SSL certificates are verified.
+You can provide the following values:
+
+- False: do not validate SSL certificates. SSL will still be used
+ (unless use_ssl is False), but SSL certificates will not be
+ verified.
+- path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
+ You can specify this argument if you want to use a different
+ CA cert bundle than the one used by botocore.
+:type verify: bool or str
+"""
+
+@apply_defaults
+def __init__(
+self,
+query: str,
+s3_bucket: str,
+s3_key: str,
+mysql_conn_id: str = 'mysql_default',
+aws_conn_id: str = 'aws_default',
+verify: Optional[Union[bool, str]] = None,
+header: Optional[bool] = False,
+index: Optional[bool] = False,
+*args, **kwargs) -> None:
+super().__init__(*args, **kwargs)
+self.query = query
+self.s3_bucket = s3_bucket
+self.s3_key = s3_key
+self.mysql_conn_id = mysql_conn_id
+self.aws_conn_id = aws_conn_id
+self.verify = verify
+self.header = header
+self.index = index
+
+def fill_na_with_none(self, series):
+"""Replace NaN values with None"""
+return np.where(series.isnull(), None, series)
+
+def fix_int_dtypes(self, df):
+"""
+Mutate DataFrame to set dtypes for int columns containing NaN values."
+"""
+for col in df:
+if "float" in df[col].dtype.name and df[col].hasnans:
+# inspect values to determine if dtype of non-null values is 
int or float
+notna_series = df[col].dropna().values
+if np.isclose(notna_series, notna_series.astype(int)).all():
+# set to dtype that retains integers and supports NaNs
+df[col] = 
self.fill_na_with_none(df[col]).astype(pd.Int64Dtype)
+
+def execute(self, context, **kwargs):
+self.hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
+self.s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+data_df = self.hook.get_pandas_df(self.query)
+self.log.info("Data from MySQL obtained")
+
+self.fix_int_dtypes(data_df)
+file_obj = StringIO()
 
 Review comment:
   Gotcha. This has been an issue for me in the past, as well. 
   
   `NamedTemporaryFile` needs to be opened in `r+` to allow writing / reading 
strings, to support: 
   - `df.to_csv` string output
   - s3_hook reading from the file 
   
   One caveat is that the to_csv kwarg `index` should be set to False to 
enforce compatibility when loading from S3 into Redshift / Athena / MySQL. I 
would recommend including a `pd_csv_kwargs` option in the operator to allow 
users to set these options as needed.
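   
   A small illustrative sketch of that `r+` approach (a sketch only, reusing 
the `data_df` / `self.s3_conn` names from the diff above):
   
   ```Python
   from tempfile import NamedTemporaryFile
   
   # "r+" gives a text-mode handle, so to_csv can write str data to it and the
   # S3 hook can then upload the same file by name.
   with NamedTemporaryFile(mode="r+", suffix=".csv") as tmp_csv:
       data_df.to_csv(tmp_csv, index=False)
       tmp_csv.flush()
       self.s3_conn.load_file(filename=tmp_csv.name,
                              key=self.s3_key,
                              bucket_name=self.s3_bucket)
   ```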

[GitHub] [airflow] jj-ookla commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator

2020-02-25 Thread GitBox
jj-ookla commented on a change in pull request #6670: 
[AIRFLOW-4816]MySqlToS3Operator
URL: https://github.com/apache/airflow/pull/6670#discussion_r384052041
 
 

 ##
 File path: airflow/operators/mysql_to_s3_operator.py
 ##
 @@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Transfer data from MySQL into a S3 bucket
+"""
+from io import StringIO
+from typing import Optional, Union
+
+import numpy as np
+import pandas as pd
+
+from airflow.hooks.mysql_hook import MySqlHook
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.utils.decorators import apply_defaults
+
+
+class MySQLToS3Operator(BaseOperator):
+"""
+Saves data from an specific MySQL query into a file in S3.
+
+:param query: the sql query to be executed.
+:type query: str
+:param s3_bucket: bucket where the data will be stored
+:type s3_bucket: str
+:param s3_key: desired key for the file. It includes the name of the file.
+If a csv file is wanted, the param must end with ".csv".
+:type s3_key: str
+:param mysql_conn_id: reference to a specific mysql database
+:type mysql_conn_id: str
+ :param aws_conn_id: reference to a specific S3 connection
+:type aws_conn_id: str
+:param verify: Whether or not to verify SSL certificates for S3 connection.
+By default SSL certificates are verified.
+You can provide the following values:
+
+- False: do not validate SSL certificates. SSL will still be used
+ (unless use_ssl is False), but SSL certificates will not be
+ verified.
+- path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
+ You can specify this argument if you want to use a different
+ CA cert bundle than the one used by botocore.
+:type verify: bool or str
+"""
+
+@apply_defaults
+def __init__(
+self,
+query: str,
+s3_bucket: str,
+s3_key: str,
+mysql_conn_id: str = 'mysql_default',
+aws_conn_id: str = 'aws_default',
+verify: Optional[Union[bool, str]] = None,
+header: Optional[bool] = False,
+index: Optional[bool] = False,
+*args, **kwargs) -> None:
+super().__init__(*args, **kwargs)
+self.query = query
+self.s3_bucket = s3_bucket
+self.s3_key = s3_key
+self.mysql_conn_id = mysql_conn_id
+self.aws_conn_id = aws_conn_id
+self.verify = verify
+self.header = header
+self.index = index
+
+def fill_na_with_none(self, series):
+"""Replace NaN values with None"""
+return np.where(series.isnull(), None, series)
+
+def fix_int_dtypes(self, df):
+"""
+Mutate DataFrame to set dtypes for int columns containing NaN values."
+"""
+for col in df:
+if "float" in df[col].dtype.name and df[col].hasnans:
+# inspect values to determine if dtype of non-null values is 
int or float
+notna_series = df[col].dropna().values
+if np.isclose(notna_series, notna_series.astype(int)).all():
+# set to dtype that retains integers and supports NaNs
+df[col] = 
self.fill_na_with_none(df[col]).astype(pd.Int64Dtype)
+
+def execute(self, context, **kwargs):
+self.hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
+self.s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+data_df = self.hook.get_pandas_df(self.query)
+self.log.info("Data from MySQL obtained")
+
+self.fix_int_dtypes(data_df)
+file_obj = StringIO()
 
 Review comment:
   Gotcha. This has been an issue for me in the past, as well. 
   
   One caveat is that the to_csv kwarg `index` should be set to False to 
enforce compatibility when loading from S3 into Redshift / Athena / MySQL. I 
would recommend including a `pd_csv_kwargs` option in the operator to allow 
users to set these options as needed. 
   
   ```Python
   
   # in operator 
   pd_csv_kwargs = {"index": False}
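   # (The snippet is truncated in the archive. A hedged sketch of how the stored
   # kwargs might then be forwarded when writing the CSV in execute(), reusing
   # names from the diff above; illustrative only.)
   file_obj = StringIO()
   data_df.to_csv(file_obj, **self.pd_csv_kwargs)  # e.g. {"index": False, "header": True}
   self.s3_conn.load_string(string_data=file_obj.getvalue(),
                            key=self.s3_key,
                            bucket_name=self.s3_bucket)
   ```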
  

[GitHub] [airflow] brandonwillard edited a comment on issue #7423: [AIRFLOW-3126] Add option to specify additional K8s volumes

2020-02-25 Thread GitBox
brandonwillard edited a comment on issue #7423: [AIRFLOW-3126] Add option to 
specify additional K8s volumes
URL: https://github.com/apache/airflow/pull/7423#issuecomment-590958733
 
 
   @davlum, I've tried it and it did not appear to help; specifically, for 
default worker pod creation under the K8s Executor.  It required too much 
specificity in the template and seemed to deactivate most other configuration 
options.  Can you give an example of how it can be used to add volumes in 
exactly this way without sacrificing any of the existing 
functionality/configurability of worker pod creation?
   
   Also, I don't see how it could possibly help with 
https://github.com/apache/airflow/pull/7405.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] konpap94 edited a comment on issue #6337: [AIRFLOW-5659] - Add support for ephemeral storage on KubernetesPodOp…

2020-02-25 Thread GitBox
konpap94 edited a comment on issue #6337: [AIRFLOW-5659] - Add support for 
ephemeral storage on KubernetesPodOp…
URL: https://github.com/apache/airflow/pull/6337#issuecomment-587121417
 
 
   Is there any update on this being merged in? @potiuk 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] JavierLopezT commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator

2020-02-25 Thread GitBox
JavierLopezT commented on a change in pull request #6670: 
[AIRFLOW-4816]MySqlToS3Operator
URL: https://github.com/apache/airflow/pull/6670#discussion_r384031703
 
 

 ##
 File path: airflow/operators/mysql_to_s3_operator.py
 ##
 @@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Transfer data from MySQL into a S3 bucket
+"""
+from io import StringIO
+from typing import Optional, Union
+
+import numpy as np
+import pandas as pd
+
+from airflow.hooks.mysql_hook import MySqlHook
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.utils.decorators import apply_defaults
+
+
+class MySQLToS3Operator(BaseOperator):
+"""
+Saves data from an specific MySQL query into a file in S3.
+
+:param query: the sql query to be executed.
+:type query: str
+:param s3_bucket: bucket where the data will be stored
+:type s3_bucket: str
+:param s3_key: desired key for the file. It includes the name of the file.
+If a csv file is wanted, the param must end with ".csv".
+:type s3_key: str
+:param mysql_conn_id: reference to a specific mysql database
+:type mysql_conn_id: str
+ :param aws_conn_id: reference to a specific S3 connection
+:type aws_conn_id: str
+:param verify: Whether or not to verify SSL certificates for S3 connection.
+By default SSL certificates are verified.
+You can provide the following values:
+
+- False: do not validate SSL certificates. SSL will still be used
+ (unless use_ssl is False), but SSL certificates will not be
+ verified.
+- path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
+ You can specify this argument if you want to use a different
+ CA cert bundle than the one used by botocore.
+:type verify: bool or str
+"""
+
+@apply_defaults
+def __init__(
+self,
+query: str,
+s3_bucket: str,
+s3_key: str,
+mysql_conn_id: str = 'mysql_default',
+aws_conn_id: str = 'aws_default',
+verify: Optional[Union[bool, str]] = None,
+header: Optional[bool] = False,
+index: Optional[bool] = False,
+*args, **kwargs) -> None:
+super().__init__(*args, **kwargs)
+self.query = query
+self.s3_bucket = s3_bucket
+self.s3_key = s3_key
+self.mysql_conn_id = mysql_conn_id
+self.aws_conn_id = aws_conn_id
+self.verify = verify
+self.header = header
+self.index = index
+
+def fill_na_with_none(self, series):
+"""Replace NaN values with None"""
+return np.where(series.isnull(), None, series)
+
+def fix_int_dtypes(self, df):
+"""
+Mutate DataFrame to set dtypes for int columns containing NaN values."
+"""
+for col in df:
+if "float" in df[col].dtype.name and df[col].hasnans:
+# inspect values to determine if dtype of non-null values is 
int or float
+notna_series = df[col].dropna().values
+if np.isclose(notna_series, notna_series.astype(int)).all():
+# set to dtype that retains integers and supports NaNs
+df[col] = 
self.fill_na_with_none(df[col]).astype(pd.Int64Dtype)
+
+def execute(self, context, **kwargs):
+self.hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
+self.s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+data_df = self.hook.get_pandas_df(self.query)
+self.log.info("Data from MySQL obtained")
+
+self.fix_int_dtypes(data_df)
+file_obj = StringIO()
 
 Review comment:
   > Why is the DataFrame pickled then written to a file with a csv extension? 
This would definitely explain why the csv is unreadable.
   > 
   > 
https://github.com/apache/airflow/blob/69ff9f3dbf25ecefc627f95bbb75e3b9b9835b7c/airflow/operators/mysql_to_s3_operator.py#L108
   
   Because if you write the code as @nuclearpinguin originally proposed, you get 
the error:
   `AttributeError: 'DataFrame' obje

[GitHub] [airflow] codecov-io edited a comment on issue #6652: [AIRFLOW-5548] [AIRFLOW-5550] REST API enhancement - dag info, task …

2020-02-25 Thread GitBox
codecov-io edited a comment on issue #6652: [AIRFLOW-5548] [AIRFLOW-5550] REST 
API enhancement - dag info, task …
URL: https://github.com/apache/airflow/pull/6652#issuecomment-558277657
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/6652?src=pr&el=h1) 
Report
   > Merging 
[#6652](https://codecov.io/gh/apache/airflow/pull/6652?src=pr&el=desc) into 
[master](https://codecov.io/gh/apache/airflow/commit/746d8de2fcaa9554b4ce7dbf261e4ab148233222?src=pr&el=desc)
 will **increase** coverage by `<.01%`.
   > The diff coverage is `88.18%`.
   
   [![Impacted file tree 
graph](https://codecov.io/gh/apache/airflow/pull/6652/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/6652?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6652      +/-   ##
   ==========================================
   + Coverage   86.85%   86.86%    +<.01%    
   ==========================================
     Files         896      899        +3    
     Lines       42634    42740      +106    
   ==========================================
   + Hits        37031    37124       +93    
   - Misses       5603     5616       +13    
   ```
   
   
   | [Impacted 
Files](https://codecov.io/gh/apache/airflow/pull/6652?src=pr&el=tree) | 
Coverage Δ | |
   |---|---|---|
   | 
[airflow/api/common/experimental/get\_dag.py](https://codecov.io/gh/apache/airflow/pull/6652/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfZGFnLnB5)
 | `100% <100%> (ø)` | |
   | 
[airflow/api/common/experimental/get\_dags.py](https://codecov.io/gh/apache/airflow/pull/6652/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfZGFncy5weQ==)
 | `100% <100%> (ø)` | |
   | 
[airflow/api/common/experimental/get\_tasks.py](https://codecov.io/gh/apache/airflow/pull/6652/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfdGFza3MucHk=)
 | `100% <100%> (ø)` | |
   | 
[airflow/www/api/experimental/endpoints.py](https://codecov.io/gh/apache/airflow/pull/6652/diff?src=pr&el=tree#diff-YWlyZmxvdy93d3cvYXBpL2V4cGVyaW1lbnRhbC9lbmRwb2ludHMucHk=)
 | `88.32% <83.05%> (-1.5%)` | :arrow_down: |
   | 
[airflow/api/common/experimental/get\_task.py](https://codecov.io/gh/apache/airflow/pull/6652/diff?src=pr&el=tree#diff-YWlyZmxvdy9hcGkvY29tbW9uL2V4cGVyaW1lbnRhbC9nZXRfdGFzay5weQ==)
 | `88.46% <86.95%> (-11.54%)` | :arrow_down: |
   
   --
   
   [Continue to review full report at 
Codecov](https://codecov.io/gh/apache/airflow/pull/6652?src=pr&el=continue).
   > **Legend** - [Click here to learn 
more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute  (impact)`, `ø = not affected`, `? = missing data`
   > Powered by 
[Codecov](https://codecov.io/gh/apache/airflow/pull/6652?src=pr&el=footer). 
Last update 
[746d8de...189a2a8](https://codecov.io/gh/apache/airflow/pull/6652?src=pr&el=lastupdated).
 Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] jj-ookla commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator

2020-02-25 Thread GitBox
jj-ookla commented on a change in pull request #6670: 
[AIRFLOW-4816]MySqlToS3Operator
URL: https://github.com/apache/airflow/pull/6670#discussion_r384017782
 
 

 ##
 File path: airflow/operators/mysql_to_s3_operator.py
 ##
 @@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Transfer data from MySQL into a S3 bucket
+"""
+from io import StringIO
+from typing import Optional, Union
+
+import numpy as np
+import pandas as pd
+
+from airflow.hooks.mysql_hook import MySqlHook
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.utils.decorators import apply_defaults
+
+
+class MySQLToS3Operator(BaseOperator):
+"""
+Saves data from an specific MySQL query into a file in S3.
+
+:param query: the sql query to be executed.
+:type query: str
+:param s3_bucket: bucket where the data will be stored
+:type s3_bucket: str
+:param s3_key: desired key for the file. It includes the name of the file.
+If a csv file is wanted, the param must end with ".csv".
+:type s3_key: str
+:param mysql_conn_id: reference to a specific mysql database
+:type mysql_conn_id: str
+ :param aws_conn_id: reference to a specific S3 connection
+:type aws_conn_id: str
+:param verify: Whether or not to verify SSL certificates for S3 connection.
+By default SSL certificates are verified.
+You can provide the following values:
+
+- False: do not validate SSL certificates. SSL will still be used
+ (unless use_ssl is False), but SSL certificates will not be
+ verified.
+- path/to/cert/bundle.pem: A filename of the CA cert bundle to uses.
+ You can specify this argument if you want to use a different
+ CA cert bundle than the one used by botocore.
+:type verify: bool or str
+"""
+
+@apply_defaults
+def __init__(
+self,
+query: str,
+s3_bucket: str,
+s3_key: str,
+mysql_conn_id: str = 'mysql_default',
+aws_conn_id: str = 'aws_default',
+verify: Optional[Union[bool, str]] = None,
+header: Optional[bool] = False,
+index: Optional[bool] = False,
+*args, **kwargs) -> None:
+super().__init__(*args, **kwargs)
+self.query = query
+self.s3_bucket = s3_bucket
+self.s3_key = s3_key
+self.mysql_conn_id = mysql_conn_id
+self.aws_conn_id = aws_conn_id
+self.verify = verify
+self.header = header
+self.index = index
+
+def fill_na_with_none(self, series):
+"""Replace NaN values with None"""
+return np.where(series.isnull(), None, series)
+
+def fix_int_dtypes(self, df):
+"""
+Mutate DataFrame to set dtypes for int columns containing NaN values."
+"""
+for col in df:
+if "float" in df[col].dtype.name and df[col].hasnans:
+# inspect values to determine if dtype of non-null values is 
int or float
+notna_series = df[col].dropna().values
+if np.isclose(notna_series, notna_series.astype(int)).all():
+# set to dtype that retains integers and supports NaNs
+df[col] = 
self.fill_na_with_none(df[col]).astype(pd.Int64Dtype)
+
+def execute(self, context, **kwargs):
+self.hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
+self.s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+data_df = self.hook.get_pandas_df(self.query)
+self.log.info("Data from MySQL obtained")
+
+self.fix_int_dtypes(data_df)
+file_obj = StringIO()
 
 Review comment:
   Why is the DataFrame pickled then written to a file with a csv extension? 
This would definitely explain why the csv is unreadable. 
   
   
https://github.com/apache/airflow/blob/69ff9f3dbf25ecefc627f95bbb75e3b9b9835b7c/airflow/operators/mysql_to_s3_operator.py#L108
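   
   For clarity, the mismatch being flagged, in miniature (hypothetical path, 
illustration only):
   
   ```Python
   data_df.to_pickle("/tmp/out.csv")             # binary pickle bytes, despite the .csv name
   data_df.to_csv("/tmp/out.csv", index=False)   # actual comma-separated text
   ```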


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

[jira] [Updated] (AIRFLOW-5869) Creating DagRuns fails for Deserialized tasks with no start_date

2020-02-25 Thread Kaxil Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kaxil Naik updated AIRFLOW-5869:

Labels: dag-serialization  (was: )

> Creating DagRuns fails for Deserialized tasks with no start_date
> 
>
> Key: AIRFLOW-5869
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5869
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Kaxil Naik
>Assignee: Kaxil Naik
>Priority: Major
>  Labels: dag-serialization
> Fix For: 1.10.7
>
>
> Deserialized operators do not always have start_date set. 
> That, for instance, breaks triggering dags.
> See the code from DAG.create_dagrun():
> {code:python}
> run = DagRun(...)
> session.add(run)
> session.commit()
> run.dag = self
> run.verify_integrity(session=session) # this validation fails because 
> run assumes that all operators have start_date set
> run.refresh_from_db()
> {code}
> One of the optimisation 
> (https://github.com/coufon/airflow/commit/b5ee858f44f55818c589cf2c8bf3866fa5d50e30)
>  we did as part of DAG Serialization was to not store dates in tasks if they 
> have a matching date (start_date or end_date) in DAG. Unfortunately, when 
> triggering DAG containing such tasks, it fails on DagRun.run.verify_integrity.
> The fix is to add the start_date when deserializing the operator.
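
A hedged sketch of the kind of fix described (illustrative only; the actual 
patch in the deserialization code may differ):
{code:python}
# When deserializing an operator, fall back to the DAG-level dates that were
# stripped out during serialization.
if task.start_date is None and dag.start_date is not None:
    task.start_date = dag.start_date
if task.end_date is None and dag.end_date is not None:
    task.end_date = dag.end_date
{code}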



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5943) DAG Serialization & DB Persistence

2020-02-25 Thread Kaxil Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kaxil Naik updated AIRFLOW-5943:

Labels: dag-serialization  (was: )

> DAG Serialization & DB Persistence
> --
>
> Key: AIRFLOW-5943
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5943
> Project: Apache Airflow
>  Issue Type: Epic
>  Components: core
>Affects Versions: 1.10.5, 1.10.6
>Reporter: Kaxil Naik
>Assignee: Kaxil Naik
>Priority: Major
>  Labels: dag-serialization
>
> This Epic aims to decouple the Webserver from having to parse the DAG files. 
> Short-term, the Scheduler will parse the DAG files and persist them in the DB. 
> Longer term we would have a new airflow component "Serializer"/"Parser" that 
> would be the component parsing DAG files, serializing DAG objects and 
> persisting them to the DB. This way the Webserver & Scheduler would not need 
> access to DAG files.
> More details Implementing AIP-24: 
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-24+DAG+Persistence+in+DB+using+JSON+for+Airflow+Webserver+and+%28optional%29+Scheduler



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5946) Store & Read code from DB for Code View

2020-02-25 Thread Kaxil Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kaxil Naik updated AIRFLOW-5946:

Labels: dag-serialization  (was: )

> Store & Read code from DB for Code View
> ---
>
> Key: AIRFLOW-5946
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5946
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: webserver
>Affects Versions: 2.0.0, 1.10.7
>Reporter: Kaxil Naik
>Assignee: Kaxil Naik
>Priority: Major
>  Labels: dag-serialization
>
> To make the Webserver not need DAG files, we need to find a way to get the 
> code to display in *Code View*.
> - Store it in a lazy-loaded column in the SerializedDag table
> - Save it in a new table with dag_id, and store versions as well. Add a limit 
> of the last 10 versions. This is only needed by Code View, so it is not a 
> problem if we store it in a new table
> - Or just keep reading it from the file?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5947) Make the json backend pluggable for DAG Serialization

2020-02-25 Thread Kaxil Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kaxil Naik updated AIRFLOW-5947:

Labels: dag-serialization  (was: )

> Make the json backend pluggable for DAG Serialization
> -
>
> Key: AIRFLOW-5947
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5947
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: core, scheduler
>Affects Versions: 2.0.0, 1.10.7
>Reporter: Kaxil Naik
>Assignee: Kaxil Naik
>Priority: Major
>  Labels: dag-serialization
> Fix For: 1.10.7
>
>
> Allow users to choose the JSON library they prefer for DAG 
> Serialization.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5948) Replace SimpleDag with serialized version

2020-02-25 Thread Kaxil Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kaxil Naik updated AIRFLOW-5948:

Labels: dag-serialization  (was: )

> Replace SimpleDag with serialized version
> -
>
> Key: AIRFLOW-5948
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5948
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: core, scheduler
>Affects Versions: 2.0.0, 1.10.7
>Reporter: Kaxil Naik
>Assignee: Kaxil Naik
>Priority: Critical
>  Labels: dag-serialization
>
> Replace SimpleDag with serialized version (json over multiprocessing) in 
> SchedulerJob etc., no other change in scheduler behaviour. (This doesn't make 
> sense long term, but does tidy up the code)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5945) Fix limitation with OperatorLinks when using DAG Serialization

2020-02-25 Thread Kaxil Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5945?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kaxil Naik updated AIRFLOW-5945:

Labels: dag-serialization  (was: )

> Fix limitation with OperatorLinks when using DAG Serialization
> --
>
> Key: AIRFLOW-5945
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5945
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: core, webserver
>Affects Versions: 2.0.0, 1.10.7
>Reporter: Kaxil Naik
>Assignee: Kaxil Naik
>Priority: Major
>  Labels: dag-serialization
> Fix For: 1.10.7
>
>
> Operator links (re-write the Qubole link so it does not need an operator of 
> the right class, and add this to the docs),
> or store the info in a new column in the Task Instance or Serialized DAG table.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5944) Rendering templated_fields without accessing DAG files

2020-02-25 Thread Kaxil Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kaxil Naik updated AIRFLOW-5944:

Labels: dag-serialization  (was: )

> Rendering templated_fields without accessing DAG files
> --
>
> Key: AIRFLOW-5944
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5944
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.0.0, 1.10.7
>Reporter: Kaxil Naik
>Assignee: Kaxil Naik
>Priority: Major
>  Labels: dag-serialization
>
> One of the limitations of the current implementation of DAG serialization is 
> that templated fields still need access to DAG files.
> *Potential solution 1*: Store the rendered version in the DB somewhere. 
> *Limitation*: Tasks that have not yet run won't be able to render their 
> templated fields (this is a useful debugging aid, especially for those 
> without CLI access to run `airflow tasks render`).
> *Potential solution 2*: Store both rendered & unrendered versions in the DB 
> somewhere. This will allow getting past the limitation in solution (1). We 
> should store only the current unrendered version and store the last X number 
> of rendered versions. This also has the advantage that old tasks would 
> have the rendered version that was correct at that time (which is not 
> possible currently).
> *Limitation/Issue*: Maintaining two separate tables without much benefit
> *Potential solution 3*: Store the rendered version in the TI table and the 
> unrendered version in a separate column in the DAG serialization table. This 
> means that each TI would have an associated rendered field without creating an 
> extra table and duplicating dag_id, task_id & execution_date. Once we start 
> storing different versions of serialized DAGs we can have each DAG row keep 
> its unrendered version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (AIRFLOW-5088) To implement DAG JSON serialization and DB persistence for webserver scalability improvement

2020-02-25 Thread Kaxil Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5088?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kaxil Naik updated AIRFLOW-5088:

Labels: dag-serialization  (was: )

> To implement DAG JSON serialization and DB persistence for webserver 
> scalability improvement
> 
>
> Key: AIRFLOW-5088
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5088
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: DAG, webserver
>Affects Versions: 1.10.5
>Reporter: Zhou Fang
>Assignee: Zhou Fang
>Priority: Major
>  Labels: dag-serialization
> Fix For: 1.10.7
>
>
> Created this issue to start implementing DAG serialization using JSON and 
> persistence in the DB. The serialized DAG will be used in the webserver to 
> solve the webserver scalability issue.
>  
> The implementation is based on AIP-24: 
> [https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-24+DAG+Persistence+in+DB+using+JSON+for+Airflow+Webserver+and+%28optional%29+Scheduler]
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] brandonwillard commented on issue #7423: [AIRFLOW-3126] Add option to specify additional K8s volumes

2020-02-25 Thread GitBox
brandonwillard commented on issue #7423: [AIRFLOW-3126] Add option to specify 
additional K8s volumes
URL: https://github.com/apache/airflow/pull/7423#issuecomment-590958733
 
 
   @davlum, I've tried it and it did not appear to help.  Can you give an 
example of how it can be used to add volumes in this way without sacrificing 
any of the existing functionality/configurability of worker pods?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] brandonwillard removed a comment on issue #7423: [AIRFLOW-3126] Add option to specify additional K8s volumes

2020-02-25 Thread GitBox
brandonwillard removed a comment on issue #7423: [AIRFLOW-3126] Add option to 
specify additional K8s volumes
URL: https://github.com/apache/airflow/pull/7423#issuecomment-590955205
 
 
   It does not appear to help.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] codecov-io commented on issue #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod

2020-02-25 Thread GitBox
codecov-io commented on issue #7523: [AIRFLOW-6843] Add delete_option_kwargs to 
delete_namespaced_pod
URL: https://github.com/apache/airflow/pull/7523#issuecomment-590956652
 
 
   # [Codecov](https://codecov.io/gh/apache/airflow/pull/7523?src=pr&el=h1) 
Report
   > :exclamation: No coverage uploaded for pull request base 
(`master@b6a11c1`). [Click here to learn what that 
means](https://docs.codecov.io/docs/error-reference#section-missing-base-commit).
   > The diff coverage is `93.18%`.
   
   [![Impacted file tree 
graph](https://codecov.io/gh/apache/airflow/pull/7523/graphs/tree.svg?width=650&token=WdLKlKHOAU&height=150&src=pr)](https://codecov.io/gh/apache/airflow/pull/7523?src=pr&el=tree)
   
   ```diff
   @@Coverage Diff@@
   ## master#7523   +/-   ##
   =
 Coverage  ?   86.85%   
   =
 Files ?  896   
 Lines ?42638   
 Branches  ?0   
   =
 Hits  ?37035   
 Misses? 5603   
 Partials  ?0
   ```
   
   
   | [Impacted 
Files](https://codecov.io/gh/apache/airflow/pull/7523?src=pr&el=tree) | 
Coverage Δ | |
   |---|---|---|
   | 
[airflow/utils/helpers.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9oZWxwZXJzLnB5)
 | `82.6% <100%> (ø)` | |
   | 
[airflow/settings.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9zZXR0aW5ncy5weQ==)
 | `93.66% <100%> (ø)` | |
   | 
[airflow/models/dagbag.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnYmFnLnB5)
 | `89.61% <100%> (ø)` | |
   | 
[...s/google/cloud/example\_dags/example\_stackdriver.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL2V4YW1wbGVfZGFncy9leGFtcGxlX3N0YWNrZHJpdmVyLnB5)
 | `100% <100%> (ø)` | |
   | 
[airflow/models/taskinstance.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvdGFza2luc3RhbmNlLnB5)
 | `95.17% <100%> (ø)` | |
   | 
[...ow/providers/microsoft/azure/hooks/azure\_cosmos.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvbWljcm9zb2Z0L2F6dXJlL2hvb2tzL2F6dXJlX2Nvc21vcy5weQ==)
 | `76.72% <100%> (ø)` | |
   | 
[airflow/providers/amazon/aws/hooks/s3.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvYW1hem9uL2F3cy9ob29rcy9zMy5weQ==)
 | `96.6% <100%> (ø)` | |
   | 
[...ow/providers/google/cloud/operators/stackdriver.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9wcm92aWRlcnMvZ29vZ2xlL2Nsb3VkL29wZXJhdG9ycy9zdGFja2RyaXZlci5weQ==)
 | `100% <100%> (ø)` | |
   | 
[airflow/models/dag.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy9tb2RlbHMvZGFnLnB5)
 | `91.37% <100%> (ø)` | |
   | 
[airflow/utils/log/logging\_mixin.py](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree#diff-YWlyZmxvdy91dGlscy9sb2cvbG9nZ2luZ19taXhpbi5weQ==)
 | `95.38% <100%> (ø)` | |
   | ... and [7 
more](https://codecov.io/gh/apache/airflow/pull/7523/diff?src=pr&el=tree-more) 
| |
   
   --
   
   [Continue to review full report at 
Codecov](https://codecov.io/gh/apache/airflow/pull/7523?src=pr&el=continue).
   > **Legend** - [Click here to learn 
more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute  (impact)`, `ø = not affected`, `? = missing data`
   > Powered by 
[Codecov](https://codecov.io/gh/apache/airflow/pull/7523?src=pr&el=footer). 
Last update 
[b6a11c1...e13f6c1](https://codecov.io/gh/apache/airflow/pull/7523?src=pr&el=lastupdated).
 Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] brandonwillard commented on issue #7423: [AIRFLOW-3126] Add option to specify additional K8s volumes

2020-02-25 Thread GitBox
brandonwillard commented on issue #7423: [AIRFLOW-3126] Add option to specify 
additional K8s volumes
URL: https://github.com/apache/airflow/pull/7423#issuecomment-590955205
 
 
   It does not appear to help.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (AIRFLOW-5191) SubDag is marked failed

2020-02-25 Thread nexoriv (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044639#comment-17044639
 ] 

nexoriv commented on AIRFLOW-5191:
--

thanks [~puhrez] that was useful!

> SubDag is marked failed 
> 
>
> Key: AIRFLOW-5191
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5191
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DAG, DagRun
>Affects Versions: 1.10.4
> Environment: CentOS 7, Maria-DB, python 3.6.7, Airflow 1.10.4
>Reporter: Oliver Ricken
>Priority: Blocker
>
> Dear all,
> after having upgraded from Airflow version 1.10.2 to 1.10.4, we experience 
> strange and very problematic behaviour of SubDags (which are crucial for our 
> environment and used frequently).
> Tasks inside the SubDag failing and awaiting retry ("up-for-retry") mark the 
> SubDag "failed" (while in 1.10.2, the SubDag was still in "running"-state). 
> This is particularly problematic for downstream tasks depending on the state 
> of the SubDag. Since we have downstream tasks triggered on "all_done", the 
> downstream task is triggered by the "failed" SubDag although a 
> SubDag-internal task is awaiting retry and might (in our case: most likely) 
> yield successfully processed data. This data is thus not available to the 
> prematurely triggered task downstream of the SubDag.
> This is a severe problem for us and worth rolling back to 1.10.2 if there is 
> no quick solution or work-around to this issue!
> We urgently need help on this matter.
> Thanks a lot in advance; any suggestions and input are highly appreciated!
> Cheers
> Oliver



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db

2020-02-25 Thread GitBox
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store 
source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383986936
 
 

 ##
 File path: airflow/models/dagcode.py
 ##
 @@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import List
+
+from sqlalchemy import Column, Index, Integer, String, Text, and_
+
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCodeModel(Base):
+"""A table for DAGs code.
+
+dag_code table contains code of DAG files synchronized by scheduler.
+This feature is controlled by:
+
+* ``[core] store_serialized_dags = True``: enable this feature
+* ``[core] store_code = True``: enable this feature
+
+For details on dag serialization see SerializedDagModel
+"""
+__tablename__ = 'dag_code'
+
+fileloc = Column(String(2000), primary_key=True)
+# The max length of fileloc exceeds the limit of indexing.
+fileloc_hash = Column(Integer, primary_key=True)
+last_updated = Column(UtcDateTime, nullable=False)
+source_code = Column(Text(), nullable=False)
+
+__table_args__ = (
+Index('idx_fileloc_code_hash', fileloc_hash, unique=False),
+)
+
+def __init__(self, full_filepath: str):
+self.fileloc = full_filepath
+self.fileloc_hash = DagCodeModel.dag_fileloc_hash(self.fileloc)
+self.last_updated = timezone.utcnow()
+self.source_code = DagCodeModel._read_code(self.fileloc)
+
+@classmethod
+def _read_code(cls, fileloc: str):
+try:
+with open(fileloc, 'r') as source:
+source_code = source.read()
+except IOError:
+source_code = "Couldn't read source file {}.".format(fileloc)
+return source_code
+
+@provide_session
+def write_code(self, session=None):
+"""Writes code into database.
+
+:param session: ORM Session
+"""
+session.merge(self)
+
+@classmethod
+@provide_session
+def remove_deleted_code(cls, alive_dag_filelocs: List[str], session=None):
+"""Deletes code not included in alive_dag_filelocs.
+
+:param alive_dag_filelocs: file paths of alive DAGs
+:param session: ORM Session
+"""
+alive_fileloc_hashes = [
+cls.dag_fileloc_hash(fileloc) for fileloc in alive_dag_filelocs]
+
+log.debug("Deleting code from %s table ", cls.__tablename__)
+
+session.execute(
+cls.__table__.delete().where(
+and_(cls.fileloc_hash.notin_(alive_fileloc_hashes),
+ cls.fileloc.notin_(alive_dag_filelocs))))
+
+@classmethod
+def dag_fileloc_hash(cls, full_filepath: str) -> int:
+"""Hashing file location for indexing.
+
+:param full_filepath: full filepath of DAG file
+:return: hashed full_filepath
+"""
+# hashing is needed because the length of fileloc is 2000 as an Airflow convention,
+# which is over the limit of indexing. If we can reduce the length of fileloc, then
+# hashing is not needed.
+import hashlib
+return int.from_bytes(
+hashlib.sha1(
+full_filepath.encode('utf-8')).digest()[-2:],
+byteorder='big', signed=False)
 
 Review comment:
   I agree. I shall create this migration
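
   For context, a minimal standalone sketch of the hashing scheme shown in the 
diff above (the example path is made up):

   ```python
   import hashlib

   def dag_fileloc_hash(full_filepath: str) -> int:
       # Same idea as the snippet above: keep only the last 2 bytes of the
       # SHA-1 digest so the value fits in a small, indexable integer column.
       return int.from_bytes(
           hashlib.sha1(full_filepath.encode('utf-8')).digest()[-2:],
           byteorder='big', signed=False)

   # Hypothetical DAG file path, purely for illustration.
   print(dag_fileloc_hash('/opt/airflow/dags/example_dag.py'))  # int in 0..65535
   ```

   Because only 2 bytes are kept, collisions are possible, which is presumably 
why `remove_deleted_code` filters on both `fileloc_hash` and the full `fileloc`.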


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store source code in db

2020-02-25 Thread GitBox
anitakar commented on a change in pull request #7217: [AIRFLOW-5946] Store 
source code in db
URL: https://github.com/apache/airflow/pull/7217#discussion_r383986117
 
 

 ##
 File path: airflow/models/dagcode.py
 ##
 @@ -0,0 +1,108 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import logging
+from typing import List
+
+from sqlalchemy import Column, Index, Integer, String, Text, and_
+
+from airflow.models import Base
+from airflow.utils import timezone
+from airflow.utils.session import provide_session
+from airflow.utils.sqlalchemy import UtcDateTime
+
+log = logging.getLogger(__name__)
+
+
+class DagCodeModel(Base):
+"""A table for DAGs code.
+
+dag_code table contains code of DAG files synchronized by scheduler.
+This feature is controlled by:
+
+* ``[core] store_serialized_dags = True``: enable this feature
+* ``[core] store_code = True``: enable this feature
+
+For details on dag serialization see SerializedDagModel
+"""
+__tablename__ = 'dag_code'
+
+fileloc = Column(String(2000), primary_key=True)
+# The max length of fileloc exceeds the limit of indexing.
+fileloc_hash = Column(Integer, primary_key=True)
+last_updated = Column(UtcDateTime, nullable=False)
+source_code = Column(Text())
+
+__table_args__ = (
+Index('idx_fileloc_code_hash', fileloc_hash, unique=False),
+)
+
+def __init__(self, full_filepath: str):
+self.fileloc = full_filepath
+self.fileloc_hash = DagCodeModel.dag_fileloc_hash(self.fileloc)
+self.last_updated = timezone.utcnow()
+self.source_code = DagCodeModel._read_code(self.fileloc)
+
+@classmethod
+def _read_code(cls, fileloc: str):
+try:
+with open(fileloc, 'r') as source:
+source_code = source.read()
+except IOError:
+source_code = "Couldn't read source file {}.".format(fileloc)
 
 Review comment:
   I agree


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] ashb commented on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint compatible

2020-02-25 Thread GitBox
ashb commented on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint 
compatible
URL: https://github.com/apache/airflow/pull/7484#issuecomment-590940199
 
 
   Ah, because we have ignored that file (or mostly ignored it), it's not yet 
showing up in Pylint. SQLA DeclarativeModels do _funny_ things with the 
classes/instances that pylint can't detect.
   
   I guess it's unavoidable, and we will have to ignore those few lines.
   
   I think https://github.com/PyCQA/pylint/issues/3334 is the same issue we 
have (open, no feedback).


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (AIRFLOW-6904) Airflow 1.10.9 suppresses Operator logs

2020-02-25 Thread RAHUL JAIN (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6904?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RAHUL JAIN updated AIRFLOW-6904:

Affects Version/s: 1.10.7

> Airflow 1.10.9 suppresses Operator logs
> ---
>
> Key: AIRFLOW-6904
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6904
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.10.7, 1.10.8, 1.10.9
>Reporter: RAHUL JAIN
>Priority: Critical
> Attachments: 1.10.2.png, 1.10.9.png
>
>
> After upgrading from 1.10.2 to 1.10.9, we noticed that the Operator logs are 
> no longer printed. See the attachments for comparison. There is also a Slack 
> channel discussion pointing to a recent change that may have broken this - 
> [https://apache-airflow.slack.com/archives/CCQ7EGB1P/p1582548602014200]
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-6904) Airflow 1.10.9 suppresses Operator logs

2020-02-25 Thread Kaxil Naik (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6904?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044604#comment-17044604
 ] 

Kaxil Naik commented on AIRFLOW-6904:
-

This worked fine with Airflow <=1.10.6 and is caused by 
https://github.com/apache/airflow/pull/6627

> Airflow 1.10.9 suppresses Operator logs
> ---
>
> Key: AIRFLOW-6904
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6904
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.10.8, 1.10.9
>Reporter: RAHUL JAIN
>Priority: Critical
> Attachments: 1.10.2.png, 1.10.9.png
>
>
> After upgrading from 1.10.2 to 1.10.9, we noticed that the Operator logs are 
> no longer printed. See the attachments for comparison. There is also a Slack 
> channel discussion pointing to a recent change that may have broken this - 
> [https://apache-airflow.slack.com/archives/CCQ7EGB1P/p1582548602014200]
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] davlum commented on issue #7423: [AIRFLOW-3126] Add option to specify additional K8s volumes

2020-02-25 Thread GitBox
davlum commented on issue #7423: [AIRFLOW-3126] Add option to specify 
additional K8s volumes
URL: https://github.com/apache/airflow/pull/7423#issuecomment-590936689
 
 
   I've noticed you've added several PRs recently to add missing functionality 
to the `airflow.cfg` for configuring the KubernetesExecutor. There is also a 
[new 
field](https://github.com/apache/airflow/blob/746d8de2fcaa9554b4ce7dbf261e4ab148233222/airflow/config_templates/default_airflow.cfg#L756)
 which allows you to pass the YAML directly if that helps at all.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod

2020-02-25 Thread GitBox
kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add 
delete_option_kwargs to delete_namespaced_pod
URL: https://github.com/apache/airflow/pull/7523#discussion_r383966365
 
 

 ##
 File path: airflow/config_templates/config.yml
 ##
 @@ -2035,6 +2035,17 @@
   type: string
   example: ~
   default: ""
+- name: delete_option_kwargs
+  description: |
+Optional keyword arguments to pass to the ``delete_namespaced_pod`` 
kubernetes client
+``core_v1_api`` method when using the Kubernetes Executor.
+This should be an object and can contain any of the options listed in 
the ``v1DeleteOptions``
+class defined here:
+
https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19
+  version_added: ~
+  type: string
+  example: "{{\"grace_period_seconds\": 10}}"
 
 Review comment:
   Updated !
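
   As a side note, a rough sketch of what the option described above amounts to 
-- assuming the JSON value from airflow.cfg is decoded and handed to the 
Kubernetes Python client (the pod name and namespace below are made up):

   ```python
   import json

   from kubernetes import client, config

   config.load_kube_config()  # or load_incluster_config() inside a cluster
   core_v1 = client.CoreV1Api()

   delete_option_kwargs = json.loads('{"grace_period_seconds": 10}')
   core_v1.delete_namespaced_pod(
       name='example-task-pod',   # hypothetical pod name
       namespace='default',
       body=client.V1DeleteOptions(**delete_option_kwargs),
   )
   ```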


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod

2020-02-25 Thread GitBox
kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add 
delete_option_kwargs to delete_namespaced_pod
URL: https://github.com/apache/airflow/pull/7523#discussion_r383964107
 
 

 ##
 File path: setup.py
 ##
 @@ -309,7 +309,7 @@ def write_version(filename: str = 
os.path.join(*[dirname(__file__), "airflow", "
 'pinotdb==0.1.1',
 ]
 postgres = [
-'psycopg2-binary>=2.7.4',
 
 Review comment:
   Whoops this was not meant for this PR, removing it


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] potiuk commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod

2020-02-25 Thread GitBox
potiuk commented on a change in pull request #7523: [AIRFLOW-6843] Add 
delete_option_kwargs to delete_namespaced_pod
URL: https://github.com/apache/airflow/pull/7523#discussion_r383963718
 
 

 ##
 File path: setup.py
 ##
 @@ -309,7 +309,7 @@ def write_version(filename: str = 
os.path.join(*[dirname(__file__), "airflow", "
 'pinotdb==0.1.1',
 ]
 postgres = [
-'psycopg2-binary>=2.7.4',
 
 Review comment:
   Why this ? Just wondering?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] Fokko commented on a change in pull request #6577: [AIRFLOW-5908] Add download_file to S3 Hook

2020-02-25 Thread GitBox
Fokko commented on a change in pull request #6577: [AIRFLOW-5908] Add 
download_file to S3 Hook
URL: https://github.com/apache/airflow/pull/6577#discussion_r383956654
 
 

 ##
 File path: airflow/providers/amazon/aws/hooks/s3.py
 ##
 @@ -652,3 +654,30 @@ def delete_objects(self, bucket, keys):
 if "Errors" in response:
 errors_keys = [x['Key'] for x in response.get("Errors", [])]
 raise AirflowException("Errors when deleting: 
{}".format(errors_keys))
+
+@provide_bucket_name
+@unify_bucket_name_and_key
+def download_file(self, key: str, bucket_name: Optional[str] = None, 
local_path: str = '/tmp') -> str:
 
 Review comment:
   Instead of hardcoding the `/tmp` dir, we could determine it with a Python 
function, which is system-independent since it asks the OS for a temp dir: 
https://docs.python.org/3/library/tempfile.html#tempfile.mkdtemp
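
   For illustration, a minimal sketch of that suggestion (the helper name and 
body below are hypothetical, not the actual S3Hook code):

   ```python
   import tempfile
   from typing import Optional

   def download_file(key: str, local_path: Optional[str] = None) -> str:
       # Ask the OS for a temp directory instead of assuming '/tmp' exists;
       # tempfile.mkdtemp is portable across platforms.
       local_dir = local_path or tempfile.mkdtemp(prefix='airflow_s3_')
       # ... the actual S3 download into local_dir would go here ...
       return local_dir
   ```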


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] nuclearpinguin edited a comment on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint compatible

2020-02-25 Thread GitBox
nuclearpinguin edited a comment on issue #7484: [AIRFLOW-6864] Make 
airflow/jobs pylint compatible
URL: https://github.com/apache/airflow/pull/7484#issuecomment-590924614
 
 
   ```
   * Module airflow.jobs.scheduler_job
   airflow/jobs/scheduler_job.py:1031:16: E1101: Method 'state' has no 'is_' 
member (no-member)
   ```
   @ashb 
   
   I assume that the difference is that here `DM.dag_id.is_(None)` we are 
referencing the `dag_id` attribute of an sqla model, while here 
`models.DagRun.state.is_(None)` references a method of `DagRun`:
   ```python
   @declared_attr
   def state(self):
   return synonym('_state',
  descriptor=property(self.get_state, self.set_state))
   ```
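
   To make that concrete, a small self-contained sketch of the pattern (a toy 
model, not the real `DagRun`); the class-level `.is_()` works at runtime because 
SQLAlchemy instruments the synonym, but pylint only sees the method and reports 
no-member:

   ```python
   from sqlalchemy import Column, Integer, String
   from sqlalchemy.ext.declarative import declarative_base, declared_attr
   from sqlalchemy.orm import synonym

   Base = declarative_base()

   class ToyDagRun(Base):
       """Toy model mirroring how DagRun exposes ``state`` via a synonym."""
       __tablename__ = 'toy_dag_run'
       id = Column(Integer, primary_key=True)
       _state = Column('state', String(50))

       def get_state(self):
           return self._state

       def set_state(self, value):
           self._state = value

       @declared_attr
       def state(self):
           # evaluated at mapping time with the class, not an instance
           return synonym('_state',
                          descriptor=property(self.get_state, self.set_state))

   criterion = ToyDagRun.state.is_(None)  # pylint: disable=no-member
   print(criterion)  # renders as a "state IS NULL" SQL expression
   ```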


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] nuclearpinguin edited a comment on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint compatible

2020-02-25 Thread GitBox
nuclearpinguin edited a comment on issue #7484: [AIRFLOW-6864] Make 
airflow/jobs pylint compatible
URL: https://github.com/apache/airflow/pull/7484#issuecomment-590924614
 
 
   ```
   * Module airflow.jobs.scheduler_job
   airflow/jobs/scheduler_job.py:1031:16: E1101: Method 'state' has no 'is_' 
member (no-member)
   ```
   @ashb 
   
   I assume that the difference is that here 
`models.TaskInstance.state.in_(old_states)` we are referencing the `state` 
attribute of an sqla model, while here `models.DagRun.state.is_(None)` 
references a method of `DagRun`:
   ```python
   @declared_attr
   def state(self):
   return synonym('_state',
  descriptor=property(self.get_state, self.set_state))
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] nuclearpinguin commented on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint compatible

2020-02-25 Thread GitBox
nuclearpinguin commented on issue #7484: [AIRFLOW-6864] Make airflow/jobs 
pylint compatible
URL: https://github.com/apache/airflow/pull/7484#issuecomment-590924614
 
 
   ```
   * Module airflow.jobs.scheduler_job
   airflow/jobs/scheduler_job.py:1031:16: E1101: Method 'state' has no 'is_' 
member (no-member)
   ```
   @ashb 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (AIRFLOW-6915) Adds AI Platform Console link for MLEngineStartTrainingJobOperator

2020-02-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044571#comment-17044571
 ] 

ASF GitHub Bot commented on AIRFLOW-6915:
-

MarkYHZhang commented on pull request #7535: [AIRFLOW-6915] Adds AI Platform 
Console link for MLEngineStartTrainingOperator
URL: https://github.com/apache/airflow/pull/7535
 
 
   Adds AI Platform Console link for MLEngineStartTrainingOperator for the RBAC 
UI
   
   ---
   Issue link: WILL BE INSERTED BY 
[boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Commit message/PR title starts with `[AIRFLOW-]`. AIRFLOW- = 
JIRA ID*
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   * For document-only changes commit message can start with 
`[AIRFLOW-]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Adds AI Platform Console link for MLEngineStartTrainingJobOperator
> --
>
> Key: AIRFLOW-6915
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6915
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: gcp, operators
>Affects Versions: 2.0.0
>Reporter: Mark Zhang
>Assignee: Mark Zhang
>Priority: Major
>
> Adds AI Platform Console link for MLEngineStartTrainingJobOperator



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] MarkYHZhang opened a new pull request #7535: [AIRFLOW-6915] Adds AI Platform Console link for MLEngineStartTrainingOperator

2020-02-25 Thread GitBox
MarkYHZhang opened a new pull request #7535: [AIRFLOW-6915] Adds AI Platform 
Console link for MLEngineStartTrainingOperator
URL: https://github.com/apache/airflow/pull/7535
 
 
   Adds AI Platform Console link for MLEngineStartTrainingOperator for the RBAC 
UI
   
   ---
   Issue link: WILL BE INSERTED BY 
[boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Commit message/PR title starts with `[AIRFLOW-]`. AIRFLOW- = 
JIRA ID*
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   * For document-only changes commit message can start with 
`[AIRFLOW-]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (AIRFLOW-5354) Scheduler - constant CPU usage of 25% with nothing running and scheduling loop running too frequently

2020-02-25 Thread Ash Berlin-Taylor (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor reassigned AIRFLOW-5354:
--

Assignee: Ash Berlin-Taylor

> Scheduler - constant CPU usage of 25% with nothing running and scheduling 
> loop running too frequently
> -
>
> Key: AIRFLOW-5354
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5354
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: scheduler
>Affects Versions: 1.10.4
>Reporter: t oo
>Assignee: Ash Berlin-Taylor
>Priority: Major
>
> I've set the logging level to debug; CPU utilisation is a constant 25% but no 
> DAGs are running (they have no schedule since I only trigger them externally). 
> Why is the scheduling loop running every 2 seconds? Can I make it every 30 
> seconds?
>  
> here is some of the scheduler log
>  
> [2019-08-30 09:28:57,538] \{scheduler_job.py:1363} DEBUG - Starting Loop...
> [2019-08-30 09:28:57,539] \{scheduler_job.py:1374} DEBUG - Harvesting DAG 
> parsing results
> [2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:57,539] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:57,540] \{scheduler_job.py:1376} DEBUG - Harvested 0 
> SimpleDAGs
> [2019-08-30 09:28:57,540] \{scheduler_job.py:1411} DEBUG - Heartbeating the 
> executor
> [2019-08-30 09:28:57,540] \{base_executor.py:124} DEBUG - 0 running task 
> instances
> [2019-08-30 09:28:57,540] \{base_executor.py:125} DEBUG - 0 in queue
> [2019-08-30 09:28:57,540] \{base_executor.py:126} DEBUG - 3 open slots
> [2019-08-30 09:28:57,540] \{base_executor.py:135} DEBUG - Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync method
> [2019-08-30 09:28:57,541] \{scheduler_job.py:1432} DEBUG - Ran scheduling 
> loop in 0.00 seconds
> [2019-08-30 09:28:57,541] \{scheduler_job.py:1435} DEBUG - Sleeping for 1.00 
> seconds
> [2019-08-30 09:28:58,543] \{scheduler_job.py:1447} DEBUG - Sleeping for 1.00 
> seconds to prevent excessive logging
> [2019-08-30 09:28:59,541] \{scheduler_job.py:1363} DEBUG - Starting Loop...
> [2019-08-30 09:28:59,541] \{scheduler_job.py:1374} DEBUG - Harvesting DAG 
> parsing results
> [2019-08-30 09:28:59,541] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:59,542] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:59,542] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:59,542] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:28:59,542] \{scheduler_job.py:1376} DEBUG - Harvested 0 
> SimpleDAGs
> [2019-08-30 09:28:59,542] \{scheduler_job.py:1411} DEBUG - Heartbeating the 
> executor
> [2019-08-30 09:28:59,542] \{base_executor.py:124} DEBUG - 0 running task 
> instances
> [2019-08-30 09:28:59,543] \{base_executor.py:125} DEBUG - 0 in queue
> [2019-08-30 09:28:59,543] \{base_executor.py:126} DEBUG - 3 open slots
> [2019-08-30 09:28:59,543] \{base_executor.py:135} DEBUG - Calling the <class 'airflow.executors.local_executor.LocalExecutor'> sync method
> [2019-08-30 09:28:59,544] \{scheduler_job.py:1432} DEBUG - Ran scheduling 
> loop in 0.00 seconds
> [2019-08-30 09:28:59,544] \{scheduler_job.py:1435} DEBUG - Sleeping for 1.00 
> seconds
> [2019-08-30 09:29:00,545] \{scheduler_job.py:1447} DEBUG - Sleeping for 1.00 
> seconds to prevent excessive logging
> [2019-08-30 09:29:01,544] \{scheduler_job.py:1363} DEBUG - Starting Loop...
> [2019-08-30 09:29:01,545] \{scheduler_job.py:1374} DEBUG - Harvesting DAG 
> parsing results
> [2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:29:01,545] \{dag_processing.py:635} DEBUG - Received message 
> of type DagParsingStat
> [2019-08-30 09:29:01,546] \{scheduler_job.py:1376} DEBUG - Harvested 0 
> SimpleDAGs
> [2019-08-30 09:29:01,546] \{scheduler_job.py:1411} DEBUG - Heartbeating the 
> executor
> [2019-08-30 09:29:01,546] \{base_executor.py:124} DEBUG - 0 running task 
> instances
> [2019-08-30 09:29:01,546] \{base_executor.py:125} DEBUG - 0 in queue
> [2019-08-30 09:29:01,547] \{base_executor.py:126} DEBUG - 3 open slots
> [2019-08-30 09:29:01,

[GitHub] [airflow] kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod

2020-02-25 Thread GitBox
kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add 
delete_option_kwargs to delete_namespaced_pod
URL: https://github.com/apache/airflow/pull/7523#discussion_r383944608
 
 

 ##
 File path: airflow/config_templates/config.yml
 ##
 @@ -2035,6 +2035,17 @@
   type: string
   example: ~
   default: ""
+- name: delete_option_kwargs
+  description: |
+Optional keyword arguments to pass to the ``delete_namespaced_pod`` 
kubernetes client
+``core_v1_api`` method when using the Kubernetes Executor.
+This should be an object and can contain any of the options listed in 
the ``v1DeleteOptions``
+class defined here:
+
https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19
+  version_added: ~
+  type: string
+  example: "{{\"grace_period_seconds\": 10}}"
 
 Review comment:
   It is tricky because of `task_log_prefix_template = 
{{ti.dag_id}}-{{ti.task_id}}-{{execution_date}}-{{try_number}}`. It would 
replace it too: `task_log_prefix_template = 
ti.dag_id-ti.task_id-execution_date-try_number`
   
   > Maybe we need to update the pre-commit script to replace `{` with `{{` and 
`}` with `}}` so that this and the default_airflow.cfg are correct.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] potiuk commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies

2020-02-25 Thread GitBox
potiuk commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in 
how we call kill_zombies
URL: https://github.com/apache/airflow/pull/7534#discussion_r383942394
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -877,7 +877,7 @@ def process_file(self, file_path, zombies, 
pickle_dags=False, session=None):
 except Exception:
 self.log.exception("Error logging import errors!")
 try:
-self.kill_zombies(zombies)
+self.kill_zombies(dagbag, zombies)
 
 Review comment:
   Tomek has the PR #7484 in progress for that.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] potiuk commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies

2020-02-25 Thread GitBox
potiuk commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in 
how we call kill_zombies
URL: https://github.com/apache/airflow/pull/7534#discussion_r383942031
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -877,7 +877,7 @@ def process_file(self, file_path, zombies, 
pickle_dags=False, session=None):
 except Exception:
 self.log.exception("Error logging import errors!")
 try:
-self.kill_zombies(zombies)
+self.kill_zombies(dagbag, zombies)
 
 Review comment:
   Indeed .


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] feluelle commented on issue #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-25 Thread GitBox
feluelle commented on issue #6007: [AIRFLOW-2310] Enable AWS Glue Job 
Integration
URL: https://github.com/apache/airflow/pull/6007#issuecomment-590915162
 
 
   Please also move the tests from contrib to providers


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] feluelle commented on issue #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-25 Thread GitBox
feluelle commented on issue #6007: [AIRFLOW-2310] Enable AWS Glue Job 
Integration
URL: https://github.com/apache/airflow/pull/6007#issuecomment-590914376
 
 
   If you check the Files Changed tab you can see what is included in your PR 
and there are still the docs/integration and docs/code files and the contrib 
files (hook, sensor and operator) in it. Please remove those. You have the 
operators-and-hooks and the providers files instead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod

2020-02-25 Thread GitBox
kaxil commented on a change in pull request #7523: [AIRFLOW-6843] Add 
delete_option_kwargs to delete_namespaced_pod
URL: https://github.com/apache/airflow/pull/7523#discussion_r383936665
 
 

 ##
 File path: airflow/config_templates/config.yml
 ##
 @@ -2035,6 +2035,17 @@
   type: string
   example: ~
   default: ""
+- name: delete_option_kwargs
+  description: |
+Optional keyword arguments to pass to the ``delete_namespaced_pod`` 
kubernetes client
+``core_v1_api`` method when using the Kubernetes Executor.
+This should be an object and can contain any of the options listed in 
the ``v1DeleteOptions``
+class defined here:
+
https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19
+  version_added: ~
+  type: string
+  example: "{{\"grace_period_seconds\": 10}}"
 
 Review comment:
   We need `{{"grace_period_seconds": 10}}` in airflow.cfg because ConfigParser 
does not parse `{` properly


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (AIRFLOW-6915) Adds AI Platform Console link for MLEngineStartTrainingJobOperator

2020-02-25 Thread Mark Zhang (Jira)
Mark Zhang created AIRFLOW-6915:
---

 Summary: Adds AI Platform Console link for 
MLEngineStartTrainingJobOperator
 Key: AIRFLOW-6915
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6915
 Project: Apache Airflow
  Issue Type: New Feature
  Components: gcp, operators
Affects Versions: 2.0.0
Reporter: Mark Zhang
Assignee: Mark Zhang


Adds AI Platform Console link for MLEngineStartTrainingJobOperator



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] ashb commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod

2020-02-25 Thread GitBox
ashb commented on a change in pull request #7523: [AIRFLOW-6843] Add 
delete_option_kwargs to delete_namespaced_pod
URL: https://github.com/apache/airflow/pull/7523#discussion_r383935977
 
 

 ##
 File path: airflow/config_templates/config.yml
 ##
 @@ -2035,6 +2035,17 @@
   type: string
   example: ~
   default: ""
+- name: delete_option_kwargs
+  description: |
+Optional keyword arguments to pass to the ``delete_namespaced_pod`` 
kubernetes client
+``core_v1_api`` method when using the Kubernetes Executor.
+This should be an object and can contain any of the options listed in 
the ``v1DeleteOptions``
+class defined here:
+
https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19
+  version_added: ~
+  type: string
+  example: "{{\"grace_period_seconds\": 10}}"
 
 Review comment:
   Maybe we need to update the pre-commit script to replace `{` with `{{` and 
`}` with `}}` so that this and the default_airflow.cfg are correct.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] ashb commented on a change in pull request #7523: [AIRFLOW-6843] Add delete_option_kwargs to delete_namespaced_pod

2020-02-25 Thread GitBox
ashb commented on a change in pull request #7523: [AIRFLOW-6843] Add 
delete_option_kwargs to delete_namespaced_pod
URL: https://github.com/apache/airflow/pull/7523#discussion_r383935377
 
 

 ##
 File path: airflow/config_templates/config.yml
 ##
 @@ -2035,6 +2035,17 @@
   type: string
   example: ~
   default: ""
+- name: delete_option_kwargs
+  description: |
+Optional keyword arguments to pass to the ``delete_namespaced_pod`` 
kubernetes client
+``core_v1_api`` method when using the Kubernetes Executor.
+This should be an object and can contain any of the options listed in 
the ``v1DeleteOptions``
+class defined here:
+
https://github.com/kubernetes-client/python/blob/41f11a09995efcd0142e25946adc7591431bfb2f/kubernetes/client/models/v1_delete_options.py#L19
+  version_added: ~
+  type: string
+  example: "{{\"grace_period_seconds\": 10}}"
 
 Review comment:
   Doesn't affect the output, but could be done as:
   
   ```suggestion
 example: '{{"grace_period_seconds": 10}}'
   ```
   
   Also: OH GOD. Do we show this as `{` or `{{` when we render the docs? 
Because it needs to be `{` -- `{{` is only needed in default_airflow.cfg 
because that is a template, but what we show in our docs is not 
templated/formatted.
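
   To illustrate why the doubling is only a template artefact, a small sketch 
assuming default_airflow.cfg is expanded with str.format-style substitution (as 
the "template" remark above implies); the variable name is illustrative:

   ```python
   # Literal braces in the template must be doubled so that .format() emits a
   # single brace; placeholders like {AIRFLOW_HOME} get substituted.
   template = (
       'delete_option_kwargs = {{"grace_period_seconds": 10}}\n'
       'sql_alchemy_conn = sqlite:///{AIRFLOW_HOME}/airflow.db\n'
   )
   print(template.format(AIRFLOW_HOME='/opt/airflow'))
   # delete_option_kwargs = {"grace_period_seconds": 10}
   # sql_alchemy_conn = sqlite:////opt/airflow/airflow.db
   ```

   The rendered airflow.cfg (and the docs) should therefore show the 
single-brace form.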


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (AIRFLOW-5485) scheduler_job: replace == None with is_() comparison

2020-02-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5485?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044549#comment-17044549
 ] 

ASF GitHub Bot commented on AIRFLOW-5485:
-

ashb commented on pull request #7533: [AIRFLOW-5485] - scheduler_job: replace 
== None with is
URL: https://github.com/apache/airflow/pull/7533
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> scheduler_job: replace == None with is_() comparison
> 
>
> Key: AIRFLOW-5485
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5485
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: hooks
>Affects Versions: 1.10.6
>Reporter: Jakob Homan
>Assignee: Saurabh Dhupar
>Priority: Minor
>  Labels: ccoss2019, newbie
>
> Note: This ticket's being created to facilitate a new contributor's workshop 
> for Airflow. After the workshop has completed, I'll mark these all available 
> for anyone that might like to take them on.
> airflow/jobs/scheduler_job.py:855
> {code:java}
> .filter(or_(DR.run_id == None,  # noqa: E711 pylint: 
> disable=singleton-comparison
> not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
> .outerjoin(DM, DM.dag_id == TI.dag_id)
> .filter(or_(DM.dag_id == None,  # noqa: E711 pylint: 
> disable=singleton-comparison
> not_(DM.is_paused))) {code}
> We're using {{col == None}} here where we should use {{col.is_(None)}} to 
> avoid confusion and lint warnings. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (AIRFLOW-5485) scheduler_job: replace == None with is_() comparison

2020-02-25 Thread Ash Berlin-Taylor (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-5485?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor resolved AIRFLOW-5485.

Resolution: Fixed

(Resolved without a fix here, as it was fixed in the linked parent issue)

> scheduler_job: replace == None with is_() comparison
> 
>
> Key: AIRFLOW-5485
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5485
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: hooks
>Affects Versions: 1.10.6
>Reporter: Jakob Homan
>Assignee: Saurabh Dhupar
>Priority: Minor
>  Labels: ccoss2019, newbie
>
> Note: This ticket's being created to facilitate a new contributor's workshop 
> for Airflow. After the workshop has completed, I'll mark these all available 
> for anyone that might like to take them on.
> airflow/jobs/scheduler_job.py:855
> {code:java}
> .filter(or_(DR.run_id == None,  # noqa: E711 pylint: 
> disable=singleton-comparison
> not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
> .outerjoin(DM, DM.dag_id == TI.dag_id)
> .filter(or_(DM.dag_id == None,  # noqa: E711 pylint: 
> disable=singleton-comparison
> not_(DM.is_paused))) {code}
> We're using {{col == None}} here where we should use {{col.is_(None)}} to 
> avoid confusion and lint warnings. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (AIRFLOW-6907) Simplify SchedulerJob

2020-02-25 Thread Ash Berlin-Taylor (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor resolved AIRFLOW-6907.

Fix Version/s: 2.0.0
   Resolution: Fixed

> Simplify SchedulerJob
> -
>
> Key: AIRFLOW-6907
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6907
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: scheduler
>Affects Versions: 1.10.9
>Reporter: Kamil Bregula
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] ashb closed pull request #7533: [AIRFLOW-5485] - scheduler_job: replace == None with is

2020-02-25 Thread GitBox
ashb closed pull request #7533: [AIRFLOW-5485] - scheduler_job: replace == None 
with is
URL: https://github.com/apache/airflow/pull/7533
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] ashb commented on issue #7533: [AIRFLOW-5485] - scheduler_job: replace == None with is

2020-02-25 Thread GitBox
ashb commented on issue #7533: [AIRFLOW-5485] - scheduler_job: replace == None 
with is
URL: https://github.com/apache/airflow/pull/7533#issuecomment-590909998
 
 
   The other PR has been merged, so I'm closing this. Thanks for the PR though!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] nuclearpinguin commented on issue #7484: [AIRFLOW-6864] Make airflow/jobs pylint compatible

2020-02-25 Thread GitBox
nuclearpinguin commented on issue #7484: [AIRFLOW-6864] Make airflow/jobs 
pylint compatible
URL: https://github.com/apache/airflow/pull/7484#issuecomment-590904410
 
 
   > @nuclearpinguin See 
https://github.com/apache/airflow/pull/7527/files#r383812108 -- this is my only 
outstanding query about this PR.
   
   This PR runs pylint over `scheduler_job.py`; the one you mention does not. I 
suppose once I rebase I will have to fix it in more places. @ashb 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy

2020-02-25 Thread GitBox
kaxil commented on a change in pull request #7532: [AIRFLOW-] Fix outdated 
doc on settings.policy
URL: https://github.com/apache/airflow/pull/7532#discussion_r383925599
 
 

 ##
 File path: airflow/settings.py
 ##
 @@ -79,28 +79,26 @@
 json = json
 
 
-def policy(task_instance):
+def policy(task):
 
 Review comment:
   No worries :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] Fokko commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy

2020-02-25 Thread GitBox
Fokko commented on a change in pull request #7532: [AIRFLOW-] Fix outdated 
doc on settings.policy
URL: https://github.com/apache/airflow/pull/7532#discussion_r383924936
 
 

 ##
 File path: airflow/settings.py
 ##
 @@ -79,28 +79,26 @@
 json = json
 
 
-def policy(task_instance):
+def policy(task):
 
 Review comment:
   Much better, sorry for the fuss.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob

2020-02-25 Thread GitBox
nuclearpinguin commented on a change in pull request #7527: [AIRFLOW-6907] 
Simplify SchedulerJob
URL: https://github.com/apache/airflow/pull/7527#discussion_r383921545
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1098,37 +1094,20 @@ def _find_executable_task_instances(self, 
simple_dag_bag, states, session=None):
 TI = models.TaskInstance
 DR = models.DagRun
 DM = models.DagModel
-ti_query = (
+task_instances_to_examine = (
 session
 .query(TI)
 .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
 .outerjoin(
-DR,
-and_(DR.dag_id == TI.dag_id, DR.execution_date == 
TI.execution_date)
+DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == 
TI.execution_date)
 )
-.filter(or_(DR.run_id == None,  # noqa: E711 pylint: 
disable=singleton-comparison
-not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
+.filter(or_(DR.run_id.is_(None), 
not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
 .outerjoin(DM, DM.dag_id == TI.dag_id)
-.filter(or_(DM.dag_id == None,  # noqa: E711 pylint: 
disable=singleton-comparison
-not_(DM.is_paused)))
+.filter(or_(DM.dag_id.is_(None), not_(DM.is_paused)))
 
 Review comment:
   It's still in the TODO list:
   
https://github.com/PolideaInternal/airflow/blob/37c630da636d1ef352273e33fbdb417727914386/scripts/ci/pylint_todo.txt#L11
   
   And I suppose that once I rebase onto new master I will have to fix it in 
more places...


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy

2020-02-25 Thread GitBox
kaxil commented on a change in pull request #7532: [AIRFLOW-] Fix outdated 
doc on settings.policy
URL: https://github.com/apache/airflow/pull/7532#discussion_r383921528
 
 

 ##
 File path: airflow/settings.py
 ##
 @@ -79,28 +79,26 @@
 json = json
 
 
-def policy(task_instance):
+def policy(task):
 
 Review comment:
   Removed Type Annotation


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil merged pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy

2020-02-25 Thread GitBox
kaxil merged pull request #7532: [AIRFLOW-] Fix outdated doc on 
settings.policy
URL: https://github.com/apache/airflow/pull/7532
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob

2020-02-25 Thread GitBox
nuclearpinguin commented on a change in pull request #7527: [AIRFLOW-6907] 
Simplify SchedulerJob
URL: https://github.com/apache/airflow/pull/7527#discussion_r383920198
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1098,37 +1094,20 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
         TI = models.TaskInstance
         DR = models.DagRun
         DM = models.DagModel
-        ti_query = (
+        task_instances_to_examine = (
             session
             .query(TI)
             .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
             .outerjoin(
-                DR,
-                and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
+                DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
             )
-            .filter(or_(DR.run_id == None,  # noqa: E711 pylint: disable=singleton-comparison
-                        not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
+            .filter(or_(DR.run_id.is_(None), not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
             .outerjoin(DM, DM.dag_id == TI.dag_id)
-            .filter(or_(DM.dag_id == None,  # noqa: E711 pylint: disable=singleton-comparison
-                        not_(DM.is_paused)))
+            .filter(or_(DM.dag_id.is_(None), not_(DM.is_paused)))
 
 Review comment:
   I don't know, but here we do not run pylint over this file, do we? @ashb 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy

2020-02-25 Thread GitBox
kaxil commented on a change in pull request #7532: [AIRFLOW-] Fix outdated 
doc on settings.policy
URL: https://github.com/apache/airflow/pull/7532#discussion_r383920439
 
 

 ##
 File path: airflow/settings.py
 ##
 @@ -79,28 +79,26 @@
 json = json
 
 
-def policy(task_instance):
+def policy(task):
 
 Review comment:
   Ya it is causing issues: https://travis-ci.org/apache/airflow/jobs/654914183


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] nuclearpinguin commented on a change in pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob

2020-02-25 Thread GitBox
nuclearpinguin commented on a change in pull request #7527: [AIRFLOW-6907] 
Simplify SchedulerJob
URL: https://github.com/apache/airflow/pull/7527#discussion_r383920198
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1098,37 +1094,20 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
         TI = models.TaskInstance
         DR = models.DagRun
         DM = models.DagModel
-        ti_query = (
+        task_instances_to_examine = (
             session
             .query(TI)
             .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
             .outerjoin(
-                DR,
-                and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
+                DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
             )
-            .filter(or_(DR.run_id == None,  # noqa: E711 pylint: disable=singleton-comparison
-                        not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
+            .filter(or_(DR.run_id.is_(None), not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
             .outerjoin(DM, DM.dag_id == TI.dag_id)
-            .filter(or_(DM.dag_id == None,  # noqa: E711 pylint: disable=singleton-comparison
-                        not_(DM.is_paused)))
+            .filter(or_(DM.dag_id.is_(None), not_(DM.is_paused)))
 
 Review comment:
   I don't know, but here we do not run pylint over this file, do we?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (AIRFLOW-6907) Simplify SchedulerJob

2020-02-25 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044481#comment-17044481
 ] 

ASF subversion and git services commented on AIRFLOW-6907:
--

Commit 77b532062cc3fb4407f7e4a99626c31b29133854 in airflow's branch 
refs/heads/master from Kamil Breguła
[ https://gitbox.apache.org/repos/asf?p=airflow.git;h=77b5320 ]

[AIRFLOW-6907] Simplify SchedulerJob (#7527)



> Simplify SchedulerJob
> -
>
> Key: AIRFLOW-6907
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6907
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: scheduler
>Affects Versions: 1.10.9
>Reporter: Kamil Bregula
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-6907) Simplify SchedulerJob

2020-02-25 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044482#comment-17044482
 ] 

ASF subversion and git services commented on AIRFLOW-6907:
--

Commit 77b532062cc3fb4407f7e4a99626c31b29133854 in airflow's branch 
refs/heads/master from Kamil Breguła
[ https://gitbox.apache.org/repos/asf?p=airflow.git;h=77b5320 ]

[AIRFLOW-6907] Simplify SchedulerJob (#7527)



> Simplify SchedulerJob
> -
>
> Key: AIRFLOW-6907
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6907
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: scheduler
>Affects Versions: 1.10.9
>Reporter: Kamil Bregula
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-6907) Simplify SchedulerJob

2020-02-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044478#comment-17044478
 ] 

ASF GitHub Bot commented on AIRFLOW-6907:
-

mik-laj commented on pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob
URL: https://github.com/apache/airflow/pull/7527
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Simplify SchedulerJob
> -
>
> Key: AIRFLOW-6907
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6907
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: scheduler
>Affects Versions: 1.10.9
>Reporter: Kamil Bregula
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj merged pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob

2020-02-25 Thread GitBox
mik-laj merged pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob
URL: https://github.com/apache/airflow/pull/7527
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] Fokko commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy

2020-02-25 Thread GitBox
Fokko commented on a change in pull request #7532: [AIRFLOW-] Fix outdated 
doc on settings.policy
URL: https://github.com/apache/airflow/pull/7532#discussion_r383905306
 
 

 ##
 File path: airflow/settings.py
 ##
 @@ -79,28 +79,26 @@
 json = json
 
 
-def policy(task_instance):
+def policy(task):
 
 Review comment:
   I prefer 2, but if there is any chance of having cyclic dependencies we 
should just leave it out. The first one is a bit awkward.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy

2020-02-25 Thread GitBox
kaxil commented on a change in pull request #7532: [AIRFLOW-] Fix outdated 
doc on settings.policy
URL: https://github.com/apache/airflow/pull/7532#discussion_r383902307
 
 

 ##
 File path: airflow/settings.py
 ##
 @@ -79,28 +79,26 @@
 json = json
 
 
-def policy(task_instance):
+def policy(task):
 
 Review comment:
   What version would you like/recommend:
   
   1. With TYPE_CHECKING - 
https://github.com/apache/airflow/pull/7532/commits/fb4ede5523cb1e0aea8bff95766feffb54d5f0a5
   2. Without TYPE_CHECKING - 
https://github.com/apache/airflow/pull/7532/commits/7776f8178d3b61cf003b10eccb7caa2be3f504a2
   
   With (2) I am just a bit worried that there might be a chance for cyclic 
dependencies.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
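
For readers following this thread: option (1) above refers to the `TYPE_CHECKING` pattern, which gives the annotation to type checkers without importing the operator module at runtime, so it cannot create a circular import between `settings` and the models. A rough sketch of what such a policy hook could look like (the import path and the Hive example are illustrative assumptions, not necessarily what the PR merged):

```python
# Sketch of a TYPE_CHECKING-guarded annotation for a cluster policy hook.
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Imported only while type checking, never at runtime.
    from airflow.models.baseoperator import BaseOperator


def policy(task: "BaseOperator") -> None:
    """Example policy: route all Hive tasks to a dedicated queue."""
    if task.task_type == "HiveOperator":
        task.queue = "hive_queue"
```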


[GitHub] [airflow] abdulbasitds commented on issue #6007: [AIRFLOW-2310] Enable AWS Glue Job Integration

2020-02-25 Thread GitBox
abdulbasitds commented on issue #6007: [AIRFLOW-2310] Enable AWS Glue Job 
Integration
URL: https://github.com/apache/airflow/pull/6007#issuecomment-590883802
 
 
   @feluelle I am not sure about the old files in contrib and docs, can you please elaborate more on which files need to be deleted?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Issue Comment Deleted] (AIRFLOW-6867) Decouple DagBag and TaskInstance

2020-02-25 Thread Ash Berlin-Taylor (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor updated AIRFLOW-6867:
---
Comment: was deleted

(was: ashb commented on pull request #7534: [AIRFLOW-6867] Fix bug in how we 
call kill_zombies
URL: https://github.com/apache/airflow/pull/7534
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
)

> Decouple DagBag and TaskInstance
> 
>
> Key: AIRFLOW-6867
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6867
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DAG
>Affects Versions: 1.10.9
>Reporter: Kamil Bregula
>Assignee: Kamil Bregula
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (AIRFLOW-6867) Decouple DagBag and TaskInstance

2020-02-25 Thread Ash Berlin-Taylor (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor updated AIRFLOW-6867:
---
Comment: was deleted

(was: ashb commented on pull request #7534: [AIRFLOW-6867] Fix bug in kill 
zombies
URL: https://github.com/apache/airflow/pull/7534
 
 
   The refactor in #7488 introduced a scheduler-breaking bug.
   
   I am not happy about merging this without a unit test to stop it breaking 
again, but given how much process_file does it's hard to test. Thoughts?
   
   ---
   Issue link: WILL BE INSERTED BY 
[boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [ ] Description above provides context of the change
   - [ ] Commit message/PR title starts with `[AIRFLOW-]`. AIRFLOW- = 
JIRA ID*
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   * For document-only changes commit message can start with 
`[AIRFLOW-]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
)

> Decouple DagBag and TaskInstance
> 
>
> Key: AIRFLOW-6867
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6867
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DAG
>Affects Versions: 1.10.9
>Reporter: Kamil Bregula
>Assignee: Kamil Bregula
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
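
One possible shape for the unit test asked about in the PR description above, sketched with `unittest.mock` against a stand-in class so it stays self-contained; the real test would target whichever class in airflow/jobs/scheduler_job.py owns process_file, and the wiring shown here is an assumption, not the test that actually landed:

{code:python}
from unittest import mock


class FileProcessorStandIn:
    """Minimal stand-in mirroring the process_file -> kill_zombies wiring."""

    def kill_zombies(self, dagbag, zombies):
        raise NotImplementedError

    def process_file(self, file_path, zombies):
        dagbag = {"file": file_path}  # stands in for DagBag(file_path)
        self.kill_zombies(dagbag, zombies)
        return dagbag


def test_process_file_passes_dagbag_to_kill_zombies():
    processor = FileProcessorStandIn()
    with mock.patch.object(processor, "kill_zombies") as kill_zombies_mock:
        dagbag = processor.process_file("/dags/example.py", zombies=["zombie_ti"])
    # The regression in question was calling kill_zombies without the dagbag.
    kill_zombies_mock.assert_called_once_with(dagbag, ["zombie_ti"])


if __name__ == "__main__":
    test_process_file_passes_dagbag_to_kill_zombies()
    print("ok")
{code}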


[jira] [Updated] (AIRFLOW-6910) Fix kill_zombie method call

2020-02-25 Thread Ash Berlin-Taylor (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor updated AIRFLOW-6910:
---
Affects Version/s: (was: 1.10.9)
   2.0.0

> Fix kill_zombie method call
> ---
>
> Key: AIRFLOW-6910
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6910
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: scheduler
>Affects Versions: 2.0.0
>Reporter: Kamil Bregula
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (AIRFLOW-6910) Fix kill_zombie method call

2020-02-25 Thread Ash Berlin-Taylor (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ash Berlin-Taylor resolved AIRFLOW-6910.

Resolution: Fixed

> Fix kill_zombie method call
> ---
>
> Key: AIRFLOW-6910
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6910
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: scheduler
>Affects Versions: 1.10.9
>Reporter: Kamil Bregula
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] Fokko commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy

2020-02-25 Thread GitBox
Fokko commented on a change in pull request #7532: [AIRFLOW-] Fix outdated 
doc on settings.policy
URL: https://github.com/apache/airflow/pull/7532#discussion_r383895918
 
 

 ##
 File path: airflow/settings.py
 ##
 @@ -79,28 +79,26 @@
 json = json
 
 
-def policy(task_instance):
+def policy(task):
 
 Review comment:
   When the user copies the function, they'll also copy the annotation, which makes the code more robust :)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] ashb commented on issue #7531: [AIRFLOW-6910] Fix kill_zombie method call

2020-02-25 Thread GitBox
ashb commented on issue #7531: [AIRFLOW-6910] Fix kill_zombie method call
URL: https://github.com/apache/airflow/pull/7531#issuecomment-590880959
 
 
   @mik-laj @Fokko In situations like this can we please get into the habit of targeting the bug fix at the same Jira issue as the original breaking change -- that way if we ever need to backport it we get the change _and_ the fix. Two Jiras means the fix is likely to get missed.
   
   (Plus right now one PR = one Jira is just busy work; if we're going to do this, let's just stop using Jira)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (AIRFLOW-6867) Decouple DagBag and TaskInstance

2020-02-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044460#comment-17044460
 ] 

ASF GitHub Bot commented on AIRFLOW-6867:
-

ashb commented on pull request #7534: [AIRFLOW-6867] Fix bug in how we call 
kill_zombies
URL: https://github.com/apache/airflow/pull/7534
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Decouple DagBag and TaskInstance
> 
>
> Key: AIRFLOW-6867
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6867
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DAG
>Affects Versions: 1.10.9
>Reporter: Kamil Bregula
>Assignee: Kamil Bregula
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] ashb closed pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies

2020-02-25 Thread GitBox
ashb closed pull request #7534: [AIRFLOW-6867] Fix bug in how we call 
kill_zombies
URL: https://github.com/apache/airflow/pull/7534
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy

2020-02-25 Thread GitBox
kaxil commented on a change in pull request #7532: [AIRFLOW-] Fix outdated 
doc on settings.policy
URL: https://github.com/apache/airflow/pull/7532#discussion_r383887968
 
 

 ##
 File path: airflow/settings.py
 ##
 @@ -79,28 +79,26 @@
 json = json
 
 
-def policy(task_instance):
+def policy(task):
 
 Review comment:
   Can add, but this function needs to be overridden by users in their `airflow_local_settings.py` - so adding a type annotation here won't have any impact, I feel. WDYT?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] Fokko commented on a change in pull request #7532: [AIRFLOW-XXXX] Fix outdated doc on settings.policy

2020-02-25 Thread GitBox
Fokko commented on a change in pull request #7532: [AIRFLOW-] Fix outdated 
doc on settings.policy
URL: https://github.com/apache/airflow/pull/7532#discussion_r383884871
 
 

 ##
 File path: airflow/settings.py
 ##
 @@ -79,28 +79,26 @@
 json = json
 
 
-def policy(task_instance):
+def policy(task):
 
 Review comment:
   Maybe add a type annotation here?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (AIRFLOW-6867) Decouple DagBag and TaskInstance

2020-02-25 Thread Fokko Driesprong (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fokko Driesprong reassigned AIRFLOW-6867:
-

Assignee: Kamil Bregula

> Decouple DagBag and TaskInstance
> 
>
> Key: AIRFLOW-6867
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6867
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DAG
>Affects Versions: 1.10.9
>Reporter: Kamil Bregula
>Assignee: Kamil Bregula
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-6910) Fix kill_zombie method call

2020-02-25 Thread ASF subversion and git services (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1703#comment-1703
 ] 

ASF subversion and git services commented on AIRFLOW-6910:
--

Commit a62bb619320544404271dd1a41eaf398f00aeef9 in airflow's branch 
refs/heads/master from Kamil Breguła
[ https://gitbox.apache.org/repos/asf?p=airflow.git;h=a62bb61 ]

[AIRFLOW-6910] Fix kill_zombie method call (#7531)



> Fix kill_zombie method call
> ---
>
> Key: AIRFLOW-6910
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6910
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: scheduler
>Affects Versions: 1.10.9
>Reporter: Kamil Bregula
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (AIRFLOW-6910) Fix kill_zombie method call

2020-02-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1702#comment-1702
 ] 

ASF GitHub Bot commented on AIRFLOW-6910:
-

Fokko commented on pull request #7531: [AIRFLOW-6910] Fix kill_zombie method 
call
URL: https://github.com/apache/airflow/pull/7531
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix kill_zombie method call
> ---
>
> Key: AIRFLOW-6910
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6910
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: scheduler
>Affects Versions: 1.10.9
>Reporter: Kamil Bregula
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] Fokko commented on issue #7531: [AIRFLOW-6910] Fix kill_zombie method call

2020-02-25 Thread GitBox
Fokko commented on issue #7531: [AIRFLOW-6910] Fix kill_zombie method call
URL: https://github.com/apache/airflow/pull/7531#issuecomment-590871285
 
 
   Thanks @mik-laj 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] Fokko commented on issue #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies

2020-02-25 Thread GitBox
Fokko commented on issue #7534: [AIRFLOW-6867] Fix bug in how we call 
kill_zombies
URL: https://github.com/apache/airflow/pull/7534#issuecomment-590871188
 
 
   Duplicate of https://github.com/apache/airflow/pull/7531/


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] Fokko merged pull request #7531: [AIRFLOW-6910] Fix kill_zombie method call

2020-02-25 Thread GitBox
Fokko merged pull request #7531: [AIRFLOW-6910] Fix kill_zombie method call
URL: https://github.com/apache/airflow/pull/7531
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (AIRFLOW-6914) Add a default robots.txt to deny all search engines

2020-02-25 Thread Kaxil Naik (Jira)


 [ 
https://issues.apache.org/jira/browse/AIRFLOW-6914?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kaxil Naik updated AIRFLOW-6914:

Description: 
If the Airflow UI is public, Google can index it; if authentication has not been enabled, this is a serious security threat on a production cluster.

Something like this probably should work


{code:python}
@app.route('/robots.txt', methods=['GET'])
def robotstxt():
    return send_from_directory(os.path.join(app.root_path, 'static', 'txt'),
                               'robots.txt')
{code}


  was:If the Airflow UI is public, Google can index it and if the 
Authentication has not been enabled it is a serious security threat if it is a 
prod cluster


> Add a default robots.txt to deny all search engines
> ---
>
> Key: AIRFLOW-6914
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6914
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: security, ui
>Affects Versions: 1.10.6, 1.10.7, 1.10.8, 1.10.9
>Reporter: Kaxil Naik
>Assignee: Kaxil Naik
>Priority: Major
>
> If the Airflow UI is public, Google can index it; if authentication has not been enabled, this is a serious security threat on a production cluster.
> Something like this probably should work
> {code:python}
> @app.route('/robots.txt', methods=['GET'])
> def robotstxt():
>     return send_from_directory(os.path.join(app.root_path, 'static', 'txt'),
>                                'robots.txt')
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
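
As a rough illustration of the idea in this issue, a self-contained Flask sketch that answers crawlers with a deny-all robots.txt straight from the handler instead of a static file; this is only one possible shape for the fix, not the implementation that was merged:

{code:python}
from flask import Flask, Response

app = Flask(__name__)


@app.route('/robots.txt', methods=['GET'])
def robotstxt():
    # Deny-all policy: ask every crawler to skip the whole UI.
    return Response("User-agent: *\nDisallow: /\n", mimetype="text/plain")


if __name__ == "__main__":
    app.run()
{code}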


[jira] [Created] (AIRFLOW-6914) Add a default robots.txt to deny all search engines

2020-02-25 Thread Kaxil Naik (Jira)
Kaxil Naik created AIRFLOW-6914:
---

 Summary: Add a default robots.txt to deny all search engines
 Key: AIRFLOW-6914
 URL: https://issues.apache.org/jira/browse/AIRFLOW-6914
 Project: Apache Airflow
  Issue Type: Improvement
  Components: security, ui
Affects Versions: 1.10.9, 1.10.8, 1.10.7, 1.10.6
Reporter: Kaxil Naik
Assignee: Kaxil Naik


If the Airflow UI is public, Google can index it; if authentication has not been enabled, this is a serious security threat on a production cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies

2020-02-25 Thread GitBox
kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in 
how we call kill_zombies
URL: https://github.com/apache/airflow/pull/7534#discussion_r383878207
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -877,7 +877,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
         except Exception:
             self.log.exception("Error logging import errors!")
         try:
-            self.kill_zombies(zombies)
+            self.kill_zombies(dagbag, zombies)
 
 Review comment:
   Maybe pylint would have caught it but it is currently in **pylint_todo**, so 
might have missed:
   
   
https://github.com/apache/airflow/blob/6eaa7e3b1845644d5ec65a00a997f4029bec9628/scripts/ci/pylint_todo.txt#L11


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
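
To make the pylint point concrete: once `airflow/jobs/scheduler_job.py` drops out of `pylint_todo.txt`, a call that omits a required positional argument is exactly what pylint's no-value-for-parameter check (E1120) reports. A tiny sketch, deliberately unrelated to the real Airflow classes:

```python
# pylint flags the call below with E1120 (no-value-for-parameter);
# at runtime the same call would raise TypeError.
class Processor:
    def kill_zombies(self, dagbag, zombies):
        """Both arguments are required."""
        return dagbag, len(zombies)


def process(zombies):
    proc = Processor()
    # Missing the first argument: 'zombies' binds to 'dagbag' and
    # pylint reports "No value for argument 'zombies' in method call".
    return proc.kill_zombies(zombies)
```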


[GitHub] [airflow] kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies

2020-02-25 Thread GitBox
kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in 
how we call kill_zombies
URL: https://github.com/apache/airflow/pull/7534#discussion_r383878207
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -877,7 +877,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
         except Exception:
             self.log.exception("Error logging import errors!")
         try:
-            self.kill_zombies(zombies)
+            self.kill_zombies(dagbag, zombies)
 
 Review comment:
   Maybe pylint would have caught it but it is currently in **ToDo**:
   
   
https://github.com/apache/airflow/blob/6eaa7e3b1845644d5ec65a00a997f4029bec9628/scripts/ci/pylint_todo.txt#L11


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies

2020-02-25 Thread GitBox
kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in 
how we call kill_zombies
URL: https://github.com/apache/airflow/pull/7534#discussion_r383875096
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -877,7 +877,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
         except Exception:
             self.log.exception("Error logging import errors!")
         try:
-            self.kill_zombies(zombies)
+            self.kill_zombies(dagbag, zombies)
 
 Review comment:
   Surprised that our static checkers didn't catch this! Required arg was not 
passed !
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] ashb commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies

2020-02-25 Thread GitBox
ashb commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how 
we call kill_zombies
URL: https://github.com/apache/airflow/pull/7534#discussion_r383876717
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -877,7 +877,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
         except Exception:
             self.log.exception("Error logging import errors!")
         try:
-            self.kill_zombies(zombies)
+            self.kill_zombies(dagbag, zombies)
 
 Review comment:
   Yeah quite!


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in how we call kill_zombies

2020-02-25 Thread GitBox
kaxil commented on a change in pull request #7534: [AIRFLOW-6867] Fix bug in 
how we call kill_zombies
URL: https://github.com/apache/airflow/pull/7534#discussion_r383875096
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -877,7 +877,7 @@ def process_file(self, file_path, zombies, pickle_dags=False, session=None):
         except Exception:
             self.log.exception("Error logging import errors!")
         try:
-            self.kill_zombies(zombies)
+            self.kill_zombies(dagbag, zombies)
 
 Review comment:
   Surprised that our static checkers didn't catch this? Required arg was not 
passed !
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] ashb commented on a change in pull request #7527: [AIRFLOW-6907] Simplify SchedulerJob

2020-02-25 Thread GitBox
ashb commented on a change in pull request #7527: [AIRFLOW-6907] Simplify 
SchedulerJob
URL: https://github.com/apache/airflow/pull/7527#discussion_r383874965
 
 

 ##
 File path: airflow/jobs/scheduler_job.py
 ##
 @@ -1098,37 +1094,20 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):
         TI = models.TaskInstance
         DR = models.DagRun
         DM = models.DagModel
-        ti_query = (
+        task_instances_to_examine = (
             session
             .query(TI)
             .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
             .outerjoin(
-                DR,
-                and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
+                DR, and_(DR.dag_id == TI.dag_id, DR.execution_date == TI.execution_date)
             )
-            .filter(or_(DR.run_id == None,  # noqa: E711 pylint: disable=singleton-comparison
-                        not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
+            .filter(or_(DR.run_id.is_(None), not_(DR.run_id.like(BackfillJob.ID_PREFIX + '%'))))
             .outerjoin(DM, DM.dag_id == TI.dag_id)
-            .filter(or_(DM.dag_id == None,  # noqa: E711 pylint: disable=singleton-comparison
-                        not_(DM.is_paused)))
+            .filter(or_(DM.dag_id.is_(None), not_(DM.is_paused)))
 
 Review comment:
   (Sorry for not being clear. This comment was not about this PR, but about 
the other one)


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [airflow] JavierLopezT commented on a change in pull request #6670: [AIRFLOW-4816]MySqlToS3Operator

2020-02-25 Thread GitBox
JavierLopezT commented on a change in pull request #6670: 
[AIRFLOW-4816]MySqlToS3Operator
URL: https://github.com/apache/airflow/pull/6670#discussion_r383873800
 
 

 ##
 File path: airflow/operators/mysql_to_s3_operator.py
 ##
 @@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""
+Transfer data from MySQL into a S3 bucket
+"""
+from io import StringIO
+from typing import Optional, Union
+
+import numpy as np
+import pandas as pd
+
+from airflow.hooks.mysql_hook import MySqlHook
+from airflow.models import BaseOperator
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.utils.decorators import apply_defaults
+
+
+class MySQLToS3Operator(BaseOperator):
+    """
+    Saves data from a specific MySQL query into a file in S3.
+
+    :param query: the sql query to be executed.
+    :type query: str
+    :param s3_bucket: bucket where the data will be stored
+    :type s3_bucket: str
+    :param s3_key: desired key for the file. It includes the name of the file.
+        If a csv file is wanted, the param must end with ".csv".
+    :type s3_key: str
+    :param mysql_conn_id: reference to a specific mysql database
+    :type mysql_conn_id: str
+    :param aws_conn_id: reference to a specific S3 connection
+    :type aws_conn_id: str
+    :param verify: Whether or not to verify SSL certificates for S3 connection.
+        By default SSL certificates are verified.
+        You can provide the following values:
+
+        - False: do not validate SSL certificates. SSL will still be used
+          (unless use_ssl is False), but SSL certificates will not be
+          verified.
+        - path/to/cert/bundle.pem: A filename of the CA cert bundle to use.
+          You can specify this argument if you want to use a different
+          CA cert bundle than the one used by botocore.
+    :type verify: bool or str
+    """
+
+    @apply_defaults
+    def __init__(
+            self,
+            query: str,
+            s3_bucket: str,
+            s3_key: str,
+            mysql_conn_id: str = 'mysql_default',
+            aws_conn_id: str = 'aws_default',
+            verify: Optional[Union[bool, str]] = None,
+            header: Optional[bool] = False,
+            index: Optional[bool] = False,
+            *args, **kwargs) -> None:
+        super().__init__(*args, **kwargs)
+        self.query = query
+        self.s3_bucket = s3_bucket
+        self.s3_key = s3_key
+        self.mysql_conn_id = mysql_conn_id
+        self.aws_conn_id = aws_conn_id
+        self.verify = verify
+        self.header = header
+        self.index = index
+
+    def fill_na_with_none(self, series):
+        """Replace NaN values with None"""
+        return np.where(series.isnull(), None, series)
+
+    def fix_int_dtypes(self, df):
+        """
+        Mutate DataFrame to set dtypes for int columns containing NaN values.
+        """
+        for col in df:
+            if "float" in df[col].dtype.name and df[col].hasnans:
+                # inspect values to determine if dtype of non-null values is int or float
+                notna_series = df[col].dropna().values
+                if np.isclose(notna_series, notna_series.astype(int)).all():
+                    # set to dtype that retains integers and supports NaNs
+                    df[col] = self.fill_na_with_none(df[col]).astype(pd.Int64Dtype)
+
+    def execute(self, context, **kwargs):
+        self.hook = MySqlHook(mysql_conn_id=self.mysql_conn_id)
+        self.s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
+        data_df = self.hook.get_pandas_df(self.query)
+        self.log.info("Data from MySQL obtained")
+
+        self.fix_int_dtypes(data_df)
+        file_obj = StringIO()
 
 Review comment:
   I have been testing this. Unfortunately (with the changes I had to make with 
pickle) this doesn't generate a readable csv. I have been trying to imitate the 
functionality of NamedTemporaryFile using:
   ```
   with open('tmp_file.csv', 'w+') as tmp_csv:
       df.to_csv(tmp_csv)
       temporary_file_path = os.path.realpath(tmp_csv.name)
   os.remove(temporary_file_path)
   ```
   However, Ubuntu s
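
On the CSV question in the (truncated) comment above, a small sketch of one way to get a readable CSV out of a DataFrame and into S3 without pickling: render into an in-memory StringIO buffer and hand the string to S3Hook.load_string. The connection id, bucket and key are placeholders, and this is just one possible approach rather than what the PR ultimately does:

```python
from io import StringIO

import pandas as pd

from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def dataframe_to_s3_csv(df: pd.DataFrame, bucket: str, key: str,
                        aws_conn_id: str = 'aws_default') -> None:
    """Write df as plain CSV text to s3://bucket/key (placeholder names)."""
    buffer = StringIO()
    df.to_csv(buffer, header=True, index=False)  # human-readable CSV, no pickling
    S3Hook(aws_conn_id=aws_conn_id).load_string(
        string_data=buffer.getvalue(),
        key=key,
        bucket_name=bucket,
        replace=True,
    )


# Example (needs a configured 'aws_default' connection):
# dataframe_to_s3_csv(pd.DataFrame({"a": [1, 2]}), bucket="my-bucket", key="exports/a.csv")
```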

[jira] [Commented] (AIRFLOW-6867) Decouple DagBag and TaskInstance

2020-02-25 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-6867?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17044426#comment-17044426
 ] 

ASF GitHub Bot commented on AIRFLOW-6867:
-

ashb commented on pull request #7534: [AIRFLOW-6867] Fix bug in kill zombies
URL: https://github.com/apache/airflow/pull/7534
 
 
   The refactor in #7488 introduced a scheduler-breaking bug.
   
   I am not happy about merging this without a unit test to stop it breaking 
again, but given how much process_file does it's hard to test. Thoughts?
   
   ---
   Issue link: WILL BE INSERTED BY 
[boring-cyborg](https://github.com/kaxil/boring-cyborg)
   
   Make sure to mark the boxes below before creating PR: [x]
   
   - [ ] Description above provides context of the change
   - [ ] Commit message/PR title starts with `[AIRFLOW-]`. AIRFLOW- = 
JIRA ID*
   - [ ] Unit tests coverage for changes (not needed for documentation changes)
   - [ ] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [ ] Relevant documentation is updated including usage instructions.
   - [ ] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   * For document-only changes commit message can start with 
`[AIRFLOW-]`.
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Decouple DagBag and TaskInstance
> 
>
> Key: AIRFLOW-6867
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6867
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: DAG
>Affects Versions: 1.10.9
>Reporter: Kamil Bregula
>Priority: Major
> Fix For: 2.0.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

