[GitHub] [airflow] khyurri edited a comment on pull request #8818: WIP: Add support for Microsoft Azure Blob Storage in Google Cloud Storage Transfer Service operators

2020-06-23 Thread GitBox


khyurri edited a comment on pull request #8818:
URL: https://github.com/apache/airflow/pull/8818#issuecomment-647571911


   @potiuk @ad-m 
   I built the documentation locally, but I can't find the example HTML for this file:
   `airflow/providers/google/cloud/example_dags/example_cloud_storage_transfer_service_azure.py`
   
   How are examples displayed in the documentation?
   As far as I understand, this documentation is displayed at 
   https://airflow.apache.org/docs/stable/howto/operator/gcp/transfer.html.
   Is it enough for me to check that the examples from the documentation work, 
   or should I do something else?
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] CodingJonas closed issue #8420: DockerSwarmOperator always pulls docker image

2020-06-23 Thread GitBox


CodingJonas closed issue #8420:
URL: https://github.com/apache/airflow/issues/8420


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

2020-06-23 Thread GitBox


turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444023173



##
File path: airflow/operators/python.py
##
@@ -261,6 +265,14 @@ def __init__(  # pylint: disable=too-many-arguments
 *args,
 **kwargs
 ):
+if (
+    not isinstance(python_callable, types.FunctionType) or
+    isinstance(python_callable, types.LambdaType) and python_callable.__name__ == "<lambda>"
+):
+    raise AirflowException('PythonVirtualenvOperator only supports functions for python_callable arg')
+if python_version and str(python_version)[0] != str(sys.version_info[0]) and (op_args or op_kwargs):

Review comment:
   ```suggestion
   if python_version and str(python_version)[0] != sys.version_info.major and (op_args or op_kwargs):
   ```
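   For reference, a small runnable sketch of that check (note that `sys.version_info.major` is an
   `int`, so one side of the comparison needs a cast; the value below is only an example):
   
   ```python
   import sys
   
   python_version = "2"  # a value a user might pass to PythonVirtualenvOperator
   
   # Cast the major version to str (or the first character to int) before comparing,
   # otherwise a str-vs-int comparison such as "3" != 3 is always True.
   major_differs = str(python_version)[0] != str(sys.version_info.major)
   print(major_differs)
   ```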





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

2020-06-23 Thread GitBox


turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444027117



##
File path: airflow/operators/python.py
##
@@ -269,140 +281,132 @@ def __init__(  # pylint: disable=too-many-arguments
 templates_exts=templates_exts,
 *args,
 **kwargs)
-self.requirements = requirements or []
+self.requirements = list(requirements or [])
 self.string_args = string_args or []
 self.python_version = python_version
 self.use_dill = use_dill
 self.system_site_packages = system_site_packages
-# check that dill is present if needed
-dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-   self.requirements)
-if (not system_site_packages) and use_dill and not 
any(dill_in_requirements):
-raise AirflowException('If using dill, dill must be in the 
environment ' +
-   'either via system_site_packages or 
requirements')
-# check that a function is passed, and that it is not a lambda
-if (not isinstance(self.python_callable,
-   types.FunctionType) or 
(self.python_callable.__name__ ==
-   (lambda x: 0).__name__)):
-raise AirflowException('{} only supports functions for 
python_callable arg'.format(
-self.__class__.__name__))
-# check that args are passed iff python major version matches
-if (python_version is not None and
-   str(python_version)[0] != str(sys.version_info[0]) and
-   self._pass_op_args()):
-raise AirflowException("Passing op_args or op_kwargs is not 
supported across "
-   "different Python major versions "
-   "for PythonVirtualenvOperator. "
-   "Please use string_args.")
+if not self.system_site_packages and self.use_dill and 'dill' not in 
self.requirements:
+self.requirements.append('dill')
+self.pickling_library = dill if self.use_dill else pickle
 
 def execute_callable(self):
 with TemporaryDirectory(prefix='venv') as tmp_dir:
 if self.templates_dict:
 self.op_kwargs['templates_dict'] = self.templates_dict
-# generate filenames
+
 input_filename = os.path.join(tmp_dir, 'script.in')
 output_filename = os.path.join(tmp_dir, 'script.out')
 string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
 script_filename = os.path.join(tmp_dir, 'script.py')
 
-# set up virtualenv
-python_bin = 'python' + str(self.python_version) if 
self.python_version else None
 prepare_virtualenv(
 venv_directory=tmp_dir,
-python_bin=python_bin,
+python_bin=f'python{self.python_version}' if 
self.python_version else None,
 system_site_packages=self.system_site_packages,
-requirements=self.requirements,
+requirements=self.requirements
 )
 
 self._write_args(input_filename)
-self._write_script(script_filename)
 self._write_string_args(string_args_filename)
+self._write_script(script_filename)
+
+execute_in_subprocess(cmd=[
+f'{tmp_dir}/bin/python',
+script_filename,
+input_filename,
+output_filename,
+string_args_filename
+])
 
-# execute command in virtualenv
-execute_in_subprocess(
-self._generate_python_cmd(tmp_dir,
-  script_filename,
-  input_filename,
-  output_filename,
-  string_args_filename))
 return self._read_result(output_filename)
 
-def _pass_op_args(self):
-# we should only pass op_args if any are given to us
-return len(self.op_args) + len(self.op_kwargs) > 0
+def _write_args(self, filename):
+if self.op_args or self.op_kwargs:
+if self.op_kwargs:
+# some items from context can't be loaded in virtual env
+self._keep_serializable_op_kwargs()
+print(self.op_kwargs)
+with open(filename, 'wb') as file:
+self.pickling_library.dump({'args': self.op_args, 'kwargs': 
self.op_kwargs}, file)
+
+def _keep_serializable_op_kwargs(self):
+# Remove unserializable objects
+# otherwise "KeyError: 'Variable __getstate__ does not exist'" would 
be raised.
+self.op_kwargs.pop('var', None)
+# otherwise "TypeError: cannot serialize '_io

[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

2020-06-23 Thread GitBox


feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444036138



##
File path: airflow/operators/python.py
##
@@ -269,140 +281,132 @@ def __init__(  # pylint: disable=too-many-arguments
 templates_exts=templates_exts,
 *args,
 **kwargs)
-self.requirements = requirements or []
+self.requirements = list(requirements or [])
 self.string_args = string_args or []
 self.python_version = python_version
 self.use_dill = use_dill
 self.system_site_packages = system_site_packages
-# check that dill is present if needed
-dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-   self.requirements)
-if (not system_site_packages) and use_dill and not 
any(dill_in_requirements):
-raise AirflowException('If using dill, dill must be in the 
environment ' +
-   'either via system_site_packages or 
requirements')
-# check that a function is passed, and that it is not a lambda
-if (not isinstance(self.python_callable,
-   types.FunctionType) or 
(self.python_callable.__name__ ==
-   (lambda x: 0).__name__)):
-raise AirflowException('{} only supports functions for 
python_callable arg'.format(
-self.__class__.__name__))
-# check that args are passed iff python major version matches
-if (python_version is not None and
-   str(python_version)[0] != str(sys.version_info[0]) and
-   self._pass_op_args()):
-raise AirflowException("Passing op_args or op_kwargs is not 
supported across "
-   "different Python major versions "
-   "for PythonVirtualenvOperator. "
-   "Please use string_args.")
+if not self.system_site_packages and self.use_dill and 'dill' not in 
self.requirements:
+self.requirements.append('dill')
+self.pickling_library = dill if self.use_dill else pickle
 
 def execute_callable(self):
 with TemporaryDirectory(prefix='venv') as tmp_dir:
 if self.templates_dict:
 self.op_kwargs['templates_dict'] = self.templates_dict
-# generate filenames
+
 input_filename = os.path.join(tmp_dir, 'script.in')
 output_filename = os.path.join(tmp_dir, 'script.out')
 string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
 script_filename = os.path.join(tmp_dir, 'script.py')
 
-# set up virtualenv
-python_bin = 'python' + str(self.python_version) if 
self.python_version else None
 prepare_virtualenv(
 venv_directory=tmp_dir,
-python_bin=python_bin,
+python_bin=f'python{self.python_version}' if 
self.python_version else None,
 system_site_packages=self.system_site_packages,
-requirements=self.requirements,
+requirements=self.requirements
 )
 
 self._write_args(input_filename)
-self._write_script(script_filename)
 self._write_string_args(string_args_filename)
+self._write_script(script_filename)
+
+execute_in_subprocess(cmd=[
+f'{tmp_dir}/bin/python',
+script_filename,
+input_filename,
+output_filename,
+string_args_filename
+])
 
-# execute command in virtualenv
-execute_in_subprocess(
-self._generate_python_cmd(tmp_dir,
-  script_filename,
-  input_filename,
-  output_filename,
-  string_args_filename))
 return self._read_result(output_filename)
 
-def _pass_op_args(self):
-# we should only pass op_args if any are given to us
-return len(self.op_args) + len(self.op_kwargs) > 0
+def _write_args(self, filename):
+if self.op_args or self.op_kwargs:
+if self.op_kwargs:
+# some items from context can't be loaded in virtual env
+self._keep_serializable_op_kwargs()
+print(self.op_kwargs)
+with open(filename, 'wb') as file:
+self.pickling_library.dump({'args': self.op_args, 'kwargs': 
self.op_kwargs}, file)
+
+def _keep_serializable_op_kwargs(self):
+# Remove unserializable objects
+# otherwise "KeyError: 'Variable __getstate__ does not exist'" would 
be raised.
+self.op_kwargs.pop('var', None)
+# otherwise "TypeError: cannot serialize '_io.

[GitHub] [airflow] turbaszek commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

2020-06-23 Thread GitBox


turbaszek commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444039647



##
File path: airflow/operators/python.py
##
@@ -269,140 +281,132 @@ def __init__(  # pylint: disable=too-many-arguments
 templates_exts=templates_exts,
 *args,
 **kwargs)
-self.requirements = requirements or []
+self.requirements = list(requirements or [])
 self.string_args = string_args or []
 self.python_version = python_version
 self.use_dill = use_dill
 self.system_site_packages = system_site_packages
-# check that dill is present if needed
-dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-   self.requirements)
-if (not system_site_packages) and use_dill and not 
any(dill_in_requirements):
-raise AirflowException('If using dill, dill must be in the 
environment ' +
-   'either via system_site_packages or 
requirements')
-# check that a function is passed, and that it is not a lambda
-if (not isinstance(self.python_callable,
-   types.FunctionType) or 
(self.python_callable.__name__ ==
-   (lambda x: 0).__name__)):
-raise AirflowException('{} only supports functions for 
python_callable arg'.format(
-self.__class__.__name__))
-# check that args are passed iff python major version matches
-if (python_version is not None and
-   str(python_version)[0] != str(sys.version_info[0]) and
-   self._pass_op_args()):
-raise AirflowException("Passing op_args or op_kwargs is not 
supported across "
-   "different Python major versions "
-   "for PythonVirtualenvOperator. "
-   "Please use string_args.")
+if not self.system_site_packages and self.use_dill and 'dill' not in 
self.requirements:
+self.requirements.append('dill')
+self.pickling_library = dill if self.use_dill else pickle
 
 def execute_callable(self):
 with TemporaryDirectory(prefix='venv') as tmp_dir:
 if self.templates_dict:
 self.op_kwargs['templates_dict'] = self.templates_dict
-# generate filenames
+
 input_filename = os.path.join(tmp_dir, 'script.in')
 output_filename = os.path.join(tmp_dir, 'script.out')
 string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
 script_filename = os.path.join(tmp_dir, 'script.py')
 
-# set up virtualenv
-python_bin = 'python' + str(self.python_version) if 
self.python_version else None
 prepare_virtualenv(
 venv_directory=tmp_dir,
-python_bin=python_bin,
+python_bin=f'python{self.python_version}' if 
self.python_version else None,
 system_site_packages=self.system_site_packages,
-requirements=self.requirements,
+requirements=self.requirements
 )
 
 self._write_args(input_filename)
-self._write_script(script_filename)
 self._write_string_args(string_args_filename)
+self._write_script(script_filename)
+
+execute_in_subprocess(cmd=[
+f'{tmp_dir}/bin/python',
+script_filename,
+input_filename,
+output_filename,
+string_args_filename
+])
 
-# execute command in virtualenv
-execute_in_subprocess(
-self._generate_python_cmd(tmp_dir,
-  script_filename,
-  input_filename,
-  output_filename,
-  string_args_filename))
 return self._read_result(output_filename)
 
-def _pass_op_args(self):
-# we should only pass op_args if any are given to us
-return len(self.op_args) + len(self.op_kwargs) > 0
+def _write_args(self, filename):
+if self.op_args or self.op_kwargs:
+if self.op_kwargs:
+# some items from context can't be loaded in virtual env
+self._keep_serializable_op_kwargs()
+print(self.op_kwargs)
+with open(filename, 'wb') as file:
+self.pickling_library.dump({'args': self.op_args, 'kwargs': 
self.op_kwargs}, file)
+
+def _keep_serializable_op_kwargs(self):
+# Remove unserializable objects
+# otherwise "KeyError: 'Variable __getstate__ does not exist'" would 
be raised.
+self.op_kwargs.pop('var', None)
+# otherwise "TypeError: cannot serialize '_io

[GitHub] [airflow] turbaszek commented on a change in pull request #9280: Functionality to shuffle HMS connections to be used by HiveMetastoreHook facilitating load balancing

2020-06-23 Thread GitBox


turbaszek commented on a change in pull request #9280:
URL: https://github.com/apache/airflow/pull/9280#discussion_r444053678



##
File path: tests/hooks/test_hive_hook.py
##
@@ -390,6 +390,25 @@ def test_table_exists(self):
 self.hook.table_exists(str(random.randint(1, 1)))
 )
 
+def test_check_hms_clients_load_balance(self):
+    # checks if every time HMS Hook is instantiated, it gets to a
+    # different HMS server most of the time and not to the same HMS server.
+    connection_count = {}
+    hms_hook = HiveMetastoreHook()
+    hms_server_count = len(hms_hook.get_connections('metastore_default'))
+
+    if hms_server_count > 2:
+        for index in range(2 * hms_server_count):
+            conn = HiveMetastoreHook()._find_valid_server().host
+            if conn in connection_count:
+                if connection_count[conn] >= (2 * hms_server_count) - 1:
+                    self.assertTrue(1 == 2)
+                else:
+                    connection_count[conn] = connection_count[conn] + 1
+            else:
+                connection_count[conn] = 1
+    self.assertTrue(1 == 1)

Review comment:
   I appreciate the test, however I'm wondering if it would be sufficient 
   to mock `random.shuffle` and assert that it was called? Additionally, we can 
   mock it with a callable that gives a deterministic result, i.e. 
   `lambda xs: xs[::-1]`, WDYT?
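   For illustration, a minimal self-contained sketch of that pattern (the server names are
   made up; the real test would patch `random.shuffle` in the module where
   `HiveMetastoreHook` resolves it):
   
   ```python
   import random
   from unittest import mock
   
   
   def fake_shuffle(items):
       # Deterministic stand-in for random.shuffle: reverse the list in place.
       items.reverse()
   
   
   with mock.patch.object(random, "shuffle", side_effect=fake_shuffle) as mock_shuffle:
       servers = ["hms1:9083", "hms2:9083", "hms3:9083"]
       random.shuffle(servers)  # the code under test would call this internally
       assert servers == ["hms3:9083", "hms2:9083", "hms1:9083"]
       mock_shuffle.assert_called_once()
   ```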





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ashb opened a new pull request #9484: Fix typo in helm chart upgrade command for 2.0

2020-06-23 Thread GitBox


ashb opened a new pull request #9484:
URL: https://github.com/apache/airflow/pull/9484


   This affects trying to run the chart with main/2.0
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] CodingJonas opened a new issue #9485: Unwanted backfill when using empty database even with catchup set to false

2020-06-23 Thread GitBox


CodingJonas opened a new issue #9485:
URL: https://github.com/apache/airflow/issues/9485


   **Apache Airflow version**: v1.10
   
   **Environment**: Deployed inside a Docker container
   
   - **Cloud provider or hardware configuration**:
   - **OS** (e.g. from /etc/os-release): Ubuntu 18.04
   - **Install tools**: pipenv
   
   **What happened**:
   I have a DAG with a start date of 2018-01-01, scheduled through a cron 
   expression (e.g. every 2 hours). This is the first time I start Airflow with a 
   new database; no DAG has ever run before. As soon as I unpause my DAG, the 
   scheduler starts one DAG run. I set `catchup=False` to stop this behaviour, 
   yet it still schedules exactly one DAG run for backfill.
   If there was a previous DAG run, this behaviour does not happen.
   
   I found a [reference to this 
   issue](https://stackoverflow.com/questions/52177418/how-to-stop-dag-from-backfilling-catchup-by-default-false-and-catchup-false-doe#comment92792366_52922501)
   from 2018, which describes a very similar problem, but I couldn't find an open 
   issue for it.
   
   **What you expected to happen**:
   If I set `catchup=False`, I expect no backfill to happen.
   
   **How to reproduce it**:
   These are the DAG settings I use:
   ```python
   dag_params = dict(
   dag_id='test',
   schedule_interval='0 */2 * * *',
   start_date=datetime(2018, 1, 1),
   max_active_runs=1,
   catchup=False,
   )
   ```
   
   **Comment**:
   I read somewhere that setting a future `start_date` could help. I tried it 
   with `datetime(3018, 1, 1)`, which got rid of the unwanted backfill, but no 
   DAG runs were executed. More interestingly, it started a DAG run and instantly 
   set it to 'success' without running a single task. But I think this is 
   unrelated to this issue.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] OmairK commented on pull request #9476: Remove PATCH /dags/{dag_id}/dagRuns/{dag_run_id} endpoint

2020-06-23 Thread GitBox


OmairK commented on pull request #9476:
URL: https://github.com/apache/airflow/pull/9476#issuecomment-648026550


   > This code looks fine. @mik-laj do you know if there was a reason the spec 
included the ability to update dag runs via PATCH?
   
   @mik-laj himself suggested removing this endpoint.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] kaxil merged pull request #9484: Fix typo in helm chart upgrade command for 2.0

2020-06-23 Thread GitBox


kaxil merged pull request #9484:
URL: https://github.com/apache/airflow/pull/9484


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[airflow] branch master updated (ee36142 -> b1cd382)

2020-06-23 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git.


from ee36142  Remove unused recurse_tasks function (#9465)
 add b1cd382  Fix typo in helm chart upgrade command for 2.0 (#9484)

No new revisions were added by this update.

Summary of changes:
 chart/templates/scheduler/scheduler-deployment.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)



[GitHub] [airflow] turbaszek merged pull request #9483: Correct command for starting Celery Flower

2020-06-23 Thread GitBox


turbaszek merged pull request #9483:
URL: https://github.com/apache/airflow/pull/9483


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[airflow] branch master updated: Correct command for starting Celery Flower (#9483)

2020-06-23 Thread turbaszek
This is an automated email from the ASF dual-hosted git repository.

turbaszek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
 new a00e188  Correct command for starting Celery Flower (#9483)
a00e188 is described below

commit a00e188ded01028409e041130d1e4f02e4e3a109
Author: zikun <33176974+zi...@users.noreply.github.com>
AuthorDate: Tue Jun 23 17:40:36 2020 +0800

Correct command for starting Celery Flower (#9483)
---
 docs/executor/celery.rst | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/executor/celery.rst b/docs/executor/celery.rst
index a3fa7ed..cc2a2af 100644
--- a/docs/executor/celery.rst
+++ b/docs/executor/celery.rst
@@ -70,7 +70,7 @@ to start a Flower web server:
 
 .. code-block:: bash
 
-airflow celery stop
+airflow celery flower
 
 Please note that you must have the ``flower`` python library already installed 
on your system. The recommend way is to install the airflow celery bundle.
 



[GitHub] [airflow] ChristianYeah commented on issue #9209: mark dag_run as failed internally, eg, in one of the internal tasks

2020-06-23 Thread GitBox


ChristianYeah commented on issue #9209:
URL: https://github.com/apache/airflow/issues/9209#issuecomment-648032574


   > can we explain the requirement with clear example?
   
   ```
   import airflow
   from airflow import DAG
   from airflow.operators.python_operator import PythonOperator
   import requests
   
   
   default_args = {
       'owner': 'airflow',
   }
   
   dag = DAG(
       dag_id='test_max_active_runs',
       default_args=default_args,
       schedule_interval=None,
       start_date=airflow.utils.dates.days_ago(1),
       max_active_runs=1,
       catchup=False
   )
   
   
   def example_func():
       data = requests.get("some url").json()
       if data.get('code', -1) == 0:
           #  no error in the response
           #  further process
           #  eg.
           print(data.get('results', []))
       else:
           #  error in response
           print(data.get('message', ''))
           # mark the task as failed here
           # something like below
           fail_this_task()
   
   
   example = PythonOperator(
       task_id='example',
       python_callable=example_func,
       dag=dag,
   )
   
   ```
   
   Is it possible to fail the task dynamically if the response "code" is anything 
   other than 0?
   
   @barmanand, thank you very much



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] boring-cyborg[bot] commented on issue #9486: ECSOperator failed to display logs from Cloudwatch after providing log configurations

2020-06-23 Thread GitBox


boring-cyborg[bot] commented on issue #9486:
URL: https://github.com/apache/airflow/issues/9486#issuecomment-648039392


   Thanks for opening your first issue here! Be sure to follow the issue 
template!
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] barmanand commented on issue #9209: mark dag_run as failed internally, eg, in one of the internal tasks

2020-06-23 Thread GitBox


barmanand commented on issue #9209:
URL: https://github.com/apache/airflow/issues/9209#issuecomment-648039812


   Yes @ChristianYeah, you can raise `AirflowException` and it will fail the task.
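   For illustration, a minimal sketch of that approach applied to the callable above (the
   URL and response fields are placeholders taken from the example, not a real service):
   
   ```python
   import requests
   
   from airflow.exceptions import AirflowException
   
   
   def example_func():
       data = requests.get("some url").json()
       if data.get('code', -1) != 0:
           # Raising AirflowException marks this task instance as failed;
           # with default trigger rules the DAG run then ends up failed as well.
           raise AirflowException("error in response: {}".format(data.get('message', '')))
       print(data.get('results', []))
   ```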



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] Shadowsong27 opened a new issue #9486: ECSOperator failed to display logs from Cloudwatch after providing log configurations

2020-06-23 Thread GitBox


Shadowsong27 opened a new issue #9486:
URL: https://github.com/apache/airflow/issues/9486


   
   
   
   
   **Apache Airflow version**:
   
   v1.10.10
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**: AWS
   - **OS** (e.g. from /etc/os-release): Deployed in ECS Fargate
   - **Install tools**: poetry
   
   **What happened**:
   
   I am trying to display the logs of Fargate tasks in the Airflow UI when using 
   the ECSOperator, which is achieved by providing the following task arguments 
   according to the docs:
   1. `awslogs_group`
   2. `awslogs_region`
   3. `awslogs_stream_prefix`
   
   However, it failed with the error message
   ```
   An error occurred (ResourceNotFoundException) when calling the GetLogEvents 
operation: The specified log stream does not exist
   ```
   
   I went on to examine the source code and realised the log stream in the 
   [source 
   code](https://github.com/apache/airflow/blob/a00e188ded01028409e041130d1e4f02e4e3a109/airflow/providers/amazon/aws/operators/ecs.py#L208)
   is constructed like this:
   ```
   stream_name = "{}/{}".format(self.awslogs_stream_prefix, task_id)
   ```
   
   whereas the [official AWS 
   docs](https://docs.aws.amazon.com/AmazonECS/latest/developerguide/using_awslogs.html) 
   state:
   
   ```
   The awslogs-stream-prefix option allows you to associate a log stream with 
the specified prefix, the container name, and the ID of the Amazon ECS task to 
which the container belongs. If you specify a prefix with this option, then the 
log stream takes the following format:
   
   prefix-name/container-name/ecs-task-id
   ```
   
   After I manually patched my container name into the Airflow task argument 
   `awslogs_stream_prefix`, it worked (my current workaround).
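   For reference, a sketch of the difference (the prefix, container name, and task ID below 
   are placeholders):
   
   ```python
   awslogs_stream_prefix = "my-prefix"      # value passed to ECSOperator
   container_name = "my-container"          # taken from the ECS task definition
   ecs_task_id = "0123456789abcdef0123456789abcdef"
   
   # What the operator currently builds (per the linked source):
   current_stream = "{}/{}".format(awslogs_stream_prefix, ecs_task_id)
   
   # What the awslogs log driver actually creates (per the AWS docs quoted above):
   expected_stream = "{}/{}/{}".format(awslogs_stream_prefix, container_name, ecs_task_id)
   
   print(current_stream)   # my-prefix/0123456789abcdef0123456789abcdef
   print(expected_stream)  # my-prefix/my-container/0123456789abcdef0123456789abcdef
   ```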
   
   Personally, I am not sure whether this issue arises from my specific Fargate 
   configuration, or whether it is generic enough to be considered a bug.

   
   
   
   
   
   **What you expected to happen**: Logs displaying in Airflow Log UI
   

   
   The log stream prefix is not dynamically constructed properly (the container name is missing).
   
   **How to reproduce it**: 
   
   Running any ECS Fargate task with the ECSOperator and the default awslogs 
   configuration should reproduce this.
   
   
   
   
   **Anything else we need to know**:
   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[airflow] branch v1-10-test updated: fixup! Fix failing tests from #9250 (#9307)

2020-06-23 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
 new c0544c8  fixup! Fix failing tests from #9250 (#9307)
c0544c8 is described below

commit c0544c85ab7c3824cde178d4b22060097b616ba7
Author: Kaxil Naik 
AuthorDate: Tue Jun 23 11:09:11 2020 +0100

fixup! Fix failing tests from #9250 (#9307)
---
 tests/www_rbac/test_views.py | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/tests/www_rbac/test_views.py b/tests/www_rbac/test_views.py
index a5210f3..35dab60 100644
--- a/tests/www_rbac/test_views.py
+++ b/tests/www_rbac/test_views.py
@@ -574,10 +574,10 @@ class TestAirflowBaseViews(TestBase):
 self.check_content_in_response('example_bash_operator', resp)
 
 @parameterized.expand([
-("hello\nworld", r'\"conf\":{\"abc\":\"hello\\nworld\"}}'),
-("hello'world", r'\"conf\":{\"abc\":\"hello\\u0027world\"}}'),
-("

[GitHub] [airflow] potiuk merged pull request #9481: Remove redundant parentheses in /test_datacatalog.py

2020-06-23 Thread GitBox


potiuk merged pull request #9481:
URL: https://github.com/apache/airflow/pull/9481


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[airflow] branch master updated (a00e188 -> 7a6fcc4)

2020-06-23 Thread potiuk
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git.


from a00e188  Correct command for starting Celery Flower (#9483)
 add 7a6fcc4  Remove redundant parentheses in /test_datacatalog.py (#9481)

No new revisions were added by this update.

Summary of changes:
 tests/providers/google/cloud/hooks/test_datacatalog.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)



[airflow] branch master updated (7a6fcc4 -> 9278857)

2020-06-23 Thread potiuk
This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git.


from 7a6fcc4  Remove redundant parentheses in /test_datacatalog.py (#9481)
 add 9278857  Fix typo in the word "default" in www/forms.py (#9480)

No new revisions were added by this update.

Summary of changes:
 airflow/www/forms.py | 12 ++--
 1 file changed, 6 insertions(+), 6 deletions(-)



[GitHub] [airflow] potiuk merged pull request #9480: Fix typo in the word "default" in www/forms.py

2020-06-23 Thread GitBox


potiuk merged pull request #9480:
URL: https://github.com/apache/airflow/pull/9480


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ChristianYeah commented on issue #9209: mark dag_run as failed internally, eg, in one of the internal tasks

2020-06-23 Thread GitBox


ChristianYeah commented on issue #9209:
URL: https://github.com/apache/airflow/issues/9209#issuecomment-648054894


   @barmanand thank you



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ChristianYeah closed issue #9209: mark dag_run as failed internally, eg, in one of the internal tasks

2020-06-23 Thread GitBox


ChristianYeah closed issue #9209:
URL: https://github.com/apache/airflow/issues/9209


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] kaxil opened a new pull request #9487: Replace deprecated wtforms HTMLString with markupsafe.MarkUp

2020-06-23 Thread GitBox


kaxil opened a new pull request #9487:
URL: https://github.com/apache/airflow/pull/9487


   WTForms now uses `markupsafe.Markup` to escape strings and has removed its internal 
   `HTMLString` class on master. Details: https://github.com/wtforms/wtforms/pull/400
   
   That change previously broke Airflow for new users (in WTForms 2.3.0). However, on 
   users' request, WTForms 2.3.1 added `HTMLString` back as a thin wrapper that passes 
   all args to `markupsafe.Markup`, for temporary backward compatibility with a 
   deprecation warning. Details: https://github.com/wtforms/wtforms/issues/581
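   For illustration, a minimal sketch of the kind of replacement described above (the helper
   below is hypothetical, not code from this PR):
   
   ```python
   # Before (deprecated in WTForms 2.3.1):
   #     from wtforms.widgets import HTMLString
   #     return HTMLString('<input type="text" name="field">')
   
   # After: markupsafe.Markup is the escaping-aware string type WTForms now uses.
   from markupsafe import Markup
   
   
   def render_text_input(name):
       # Markup.format() escapes the interpolated value, and the result is marked
       # safe so template engines do not escape it again.
       return Markup('<input type="text" name="{}">').format(name)
   
   
   print(render_text_input('abc"><script>'))
   ```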
   
   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] kaxil commented on a change in pull request #9487: Replace deprecated wtforms HTMLString with markupsafe.MarkUp

2020-06-23 Thread GitBox


kaxil commented on a change in pull request #9487:
URL: https://github.com/apache/airflow/pull/9487#discussion_r444141552



##
File path: setup.py
##
@@ -709,6 +709,7 @@ def is_package_excluded(package: str, exclusion_list: 
List[str]):
 'lazy_object_proxy~=1.3',
 'lockfile>=0.12.2',
 'markdown>=2.5.2, <3.0',
+'markupsafe>=1.1.1, <2.0',

Review comment:
   It is not a new dependency as it was already required by jinja2, 
Flask-AppBuilder etc
   
   https://github.com/dpgaspar/Flask-AppBuilder/blob/master/requirements.txt





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek commented on pull request #9330: Add read-only Task endpoint

2020-06-23 Thread GitBox


turbaszek commented on pull request #9330:
URL: https://github.com/apache/airflow/pull/9330#issuecomment-648079790


   @ephraimbuddy would you mind taking a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

2020-06-23 Thread GitBox


feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444159472



##
File path: airflow/operators/python.py
##
@@ -269,140 +281,132 @@ def __init__(  # pylint: disable=too-many-arguments
 templates_exts=templates_exts,
 *args,
 **kwargs)
-self.requirements = requirements or []
+self.requirements = list(requirements or [])
 self.string_args = string_args or []
 self.python_version = python_version
 self.use_dill = use_dill
 self.system_site_packages = system_site_packages
-# check that dill is present if needed
-dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-   self.requirements)
-if (not system_site_packages) and use_dill and not 
any(dill_in_requirements):
-raise AirflowException('If using dill, dill must be in the 
environment ' +
-   'either via system_site_packages or 
requirements')
-# check that a function is passed, and that it is not a lambda
-if (not isinstance(self.python_callable,
-   types.FunctionType) or 
(self.python_callable.__name__ ==
-   (lambda x: 0).__name__)):
-raise AirflowException('{} only supports functions for 
python_callable arg'.format(
-self.__class__.__name__))
-# check that args are passed iff python major version matches
-if (python_version is not None and
-   str(python_version)[0] != str(sys.version_info[0]) and
-   self._pass_op_args()):
-raise AirflowException("Passing op_args or op_kwargs is not 
supported across "
-   "different Python major versions "
-   "for PythonVirtualenvOperator. "
-   "Please use string_args.")
+if not self.system_site_packages and self.use_dill and 'dill' not in 
self.requirements:
+self.requirements.append('dill')
+self.pickling_library = dill if self.use_dill else pickle
 
 def execute_callable(self):
 with TemporaryDirectory(prefix='venv') as tmp_dir:
 if self.templates_dict:
 self.op_kwargs['templates_dict'] = self.templates_dict
-# generate filenames
+
 input_filename = os.path.join(tmp_dir, 'script.in')
 output_filename = os.path.join(tmp_dir, 'script.out')
 string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
 script_filename = os.path.join(tmp_dir, 'script.py')
 
-# set up virtualenv
-python_bin = 'python' + str(self.python_version) if 
self.python_version else None
 prepare_virtualenv(
 venv_directory=tmp_dir,
-python_bin=python_bin,
+python_bin=f'python{self.python_version}' if 
self.python_version else None,
 system_site_packages=self.system_site_packages,
-requirements=self.requirements,
+requirements=self.requirements
 )
 
 self._write_args(input_filename)
-self._write_script(script_filename)
 self._write_string_args(string_args_filename)
+self._write_script(script_filename)
+
+execute_in_subprocess(cmd=[
+f'{tmp_dir}/bin/python',
+script_filename,
+input_filename,
+output_filename,
+string_args_filename
+])
 
-# execute command in virtualenv
-execute_in_subprocess(
-self._generate_python_cmd(tmp_dir,
-  script_filename,
-  input_filename,
-  output_filename,
-  string_args_filename))
 return self._read_result(output_filename)
 
-def _pass_op_args(self):
-# we should only pass op_args if any are given to us
-return len(self.op_args) + len(self.op_kwargs) > 0
+def _write_args(self, filename):
+if self.op_args or self.op_kwargs:
+if self.op_kwargs:
+# some items from context can't be loaded in virtual env
+self._keep_serializable_op_kwargs()
+print(self.op_kwargs)
+with open(filename, 'wb') as file:
+self.pickling_library.dump({'args': self.op_args, 'kwargs': 
self.op_kwargs}, file)
+
+def _keep_serializable_op_kwargs(self):
+# Remove unserializable objects
+# otherwise "KeyError: 'Variable __getstate__ does not exist'" would 
be raised.
+self.op_kwargs.pop('var', None)
+# otherwise "TypeError: cannot serialize '_io.

[GitHub] [airflow] feluelle commented on a change in pull request #9394: Fix PythonVirtualenvOperator not working with Airflow context

2020-06-23 Thread GitBox


feluelle commented on a change in pull request #9394:
URL: https://github.com/apache/airflow/pull/9394#discussion_r444165384



##
File path: airflow/operators/python.py
##
@@ -269,140 +281,132 @@ def __init__(  # pylint: disable=too-many-arguments
 templates_exts=templates_exts,
 *args,
 **kwargs)
-self.requirements = requirements or []
+self.requirements = list(requirements or [])
 self.string_args = string_args or []
 self.python_version = python_version
 self.use_dill = use_dill
 self.system_site_packages = system_site_packages
-# check that dill is present if needed
-dill_in_requirements = map(lambda x: x.lower().startswith('dill'),
-   self.requirements)
-if (not system_site_packages) and use_dill and not 
any(dill_in_requirements):
-raise AirflowException('If using dill, dill must be in the 
environment ' +
-   'either via system_site_packages or 
requirements')
-# check that a function is passed, and that it is not a lambda
-if (not isinstance(self.python_callable,
-   types.FunctionType) or 
(self.python_callable.__name__ ==
-   (lambda x: 0).__name__)):
-raise AirflowException('{} only supports functions for 
python_callable arg'.format(
-self.__class__.__name__))
-# check that args are passed iff python major version matches
-if (python_version is not None and
-   str(python_version)[0] != str(sys.version_info[0]) and
-   self._pass_op_args()):
-raise AirflowException("Passing op_args or op_kwargs is not 
supported across "
-   "different Python major versions "
-   "for PythonVirtualenvOperator. "
-   "Please use string_args.")
+if not self.system_site_packages and self.use_dill and 'dill' not in 
self.requirements:
+self.requirements.append('dill')
+self.pickling_library = dill if self.use_dill else pickle
 
 def execute_callable(self):
 with TemporaryDirectory(prefix='venv') as tmp_dir:
 if self.templates_dict:
 self.op_kwargs['templates_dict'] = self.templates_dict
-# generate filenames
+
 input_filename = os.path.join(tmp_dir, 'script.in')
 output_filename = os.path.join(tmp_dir, 'script.out')
 string_args_filename = os.path.join(tmp_dir, 'string_args.txt')
 script_filename = os.path.join(tmp_dir, 'script.py')
 
-# set up virtualenv
-python_bin = 'python' + str(self.python_version) if 
self.python_version else None
 prepare_virtualenv(
 venv_directory=tmp_dir,
-python_bin=python_bin,
+python_bin=f'python{self.python_version}' if 
self.python_version else None,
 system_site_packages=self.system_site_packages,
-requirements=self.requirements,
+requirements=self.requirements
 )
 
 self._write_args(input_filename)
-self._write_script(script_filename)
 self._write_string_args(string_args_filename)
+self._write_script(script_filename)
+
+execute_in_subprocess(cmd=[
+f'{tmp_dir}/bin/python',
+script_filename,
+input_filename,
+output_filename,
+string_args_filename
+])
 
-# execute command in virtualenv
-execute_in_subprocess(
-self._generate_python_cmd(tmp_dir,
-  script_filename,
-  input_filename,
-  output_filename,
-  string_args_filename))
 return self._read_result(output_filename)
 
-def _pass_op_args(self):
-# we should only pass op_args if any are given to us
-return len(self.op_args) + len(self.op_kwargs) > 0
+def _write_args(self, filename):
+if self.op_args or self.op_kwargs:
+if self.op_kwargs:
+# some items from context can't be loaded in virtual env
+self._keep_serializable_op_kwargs()
+print(self.op_kwargs)

Review comment:
   ToRemove





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on pull request #9329: Pool CRUD Endpoints

2020-06-23 Thread GitBox


mik-laj commented on pull request #9329:
URL: https://github.com/apache/airflow/pull/9329#issuecomment-648115434


   @OmairK 
   ```
   PATCH /api/v1/pools/default_pool?update_mask=name,slots
   {
   "name": "default_pool",
   "sluts: 80
   }
   ```
   Will this request be handled correctly? The name has not been changed, 
   because it stays the same the whole time.
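   For illustration, a minimal sketch of the semantics being asked about (purely 
   illustrative, not the endpoint's actual implementation):
   
   ```python
   current = {"name": "default_pool", "slots": 128}
   patch_body = {"name": "default_pool", "slots": 80}
   update_mask = ["name", "slots"]
   
   # Only fields listed in update_mask are applied; passing an unchanged name
   # should be treated as a no-op rather than an error.
   for field in update_mask:
       if field == "name" and patch_body["name"] != current["name"]:
           raise ValueError("the name of default_pool cannot be changed")
       current[field] = patch_body[field]
   
   print(current)  # {'name': 'default_pool', 'slots': 80}
   ```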
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #9329: Pool CRUD Endpoints

2020-06-23 Thread GitBox


mik-laj commented on a change in pull request #9329:
URL: https://github.com/apache/airflow/pull/9329#discussion_r444187393



##
File path: airflow/api_connexion/endpoints/pool_endpoint.py
##
@@ -53,18 +59,59 @@ def get_pools(session):
 
 total_entries = session.query(func.count(Pool.id)).scalar()
 pools = 
session.query(Pool).order_by(Pool.id).offset(offset).limit(limit).all()
-return pool_collection_schema.dump(PoolCollection(pools=pools, 
total_entries=total_entries)).data
+return pool_collection_schema.dump(
+PoolCollection(pools=pools, total_entries=total_entries)
+).data
 
 
-def patch_pool():
+@provide_session
+def patch_pool(pool_name, session, update_mask=None):
 """
 Update a pool
 """
-raise NotImplementedError("Not implemented yet.")
+# Only slots can be modified in 'default_pool'
+if pool_name == "default_pool":

Review comment:
   ```suggestion
   if pool_name == Pool.DEFAULT_POOL_NAME:
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on pull request #9476: Remove PATCH /dags/{dag_id}/dagRuns/{dag_run_id} endpoint

2020-06-23 Thread GitBox


mik-laj commented on pull request #9476:
URL: https://github.com/apache/airflow/pull/9476#issuecomment-648121471


   @jhtimmins  I can't find fields that can be modified.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on issue #8132: API Endpoints - Read-only - Task Instance

2020-06-23 Thread GitBox


mik-laj commented on issue #8132:
URL: https://github.com/apache/airflow/issues/8132#issuecomment-648126157


   @GMarkfjard Do you need help? This month I would like to finish all 
   read-only operations, and we only have a week left. Are you having any 
   problems? We managed to solve many problems while working on other endpoints, 
   so if we get to know your change better, we will be able to help you. Creating 
   a draft of your change would be helpful to us. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #9170: Read only endpoint for XCom #8134

2020-06-23 Thread GitBox


mik-laj commented on a change in pull request #9170:
URL: https://github.com/apache/airflow/pull/9170#discussion_r444212806



##
File path: airflow/api_connexion/schemas/xcom_schema.py
##
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from typing import List, NamedTuple
+
+from marshmallow import Schema, fields
+from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field
+
+from airflow.models import XCom
+
+
+class XComCollectionItemSchema(SQLAlchemySchema):
+"""
+Schema for a xcom item
+"""
+
+class Meta:
+""" Meta """
+model = XCom
+
+key = auto_field()
+timestamp = auto_field()
+execution_date = auto_field()
+task_id = auto_field()
+dag_id = auto_field()
+
+
+class XComSchema(XComCollectionItemSchema):
+"""
+XCom schema
+"""
+
+value = auto_field()
+
+
+class XComCollection(NamedTuple):
+""" List of XComs with meta"""
+xcom_entries: List[XCom]
+total_entries: int
+
+
+class XComCollectionSchema(Schema):
+""" XCom Collection Schema"""
+xcom_entries = fields.List(fields.Nested(XComCollectionItemSchema))
+total_entries = fields.Int()
+
+
+xcom_schema = XComSchema()
+xcom_collection_item_schema = XComCollectionItemSchema()
+xcom_collection_schema = XComCollectionSchema()

Review comment:
   ```suggestion
   xcom_schema = XComSchema(strict=True)
   xcom_collection_item_schema = XComCollectionItemSchema(strict=True)
   xcom_collection_schema = XComCollectionSchema(strict=True)
   ```
   This is the last thing. If you accept this, I will merge this change.
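   For context, a small sketch of what `strict=True` changes under marshmallow 2.x (the
   schema below is illustrative, not the XCom schema itself):
   
   ```python
   from marshmallow import Schema, ValidationError, fields
   
   
   class DemoSchema(Schema):
       total_entries = fields.Int(required=True)
   
   
   # Without strict=True (the marshmallow 2.x default), errors are returned, not raised.
   result = DemoSchema().load({"total_entries": "not-a-number"})
   print(result.errors)  # {'total_entries': ['Not a valid integer.']}
   
   # With strict=True, the same input raises ValidationError instead of failing silently.
   try:
       DemoSchema(strict=True).load({"total_entries": "not-a-number"})
   except ValidationError as err:
       print(err.messages)
   ```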





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj closed issue #9108: API Endpoint - CRUD - Connection

2020-06-23 Thread GitBox


mik-laj closed issue #9108:
URL: https://github.com/apache/airflow/issues/9108


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj merged pull request #9266: Add CRUD endpoint for connections

2020-06-23 Thread GitBox


mik-laj merged pull request #9266:
URL: https://github.com/apache/airflow/pull/9266


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[airflow] branch master updated (9278857 -> 04a857d)

2020-06-23 Thread kamilbregula
This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git.


from 9278857  Fix typo in the word "default" in www/forms.py (#9480)
 add 04a857d  Add CRUD  endpoint for connections (#9266)

No new revisions were added by this update.

Summary of changes:
 .../api_connexion/endpoints/connection_endpoint.py |  76 --
 airflow/api_connexion/openapi/v1.yaml  |   3 -
 airflow/api_connexion/schemas/connection_schema.py |  16 +-
 .../endpoints/test_connection_endpoint.py  | 266 -
 4 files changed, 327 insertions(+), 34 deletions(-)



[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9330: Add read-only Task endpoint

2020-06-23 Thread GitBox


ephraimbuddy commented on a change in pull request #9330:
URL: https://github.com/apache/airflow/pull/9330#discussion_r444236641



##
File path: airflow/api_connexion/endpoints/task_endpoint.py
##
@@ -14,20 +14,36 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from flask import current_app
 
-# TODO(mik-laj): We have to implement it.
-# Do you want to help? Please look at: 
https://github.com/apache/airflow/issues/8138
+from airflow import DAG
+from airflow.api_connexion.exceptions import NotFound
+from airflow.api_connexion.schemas.task_schema import TaskCollection, 
task_collection_schema, task_schema
+from airflow.exceptions import TaskNotFound
 
 
-def get_task():
+def get_task(dag_id, task_id):
 """
 Get simplified representation of a task.
 """
-raise NotImplementedError("Not implemented yet.")
+dag: DAG = current_app.dag_bag.get_dag(dag_id)
+if not dag:
+raise NotFound("DAG not found")
 
+try:
+task = dag.get_task(task_id=task_id)
+except TaskNotFound:
+raise NotFound("Task not found")
+return task_schema.dump(task)
 
-def get_tasks():
+
+def get_tasks(dag_id):
 """
 Get tasks for DAG
 """
-raise NotImplementedError("Not implemented yet.")
+dag: DAG = current_app.dag_bag.get_dag(dag_id)
+if not dag:
+raise NotFound("DAG not found")
+tasks = dag.tasks
+task_collection = TaskCollection(tasks=tasks, total_entries=len(tasks))

Review comment:
   The `total_entries` is not actually the total of the query result. In this
case, it is the total of all tasks in the db.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek commented on a change in pull request #9330: Add read-only Task endpoint

2020-06-23 Thread GitBox


turbaszek commented on a change in pull request #9330:
URL: https://github.com/apache/airflow/pull/9330#discussion_r444241727



##
File path: airflow/api_connexion/endpoints/task_endpoint.py
##
@@ -14,20 +14,36 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from flask import current_app
 
-# TODO(mik-laj): We have to implement it.
-# Do you want to help? Please look at: 
https://github.com/apache/airflow/issues/8138
+from airflow import DAG
+from airflow.api_connexion.exceptions import NotFound
+from airflow.api_connexion.schemas.task_schema import TaskCollection, 
task_collection_schema, task_schema
+from airflow.exceptions import TaskNotFound
 
 
-def get_task():
+def get_task(dag_id, task_id):
 """
 Get simplified representation of a task.
 """
-raise NotImplementedError("Not implemented yet.")
+dag: DAG = current_app.dag_bag.get_dag(dag_id)
+if not dag:
+raise NotFound("DAG not found")
 
+try:
+task = dag.get_task(task_id=task_id)
+except TaskNotFound:
+raise NotFound("Task not found")
+return task_schema.dump(task)
 
-def get_tasks():
+
+def get_tasks(dag_id):
 """
 Get tasks for DAG
 """
-raise NotImplementedError("Not implemented yet.")
+dag: DAG = current_app.dag_bag.get_dag(dag_id)
+if not dag:
+raise NotFound("DAG not found")
+tasks = dag.tasks
+task_collection = TaskCollection(tasks=tasks, total_entries=len(tasks))

Review comment:
   Hm, I think here we return only tasks from a single DAG, so if I understand
correctly, `total_entries` should be equal to the number of tasks in the
requested DAG.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] OmairK commented on a change in pull request #9329: Pool CRUD Endpoints

2020-06-23 Thread GitBox


OmairK commented on a change in pull request #9329:
URL: https://github.com/apache/airflow/pull/9329#discussion_r444242264



##
File path: airflow/api_connexion/endpoints/pool_endpoint.py
##
@@ -53,18 +59,59 @@ def get_pools(session):
 
 total_entries = session.query(func.count(Pool.id)).scalar()
 pools = 
session.query(Pool).order_by(Pool.id).offset(offset).limit(limit).all()
-return pool_collection_schema.dump(PoolCollection(pools=pools, 
total_entries=total_entries)).data
+return pool_collection_schema.dump(
+PoolCollection(pools=pools, total_entries=total_entries)
+).data
 
 
-def patch_pool():
+@provide_session
+def patch_pool(pool_name, session, update_mask=None):
 """
 Update a pool
 """
-raise NotImplementedError("Not implemented yet.")
+# Only slots can be modified in 'default_pool'
+if pool_name == "default_pool":

Review comment:
   Thanks, fixed ee31051





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] OmairK commented on pull request #9329: Pool CRUD Endpoints

2020-06-23 Thread GitBox


OmairK commented on pull request #9329:
URL: https://github.com/apache/airflow/pull/9329#issuecomment-648165716


   
   > ```
   > PATCH /api/v1/pools/default_pool?update_mask=name,slots
   > {
   > "name": "default_pool",
   > "slots": 80
   > }
   > ```
   > 
   > Will this request be handled correctly? The name has not been changed 
because it is the same all the time.
   
   @mik-laj 
   Thanks, fixed it, here is the change `ee31051`
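   
   For illustration, a rough sketch (a hypothetical helper, not the exact code in this PR) of the guard that makes the request above succeed: a rename of `default_pool` is only rejected when the submitted name actually differs from the current one, so resending the unchanged name alongside a new `slots` value is harmless.
   
   ```python
   def validate_default_pool_patch(pool_name, body, update_mask=None):
       """Reject renaming the default pool, but allow no-op names and slot changes."""
       fields_to_update = set(update_mask) if update_mask else set(body)
       wants_rename = "name" in fields_to_update and body.get("name") != "default_pool"
       if pool_name == "default_pool" and wants_rename:
           raise ValueError("Default Pool's name can't be modified")

   # Succeeds: the name in the body equals the current name, only slots change.
   validate_default_pool_patch(
       "default_pool", {"name": "default_pool", "slots": 80}, update_mask=["name", "slots"]
   )
   ```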



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] turbaszek opened a new pull request #9488: Improve queries number SchedulerJob._process_executor_events

2020-06-23 Thread GitBox


turbaszek opened a new pull request #9488:
URL: https://github.com/apache/airflow/pull/9488


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [x] Description above provides context of the change
   - [x] Unit tests coverage for changes (not needed for documentation changes)
   - [x] Target Github ISSUE in description if exists
   - [x] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [x] Relevant documentation is updated including usage instructions.
   - [x] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ryw opened a new pull request #9489: Add more .mailmap entries

2020-06-23 Thread GitBox


ryw opened a new pull request #9489:
URL: https://github.com/apache/airflow/pull/9489


   ---
   Make sure to mark the boxes below before creating PR: [x]
   
   - [X] Description above provides context of the change
   - [X] Unit tests coverage for changes (not needed for documentation changes)
   - [X] Target Github ISSUE in description if exists
   - [X] Commits follow "[How to write a good git commit 
message](http://chris.beams.io/posts/git-commit/)"
   - [X] Relevant documentation is updated including usage instructions.
   - [X] I will engage committers as explained in [Contribution Workflow 
Example](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#contribution-workflow-example).
   
   ---
   In case of fundamental code change, Airflow Improvement Proposal 
([AIP](https://cwiki.apache.org/confluence/display/AIRFLOW/Airflow+Improvements+Proposals))
 is needed.
   In case of a new dependency, check compliance with the [ASF 3rd Party 
License Policy](https://www.apache.org/legal/resolved.html#category-x).
   In case of backwards incompatible changes please leave a note in 
[UPDATING.md](https://github.com/apache/airflow/blob/master/UPDATING.md).
   Read the [Pull Request 
Guidelines](https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst#pull-request-guidelines)
 for more information.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #9431: Move API page limit and offset parameters to views as kwargs Arguments

2020-06-23 Thread GitBox


mik-laj commented on a change in pull request #9431:
URL: https://github.com/apache/airflow/pull/9431#discussion_r444260539



##
File path: docs/index.rst
##
@@ -108,5 +109,5 @@ Content
 CLI 
 Macros 
 Python API <_api/index>
-REST API 
+EXPERIMENTAL REST API 

Review comment:
   ```suggestion
   Experimental REST API 
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] briandconnelly commented on pull request #3115: [AIRFLOW-2193] Add ROperator for using R

2020-06-23 Thread GitBox


briandconnelly commented on pull request #3115:
URL: https://github.com/apache/airflow/pull/3115#issuecomment-648190463


   @edgBR I have no plans to resume working on this. Using either 
`BashOperator` to run R code via `Rscript` or `DockerOperator` to run R code in 
a container works well.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-2193) R Language Operator

2020-06-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-2193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17142980#comment-17142980
 ] 

ASF GitHub Bot commented on AIRFLOW-2193:
-

briandconnelly commented on pull request #3115:
URL: https://github.com/apache/airflow/pull/3115#issuecomment-648190463


   @edgBR I have no plans to resume working on this. Using either 
`BashOperator` to run R code via `Rscript` or `DockerOperator` to run R code in 
a container works well.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> R Language Operator
> ---
>
> Key: AIRFLOW-2193
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2193
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: contrib
>Reporter: Brian Connelly
>Assignee: Brian Connelly
>Priority: Minor
>  Time Spent: 10m
>  Remaining Estimate: 0h
>
> Implement an operator that allows tasks to be completed using 
> [R|https://www.r-project.org/].
>  
> I've created an initial version and am currently putting together tests. Will 
> submit a PR.
>  
> see https://github.com/briandconnelly/incubator-airflow



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on a change in pull request #9431: Move API page limit and offset parameters to views as kwargs Arguments

2020-06-23 Thread GitBox


mik-laj commented on a change in pull request #9431:
URL: https://github.com/apache/airflow/pull/9431#discussion_r444269065



##
File path: docs/stable-rest-api/index.rst
##
@@ -0,0 +1,44 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+
+REST API Documentation
+==
+
+Airflow stable REST API exposes a lot of endpoints for working with Apache 
Airflow.
+
+Configuration
+-
+There are default configurations available for airflow REST API which can be 
changed in
+``airflow.cfg``.
+
+.. note::
+For more information on setting the configuration, see 
:doc:`../howto/set-config`
+
+The default configurations for the REST API are as follows:
+
+.. list-table:: REST API CONFIGURATION DEFAULTS
+   :widths: 25, 25, 50
+   :header-rows: 1
+
+   * - Configuration
+ - Default
+ - Description
+
+   * - maximum_page_limit
+ - 100
+ - A positive integer, used to set the maximum page limit for API requests

Review comment:
   ```suggestion
   Airflow has a REST API that allows third-party applications to perform a
wide range of operations.
   
   Page size limit
   ---------------
   
   To protect against requests that may lead to application instability, the
API limits the number of items in a response. The default is 100 items, but you
can change it using the ``maximum_page_limit`` option in the ``[api]`` section
of the ``airflow.cfg`` file.
   
   .. note::
   For more information on setting the configuration, see 
:doc:`../howto/set-config`
   
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #9431: Move API page limit and offset parameters to views as kwargs Arguments

2020-06-23 Thread GitBox


mik-laj commented on a change in pull request #9431:
URL: https://github.com/apache/airflow/pull/9431#discussion_r444272559



##
File path: airflow/api_connexion/parameters.py
##
@@ -48,6 +46,21 @@ def format_datetime(value: str):
 )
 
 
+def check_limit(value: int):
+"""
+This checks the limit passed to view and raises BadRequest if
+limit exceed user configured value
+"""
+max_val = conf.getint("api", "maximum_page_limit")
+
+if value > max_val:

Review comment:
   ```
   Request messages for collections should define an int32 page_size field, 
allowing users to specify the maximum number of results to return.
   If the user does not specify page_size (or specifies 0), the API chooses an 
appropriate default, which the API should document.
   If the user specifies page_size greater than the maximum permitted by the 
API, the API should coerce down to the maximum permitted page size.
   If the user specifies a negative value for page_size, the API must send an 
INVALID_ARGUMENT error.
   The API may return fewer results than the number requested (including zero 
results), even if not at the end of the collection.
   ```
   https://google.aip.dev/158
   I looked more closely at how Google implements it. What do you think? In my
opinion this makes sense and it is behavior that is close to the previous one.
Coercing down to the maximum permitted page size allows us to fetch the
maximum number of items without checking the current application configuration.
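   
   For illustration, a possible shape of `check_limit` if the behaviour described above were adopted (reject only negative values, fall back to the configured maximum for 0, and coerce anything larger down to it); this is a sketch of the proposal, not the code in this PR, and the error handling is simplified to a plain exception.
   
   ```python
   from airflow.configuration import conf

   def check_limit(value: int) -> int:
       maximum = conf.getint("api", "maximum_page_limit")  # e.g. 100 by default
       if value < 0:
           # AIP-158: a negative page size is an invalid argument.
           raise ValueError("Page limit must be a non-negative integer")
       if value == 0:
           # AIP-158: zero/absent page size means "use the documented default".
           return maximum
       # AIP-158: values above the permitted maximum are coerced down, not rejected.
       return min(value, maximum)
   ```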





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj merged pull request #9478: Add link to ADC docs in use-alternative-secret-backend.rst

2020-06-23 Thread GitBox


mik-laj merged pull request #9478:
URL: https://github.com/apache/airflow/pull/9478


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[airflow] branch master updated: Add link to ADC in use-alternative-secrets-backend.rst (#9478)

2020-06-23 Thread kamilbregula
This is an automated email from the ASF dual-hosted git repository.

kamilbregula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
 new d02c12c  Add link to ADC in use-alternative-secrets-backend.rst (#9478)
d02c12c is described below

commit d02c12c60cddf0ab0780cbf9d2c67fcdd2fe60d2
Author: Kamil Breguła 
AuthorDate: Tue Jun 23 16:39:03 2020 +0200

Add link to ADC in use-alternative-secrets-backend.rst (#9478)
---
 docs/howto/use-alternative-secrets-backend.rst | 13 +
 1 file changed, 5 insertions(+), 8 deletions(-)

diff --git a/docs/howto/use-alternative-secrets-backend.rst 
b/docs/howto/use-alternative-secrets-backend.rst
index 04879ee..5ef2005 100644
--- a/docs/howto/use-alternative-secrets-backend.rst
+++ b/docs/howto/use-alternative-secrets-backend.rst
@@ -387,17 +387,14 @@ and if you want to retrieve both Variables and 
connections use the following sam
 backend_kwargs = {"connections_prefix": "airflow-connections", 
"variables_prefix": "airflow-variables", "sep": "-"}
 
 
-When ``gcp_key_path`` is not provided, it will use the Application Default 
Credentials in the current environment. You can set up the credentials with:
+When ``gcp_key_path`` is not provided, it will use the Application Default 
Credentials (ADC) to obtain credentials.
 
-.. code-block:: ini
+.. note::
 
-# 1. GOOGLE_APPLICATION_CREDENTIALS environment variable
-export GOOGLE_APPLICATION_CREDENTIALS=path/to/key-file.json
+For more information about the Application Default Credentials (ADC), see:
 
-# 2. Set with SDK
-gcloud auth application-default login
-# If the Cloud SDK has an active project, the project ID is returned. The 
active project can be set using:
-gcloud config set project
+  * `google.auth.default 
`__
+  * `Setting Up Authentication for Server to Server Production 
Applications `__
 
 The value of the Secrets Manager secret id must be the :ref:`connection URI 
representation `
 of the connection object.



[GitHub] [airflow] mik-laj commented on a change in pull request #9330: Add read-only Task endpoint

2020-06-23 Thread GitBox


mik-laj commented on a change in pull request #9330:
URL: https://github.com/apache/airflow/pull/9330#discussion_r444281700



##
File path: airflow/api_connexion/endpoints/task_endpoint.py
##
@@ -14,20 +14,36 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from flask import current_app
 
-# TODO(mik-laj): We have to implement it.
-# Do you want to help? Please look at: 
https://github.com/apache/airflow/issues/8138
+from airflow import DAG
+from airflow.api_connexion.exceptions import NotFound
+from airflow.api_connexion.schemas.task_schema import TaskCollection, 
task_collection_schema, task_schema
+from airflow.exceptions import TaskNotFound
 
 
-def get_task():
+def get_task(dag_id, task_id):
 """
 Get simplified representation of a task.
 """
-raise NotImplementedError("Not implemented yet.")
+dag: DAG = current_app.dag_bag.get_dag(dag_id)
+if not dag:
+raise NotFound("DAG not found")
 
+try:
+task = dag.get_task(task_id=task_id)
+except TaskNotFound:
+raise NotFound("Task not found")
+return task_schema.dump(task)
 
-def get_tasks():
+
+def get_tasks(dag_id):
 """
 Get tasks for DAG
 """
-raise NotImplementedError("Not implemented yet.")
+dag: DAG = current_app.dag_bag.get_dag(dag_id)
+if not dag:
+raise NotFound("DAG not found")
+tasks = dag.tasks
+task_collection = TaskCollection(tasks=tasks, total_entries=len(tasks))

Review comment:
   This endpoint has no pagination, because all objects must be loaded to 
be able to return a response. We don't use the database here, so we can't 
optimize it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #9330: Add read-only Task endpoint

2020-06-23 Thread GitBox


mik-laj commented on a change in pull request #9330:
URL: https://github.com/apache/airflow/pull/9330#discussion_r444282120



##
File path: airflow/api_connexion/openapi/v1.yaml
##
@@ -1964,13 +1972,14 @@ components:
 __type: {type: string}
 days: {type: integer}
 seconds: {type: integer}
-microsecond: {type: integer}
+microseconds: {type: integer}
 
 RelativeDelta:
   # TODO: Why we need these fields?
   type: object
   required:
 - __type
+- years

Review comment:
   Should we add other fields also?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #9330: Add read-only Task endpoint

2020-06-23 Thread GitBox


mik-laj commented on a change in pull request #9330:
URL: https://github.com/apache/airflow/pull/9330#discussion_r444283015



##
File path: airflow/api_connexion/openapi/v1.yaml
##
@@ -1964,13 +1972,14 @@ components:
 __type: {type: string}

Review comment:
   Should we add the other fields to `required` as well?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #9330: Add read-only Task endpoint

2020-06-23 Thread GitBox


mik-laj commented on a change in pull request #9330:
URL: https://github.com/apache/airflow/pull/9330#discussion_r444285880



##
File path: tests/api_connexion/endpoints/test_dag_endpoint.py
##
@@ -14,35 +14,109 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import os
 import unittest
+from datetime import datetime
 
 import pytest
 
+from airflow import DAG
+from airflow.models import DagBag
+from airflow.models.serialized_dag import SerializedDagModel
+from airflow.operators.dummy_operator import DummyOperator
 from airflow.www import app
+from tests.test_utils.db import clear_db_dags, clear_db_runs, 
clear_db_serialized_dags
 
 
 class TestDagEndpoint(unittest.TestCase):
+dag_id = "test_dag"
+task_id = "op1"
+
+@staticmethod
+def clean_db():
+clear_db_runs()
+clear_db_dags()
+clear_db_serialized_dags()
+
 @classmethod
 def setUpClass(cls) -> None:
 super().setUpClass()
 cls.app = app.create_app(testing=True)  # type:ignore
+cls.app_serialized = app.create_app(testing=True)  # type:ignore
+
+with DAG(
+cls.dag_id, start_date=datetime(2020, 6, 15), doc_md="details"
+) as dag:
+DummyOperator(task_id=cls.task_id)
+
+cls.dag = dag  # type:ignore
+
+dag_bag = DagBag(os.devnull, include_examples=False)
+dag_bag.dags = {dag.dag_id: dag}
+cls.app.dag_bag = dag_bag  # type:ignore
+
+dag_bag = DagBag(os.devnull, include_examples=False, 
store_serialized_dags=True)
+cls.app_serialized.dag_bag = dag_bag  # type:ignore
 
 def setUp(self) -> None:
+self.clean_db()
 self.client = self.app.test_client()  # type:ignore
+self.client_serialized = self.app_serialized.test_client()  # 
type:ignore

Review comment:
   Maybe we should just drop this base class and split it into two separate test classes?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #9330: Add read-only Task endpoint

2020-06-23 Thread GitBox


mik-laj commented on a change in pull request #9330:
URL: https://github.com/apache/airflow/pull/9330#discussion_r444288253



##
File path: tests/cli/commands/test_dag_command.py
##
@@ -59,8 +60,14 @@ class TestCliDags(unittest.TestCase):
 @classmethod
 def setUpClass(cls):
 cls.dagbag = DagBag(include_examples=True)
+DAG.bulk_sync_to_db([d[1] for d in cls.dagbag.dags.items()])

Review comment:
   ```suggestion
   DAG.bulk_sync_to_db([d for d in cls.dagbag.dags.values()])
   ```
   or
   ```suggestion
  cls.dagbag.sync_to_db()
   ```
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] zikun commented on issue #8136: API Endpoint - Config

2020-06-23 Thread GitBox


zikun commented on issue #8136:
URL: https://github.com/apache/airflow/issues/8136#issuecomment-648215571


   I would like to pick it up if there is no update



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

2020-06-23 Thread GitBox


mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r444295989



##
File path: tests/api_connexion/endpoints/test_log_endpoint.py
##
@@ -14,25 +14,254 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
 import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
 
-import pytest
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import DAG, settings
+from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
+from airflow.models import DagRun, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.types import DagRunType
 from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_runs
 
 
 class TestGetLog(unittest.TestCase):
+DAG_ID = 'dag_for_testing_log_endpoint'
+DAG_ID_REMOVED = 'removed_dag_for_testing_log_endpoint'
+TASK_ID = 'task_for_testing_log_endpoint'
+TRY_NUMBER = 1
+
 @classmethod
-def setUpClass(cls) -> None:
-super().setUpClass()
-cls.app = app.create_app(testing=True)  # type:ignore
+def setUpClass(cls):
+settings.configure_orm()
+cls.session = settings.Session
+cls.app = app.create_app(testing=True)
 
 def setUp(self) -> None:
-self.client = self.app.test_client()  # type:ignore
+self.default_time = "2020-06-10T20:00:00+00:00"
+self.client = self.app.test_client()
+self.log_dir = tempfile.mkdtemp()
+# Make sure that the configure_logging is not cached
+self.old_modules = dict(sys.modules)
+self._prepare_log_files()
+self._configure_loggers()
+self._prepare_db()
+
+def _create_dagrun(self, session):
+dagrun_model = DagRun(
+dag_id=self.DAG_ID,
+run_id='TEST_DAG_RUN_ID',
+run_type=DagRunType.MANUAL.value,
+execution_date=timezone.parse(self.default_time),
+start_date=timezone.parse(self.default_time),
+external_trigger=True,
+)
+session.add(dagrun_model)
+session.commit()
+
+def _configure_loggers(self):
+# Create a custom logging configuration
+logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+logging_config['handlers']['task']['base_log_folder'] = self.log_dir
+
+logging_config['handlers']['task']['filename_template'] = \
+'{{ ti.dag_id }}/{{ ti.task_id }}/' \
+'{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+# Write the custom logging configuration to a file
+self.settings_folder = tempfile.mkdtemp()
+settings_file = os.path.join(self.settings_folder, 
"airflow_local_settings.py")
+new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+with open(settings_file, 'w') as handle:
+handle.writelines(new_logging_file)
+sys.path.append(self.settings_folder)
+
+with conf_vars({('logging', 'logging_config_class'): 
'airflow_local_settings.LOGGING_CONFIG'}):
+self.app = app.create_app(testing=True)
+self.client = self.app.test_client()
+settings.configure_logging()
+
+def _prepare_db(self):
+dagbag = self.app.dag_bag  # pylint: disable=no-member
+dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
+dag.sync_to_db()
+dag_removed = DAG(self.DAG_ID_REMOVED, 
start_date=timezone.parse(self.default_time))
+dag_removed.sync_to_db()
+dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+with create_session() as session:
+self.ti = TaskInstance(
+task=DummyOperator(task_id=self.TASK_ID, dag=dag),
+execution_date=timezone.parse(self.default_time)
+)
+self.ti.try_number = 1
+self.ti_removed_dag = TaskInstance(
+task=DummyOperator(task_id=self.TASK_ID, dag=dag_removed),
+execution_date=timezone.parse(self.default_time)
+)
+self.ti_removed_dag.try_number = 1
+
+session.merge(self.ti)
+session.merge(self.ti_removed_dag)
+
+def _prepare_log_files(self):
+dir_path = f"{self.log_dir}/{self.DAG_ID}/{self.TASK_ID}/" \
+   f"{self.default_time.replace(':', '.')}/"
+os.makedirs(dir_path)
+with open(f"{dir_path}/1.log", "w+") as file:
+file.write("Log for testing.")
+file.flush()
 
-@pytest.mark.skip(reason="Not implemented yet")
-def test_should_response_200(self

[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

2020-06-23 Thread GitBox


mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r444296241



##
File path: tests/api_connexion/endpoints/test_log_endpoint.py
##
@@ -14,25 +14,254 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
 import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
 
-import pytest
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import DAG, settings
+from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
+from airflow.models import DagRun, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.types import DagRunType
 from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_runs
 
 
 class TestGetLog(unittest.TestCase):
+DAG_ID = 'dag_for_testing_log_endpoint'
+DAG_ID_REMOVED = 'removed_dag_for_testing_log_endpoint'
+TASK_ID = 'task_for_testing_log_endpoint'
+TRY_NUMBER = 1
+
 @classmethod
-def setUpClass(cls) -> None:
-super().setUpClass()
-cls.app = app.create_app(testing=True)  # type:ignore
+def setUpClass(cls):
+settings.configure_orm()
+cls.session = settings.Session
+cls.app = app.create_app(testing=True)
 
 def setUp(self) -> None:
-self.client = self.app.test_client()  # type:ignore
+self.default_time = "2020-06-10T20:00:00+00:00"
+self.client = self.app.test_client()
+self.log_dir = tempfile.mkdtemp()
+# Make sure that the configure_logging is not cached
+self.old_modules = dict(sys.modules)
+self._prepare_log_files()
+self._configure_loggers()
+self._prepare_db()
+
+def _create_dagrun(self, session):
+dagrun_model = DagRun(
+dag_id=self.DAG_ID,
+run_id='TEST_DAG_RUN_ID',
+run_type=DagRunType.MANUAL.value,
+execution_date=timezone.parse(self.default_time),
+start_date=timezone.parse(self.default_time),
+external_trigger=True,
+)
+session.add(dagrun_model)
+session.commit()
+
+def _configure_loggers(self):
+# Create a custom logging configuration
+logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+logging_config['handlers']['task']['base_log_folder'] = self.log_dir
+
+logging_config['handlers']['task']['filename_template'] = \
+'{{ ti.dag_id }}/{{ ti.task_id }}/' \
+'{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+# Write the custom logging configuration to a file
+self.settings_folder = tempfile.mkdtemp()
+settings_file = os.path.join(self.settings_folder, 
"airflow_local_settings.py")
+new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+with open(settings_file, 'w') as handle:
+handle.writelines(new_logging_file)
+sys.path.append(self.settings_folder)
+
+with conf_vars({('logging', 'logging_config_class'): 
'airflow_local_settings.LOGGING_CONFIG'}):
+self.app = app.create_app(testing=True)
+self.client = self.app.test_client()
+settings.configure_logging()
+
+def _prepare_db(self):
+dagbag = self.app.dag_bag  # pylint: disable=no-member
+dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
+dag.sync_to_db()
+dag_removed = DAG(self.DAG_ID_REMOVED, 
start_date=timezone.parse(self.default_time))
+dag_removed.sync_to_db()
+dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+with create_session() as session:
+self.ti = TaskInstance(
+task=DummyOperator(task_id=self.TASK_ID, dag=dag),
+execution_date=timezone.parse(self.default_time)
+)
+self.ti.try_number = 1
+self.ti_removed_dag = TaskInstance(
+task=DummyOperator(task_id=self.TASK_ID, dag=dag_removed),
+execution_date=timezone.parse(self.default_time)
+)
+self.ti_removed_dag.try_number = 1
+
+session.merge(self.ti)
+session.merge(self.ti_removed_dag)
+
+def _prepare_log_files(self):
+dir_path = f"{self.log_dir}/{self.DAG_ID}/{self.TASK_ID}/" \
+   f"{self.default_time.replace(':', '.')}/"
+os.makedirs(dir_path)
+with open(f"{dir_path}/1.log", "w+") as file:
+file.write("Log for testing.")
+file.flush()
 
-@pytest.mark.skip(reason="Not implemented yet")
-def test_should_response_200(self

[GitHub] [airflow] mik-laj commented on a change in pull request #9331: add log endpoint

2020-06-23 Thread GitBox


mik-laj commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r444296587



##
File path: tests/api_connexion/endpoints/test_log_endpoint.py
##
@@ -14,25 +14,254 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
 import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
 
-import pytest
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import DAG, settings
+from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
+from airflow.models import DagRun, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.types import DagRunType
 from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_runs
 
 
 class TestGetLog(unittest.TestCase):
+DAG_ID = 'dag_for_testing_log_endpoint'
+DAG_ID_REMOVED = 'removed_dag_for_testing_log_endpoint'
+TASK_ID = 'task_for_testing_log_endpoint'
+TRY_NUMBER = 1
+
 @classmethod
-def setUpClass(cls) -> None:
-super().setUpClass()
-cls.app = app.create_app(testing=True)  # type:ignore
+def setUpClass(cls):
+settings.configure_orm()
+cls.session = settings.Session
+cls.app = app.create_app(testing=True)
 
 def setUp(self) -> None:
-self.client = self.app.test_client()  # type:ignore
+self.default_time = "2020-06-10T20:00:00+00:00"
+self.client = self.app.test_client()
+self.log_dir = tempfile.mkdtemp()
+# Make sure that the configure_logging is not cached
+self.old_modules = dict(sys.modules)
+self._prepare_log_files()
+self._configure_loggers()
+self._prepare_db()
+
+def _create_dagrun(self, session):
+dagrun_model = DagRun(
+dag_id=self.DAG_ID,
+run_id='TEST_DAG_RUN_ID',
+run_type=DagRunType.MANUAL.value,
+execution_date=timezone.parse(self.default_time),
+start_date=timezone.parse(self.default_time),
+external_trigger=True,
+)
+session.add(dagrun_model)
+session.commit()
+
+def _configure_loggers(self):
+# Create a custom logging configuration
+logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+logging_config['handlers']['task']['base_log_folder'] = self.log_dir
+
+logging_config['handlers']['task']['filename_template'] = \
+'{{ ti.dag_id }}/{{ ti.task_id }}/' \
+'{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+# Write the custom logging configuration to a file
+self.settings_folder = tempfile.mkdtemp()
+settings_file = os.path.join(self.settings_folder, 
"airflow_local_settings.py")
+new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+with open(settings_file, 'w') as handle:
+handle.writelines(new_logging_file)
+sys.path.append(self.settings_folder)
+
+with conf_vars({('logging', 'logging_config_class'): 
'airflow_local_settings.LOGGING_CONFIG'}):
+self.app = app.create_app(testing=True)
+self.client = self.app.test_client()
+settings.configure_logging()
+
+def _prepare_db(self):
+dagbag = self.app.dag_bag  # pylint: disable=no-member
+dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
+dag.sync_to_db()
+dag_removed = DAG(self.DAG_ID_REMOVED, 
start_date=timezone.parse(self.default_time))
+dag_removed.sync_to_db()
+dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+with create_session() as session:
+self.ti = TaskInstance(
+task=DummyOperator(task_id=self.TASK_ID, dag=dag),
+execution_date=timezone.parse(self.default_time)
+)
+self.ti.try_number = 1
+self.ti_removed_dag = TaskInstance(
+task=DummyOperator(task_id=self.TASK_ID, dag=dag_removed),
+execution_date=timezone.parse(self.default_time)
+)
+self.ti_removed_dag.try_number = 1
+
+session.merge(self.ti)
+session.merge(self.ti_removed_dag)
+
+def _prepare_log_files(self):
+dir_path = f"{self.log_dir}/{self.DAG_ID}/{self.TASK_ID}/" \
+   f"{self.default_time.replace(':', '.')}/"
+os.makedirs(dir_path)
+with open(f"{dir_path}/1.log", "w+") as file:
+file.write("Log for testing.")
+file.flush()
 
-@pytest.mark.skip(reason="Not implemented yet")
-def test_should_response_200(self

[GitHub] [airflow] mik-laj commented on a change in pull request #9329: Pool CRUD Endpoints

2020-06-23 Thread GitBox


mik-laj commented on a change in pull request #9329:
URL: https://github.com/apache/airflow/pull/9329#discussion_r444299754



##
File path: airflow/api_connexion/endpoints/pool_endpoint.py
##
@@ -53,18 +59,59 @@ def get_pools(session):
 
 total_entries = session.query(func.count(Pool.id)).scalar()
 pools = 
session.query(Pool).order_by(Pool.id).offset(offset).limit(limit).all()
-return pool_collection_schema.dump(PoolCollection(pools=pools, 
total_entries=total_entries)).data
+return pool_collection_schema.dump(
+PoolCollection(pools=pools, total_entries=total_entries)
+).data
 
 
-def patch_pool():
+@provide_session
+def patch_pool(pool_name, session, update_mask=None):
 """
 Update a pool
 """
-raise NotImplementedError("Not implemented yet.")
+# Only slots can be modified in 'default_pool'
+if pool_name == Pool.DEFAULT_POOL_NAME and request.json["name"] != 
Pool.DEFAULT_POOL_NAME:

Review comment:
   Is the `name` field required in the body now?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] aneesh-joseph commented on pull request #8777: Add Production Helm chart support

2020-06-23 Thread GitBox


aneesh-joseph commented on pull request #8777:
URL: https://github.com/apache/airflow/pull/8777#issuecomment-648229681


   @Siddharthk , for `3`, I added the below config to the `airflow.cfg` 
`kubernetes` section of the configmap
   
   ```
   run_as_user = {{ .Values.uid }}
   fs_group = {{ .Values.gid }}
   ```
   
   so that it runs as the correct airflow user
   
   I have added this to the config map in a PR
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] OmairK commented on a change in pull request #9473: [WIP] Dag Runs CRUD endpoints

2020-06-23 Thread GitBox


OmairK commented on a change in pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#discussion_r444302962



##
File path: airflow/api_connexion/endpoints/dag_run_endpoint.py
##
@@ -14,23 +14,28 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+from flask import request
+from connexion import NoContent
+from sqlalchemy import and_, func 
 
-from sqlalchemy import func
-
-from airflow.api_connexion.exceptions import NotFound
+from airflow.api_connexion.exceptions import NotFound, AlreadyExists
 from airflow.api_connexion.schemas.dag_run_schema import (
 DAGRunCollection, dagrun_collection_schema, dagrun_schema,
 )
 from airflow.api_connexion.utils import conn_parse_datetime
-from airflow.models import DagRun
+from airflow.models import DagRun, DagModel
 from airflow.utils.session import provide_session
+from airflow.utils.types import DagRunType
 
 
-def delete_dag_run():
+@provide_session
+def delete_dag_run(dag_id, dag_run_id, session):
 """
 Delete a DAG Run
 """
-raise NotImplementedError("Not implemented yet.")
+if session.query(DagRun).filter(and_(DagRun.dag_id == dag_id, 
DagRun.run_id == dag_run_id)).delete() == 0:
+raise NotFound("DAGRun not found")

Review comment:
   Thanks, fixed in `5e70d02`





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] OmairK commented on a change in pull request #9473: [WIP] Dag Runs CRUD endpoints

2020-06-23 Thread GitBox


OmairK commented on a change in pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#discussion_r444303716



##
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##
@@ -354,7 +364,17 @@ def test_should_response_200(self):
 
 
 class TestPostDagRun(TestDagRunEndpoint):
-@pytest.mark.skip(reason="Not implemented yet")
-def test_should_response_200(self):
-response = self.client.post("/dags/TEST_DAG_ID/dagRuns")
+@provide_session
+def test_should_response_200(self, session):
+dag_instance = DagModel(dag_id="TEST_DAG_ID")
+session.add(dag_instance)
+session.commit()
+response = self.client.post("api/v1/dags/TEST_DAG_ID/dagRuns", 
json={"dag_run_id": "TEST_DAG_RUN", "execution_date": self.default_time, 
"state": "failed"})
 assert response.status_code == 200
+
+def test_response_404(self):
+response = self.client.post("api/v1/dags/TEST_DAG_ID/dagRuns", 
json={"dag_run_id": "TEST_DAG_RUN", "execution_date": self.default_time, 
"state": "failed"})
+assert response.status_code == 404
+self.assertEqual(
+{'detail': None, 'status': 404, 'title': 'DAG with dag_id: 
TEST_DAG_ID not found', 'type': 'about:blank'}, response.json
+)

Review comment:
   Fixed, thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ryw commented on pull request #8777: Add Production Helm chart support

2020-06-23 Thread GitBox


ryw commented on pull request #8777:
URL: https://github.com/apache/airflow/pull/8777#issuecomment-648233932


   @schnie see a few questions from @Siddharthk (was chatting w/ him in Airflow 
Slack)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj edited a comment on issue #8136: API Endpoint - Config

2020-06-23 Thread GitBox


mik-laj edited a comment on issue #8136:
URL: https://github.com/apache/airflow/issues/8136#issuecomment-648236093


   @zikun I assigned you to this ticket. I look forward to your contribution.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj commented on issue #8136: API Endpoint - Config

2020-06-23 Thread GitBox


mik-laj commented on issue #8136:
URL: https://github.com/apache/airflow/issues/8136#issuecomment-648236093


   @zikun I assigned you to this ticket?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] OmairK commented on a change in pull request #9473: [WIP] Dag Runs CRUD endpoints

2020-06-23 Thread GitBox


OmairK commented on a change in pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#discussion_r444312038



##
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##
@@ -76,10 +78,18 @@ def _create_test_dag_run(self, state='running', 
extra_dag=False):
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
-@pytest.mark.skip(reason="Not implemented yet")
-def test_should_response_200(self):
-response = 
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+@provide_session
+def test_should_response_200(self, session):
+session.add_all(self._create_test_dag_run())
+session.commit()
+response = 
self.client.delete("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1")
 assert response.status_code == 204
+response = 
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1")
+self.assertEqual(response.status_code, 404)

Review comment:
   Fixed f0f9256





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] OmairK commented on a change in pull request #9473: [WIP] Dag Runs CRUD endpoints

2020-06-23 Thread GitBox


OmairK commented on a change in pull request #9473:
URL: https://github.com/apache/airflow/pull/9473#discussion_r444311320



##
File path: tests/api_connexion/endpoints/test_dag_run_endpoint.py
##
@@ -76,10 +78,18 @@ def _create_test_dag_run(self, state='running', 
extra_dag=False):
 
 
 class TestDeleteDagRun(TestDagRunEndpoint):
-@pytest.mark.skip(reason="Not implemented yet")
-def test_should_response_200(self):
-response = 
self.client.delete("api/v1/dags/TEST_DAG_ID}/dagRuns/TEST_DAG_RUN_ID")
+@provide_session
+def test_should_response_200(self, session):
+session.add_all(self._create_test_dag_run())
+session.commit()
+response = 
self.client.delete("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1")
 assert response.status_code == 204
+response = 
self.client.get("api/v1/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID_1")
+self.assertEqual(response.status_code, 404)
+
+def test_should_response_404(self):
+response = 
self.client.delete("api/v1/dags/INVALID_DAG_RUN/dagRuns/INVALID_DAG_RUN")
+self.assertEqual(response.status_code, 404)

Review comment:
   Fixed thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] aneesh-joseph edited a comment on pull request #8777: Add Production Helm chart support

2020-06-23 Thread GitBox


aneesh-joseph edited a comment on pull request #8777:
URL: https://github.com/apache/airflow/pull/8777#issuecomment-648229681


   @Siddharthk , for `3`, I added the below config to the `airflow.cfg` 
`kubernetes` section of the configmap
   
   ```
   run_as_user = {{ .Values.uid }}
   fs_group = {{ .Values.gid }}
   ```
   
   so that it runs as the correct airflow user
   
   I have added this to the config map in a
[PR](https://github.com/apache/airflow/pull/9371)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] OmairK commented on a change in pull request #9329: Pool CRUD Endpoints

2020-06-23 Thread GitBox


OmairK commented on a change in pull request #9329:
URL: https://github.com/apache/airflow/pull/9329#discussion_r444315429



##
File path: airflow/api_connexion/endpoints/pool_endpoint.py
##
@@ -53,18 +59,59 @@ def get_pools(session):
 
 total_entries = session.query(func.count(Pool.id)).scalar()
 pools = 
session.query(Pool).order_by(Pool.id).offset(offset).limit(limit).all()
-return pool_collection_schema.dump(PoolCollection(pools=pools, 
total_entries=total_entries)).data
+return pool_collection_schema.dump(
+PoolCollection(pools=pools, total_entries=total_entries)
+).data
 
 
-def patch_pool():
+@provide_session
+def patch_pool(pool_name, session, update_mask=None):
 """
 Update a pool
 """
-raise NotImplementedError("Not implemented yet.")
+# Only slots can be modified in 'default_pool'
+if pool_name == Pool.DEFAULT_POOL_NAME and request.json["name"] != 
Pool.DEFAULT_POOL_NAME:

Review comment:
   Yes I changed it to a required field as it should always be present in 
the body. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] kaxil merged pull request #9489: Add more .mailmap entries

2020-06-23 Thread GitBox


kaxil merged pull request #9489:
URL: https://github.com/apache/airflow/pull/9489


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[airflow] branch master updated: Add more .mailmap entries (#9489)

2020-06-23 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
 new 4165a54  Add more .mailmap entries (#9489)
4165a54 is described below

commit 4165a54522be9131f2268ad9432ce58ff507a0a3
Author: Ry Walker <4283+...@users.noreply.github.com>
AuthorDate: Tue Jun 23 11:40:43 2020 -0400

Add more .mailmap entries (#9489)
---
 .mailmap | 6 ++
 1 file changed, 6 insertions(+)

diff --git a/.mailmap b/.mailmap
index ba3b9fc..3a78129 100644
--- a/.mailmap
+++ b/.mailmap
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+Anita Fronczak  
 Antony Mayi 
 Arthur Wiedmer  
 Arthur Wiedmer  
@@ -29,12 +30,15 @@ Daniel Imberman  

 Daniel Imberman  
 Daniel Standish 
 Dependabot [bot] <49699333+dependabot[bot]@users.noreply.github.com>
+Ephraim Anierobi  
<4122866+ephraimbu...@users.noreply.github.com>
 Felix Uellendall  

 Feng Lu  
 Fokko Driesprong  
 Fokko Driesprong  
 Gerard Toonstra 
 Greg Neiheisel 
+Hossein Torabi  
+James Timmins  
 Jarek Potiuk  
 Jeremiah Lowin 
 Jeremiah Lowin  
@@ -54,6 +58,7 @@ Kaxil Naik  
 Kousuke Saruta 
 Kousuke Saruta  
 Marcin Szymański 
+Matthew Bruce  
 Maxime Beauchemin  

 Maxime Beauchemin  

 Maxime Beauchemin  
@@ -68,6 +73,7 @@ Niels Zeilemaker  

 Nikolay Kolev 
 Peng Chen  <348707...@qq.com>
 Pradeep Bhadani 
+Rafael Bottega  
 Sergio Kefalas 
 Sevak Avetisyan 
 Sid Anand  



[GitHub] [airflow] vuppalli commented on issue #9418: Deprecated AI Platform Operators and Runtimes in Example DAG

2020-06-23 Thread GitBox


vuppalli commented on issue #9418:
URL: https://github.com/apache/airflow/issues/9418#issuecomment-648269805


   Thanks so much! I have a couple of quick questions: when running the DAG, I
noticed that I do not have access to the GCP resources used in the file. Would
it be possible to be added to the project so that I can successfully run the DAG
without making any changes? Or is there a place where I can access all of
these resources myself? Additionally, is there an official test file to
confirm that the DAG works?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] marclamberti commented on pull request #8777: Add Production Helm chart support

2020-06-23 Thread GitBox


marclamberti commented on pull request #8777:
URL: https://github.com/apache/airflow/pull/8777#issuecomment-648276874


   @Siddharthk ,
   1/ It's not a bug, it's the way Kubernetes works. If you update your Helm
chart with a new image tag, then Kubernetes will update your app to keep the
state you desire. Take a look at the documentation here:
https://kubernetes.io/docs/tutorials/kubernetes-basics/update/update-intro/.
   At the bottom you can read:
   > Similar to application Scaling, if a Deployment is exposed publicly, the
Service will load-balance the traffic only to available Pods during the update.
   If your Airflow instance is running in dev/staging, that should not be
a problem, as it may be OK to have a little downtime between updates. If you
are in production, you should have your webserver in HA (highly available),
and so replicas running with a load balancer in front of them. During the
rolling update, while one pod is being updated, the others are still accessible,
as is the Airflow UI.
   
   2/ I don't think there is a recommended way to store logs, as it depends on
your environment and requirements. For instance, I store my logs in S3. S3 is
cheap and reliable. I can process the logs later with other tools for analysis
if needed, and I can archive them at a defined time.
   
   3/ About your error, I got it once with the "unofficial" chart. Sadly I
don't remember how I fixed it. Check your pod with `kubectl describe` if you
don't get additional information, and keep your workers after completion so that
you can debug them.
   
   Hope it helps :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] boring-cyborg[bot] commented on pull request #9409: Add extra options for slack webhook operator and slack hook

2020-06-23 Thread GitBox


boring-cyborg[bot] commented on pull request #9409:
URL: https://github.com/apache/airflow/pull/9409#issuecomment-648279148


   Awesome work, congrats on your first merged pull request!
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] dimberman merged pull request #9409: Add extra options for slack webhook operator and slack hook

2020-06-23 Thread GitBox


dimberman merged pull request #9409:
URL: https://github.com/apache/airflow/pull/9409


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[airflow] branch v1-10-test updated: Add extra options to fix SSL issue and be more flexible (#9409)

2020-06-23 Thread dimberman
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v1-10-test by this push:
 new 33973ab  Add extra options to fix SSL issue and be more flexible 
(#9409)
33973ab is described below

commit 33973abb0e931f57dbcfdbd3d2bebb396465799e
Author: Ben Chen 
AuthorDate: Tue Jun 23 18:38:22 2020 +0200

Add extra options to fix SSL issue and be more flexible (#9409)
---
 airflow/contrib/hooks/slack_webhook_hook.py| 10 +++---
 airflow/contrib/operators/slack_webhook_operator.py|  9 +++--
 tests/contrib/hooks/test_slack_webhook_hook.py |  3 ++-
 tests/contrib/operators/test_slack_webhook_operator.py |  6 --
 4 files changed, 20 insertions(+), 8 deletions(-)

diff --git a/airflow/contrib/hooks/slack_webhook_hook.py 
b/airflow/contrib/hooks/slack_webhook_hook.py
index f3817b3..001973a 100644
--- a/airflow/contrib/hooks/slack_webhook_hook.py
+++ b/airflow/contrib/hooks/slack_webhook_hook.py
@@ -58,6 +58,8 @@ class SlackWebhookHook(HttpHook):
 :type link_names: bool
 :param proxy: Proxy to use to make the Slack webhook call
 :type proxy: str
+:param extra_options: Extra options for http hook
+:type extra_options: dict
 """
 
 def __init__(self,
@@ -72,6 +74,7 @@ class SlackWebhookHook(HttpHook):
  icon_url=None,
  link_names=False,
  proxy=None,
+ extra_options=None,
  *args,
  **kwargs
  ):
@@ -86,6 +89,7 @@ class SlackWebhookHook(HttpHook):
 self.icon_url = icon_url
 self.link_names = link_names
 self.proxy = proxy
+self.extra_options = extra_options or {}
 
 def _get_token(self, token, http_conn_id):
 """
@@ -140,13 +144,13 @@ class SlackWebhookHook(HttpHook):
 """
 Remote Popen (actually execute the slack webhook call)
 """
-proxies = {}
+
 if self.proxy:
 # we only need https proxy for Slack, as the endpoint is https
-proxies = {'https': self.proxy}
+self.extra_options.update({'proxies': {'https': self.proxy}})
 
 slack_message = self._build_slack_message()
 self.run(endpoint=self.webhook_token,
  data=slack_message,
  headers={'Content-type': 'application/json'},
- extra_options={'proxies': proxies})
+ extra_options=self.extra_options)
diff --git a/airflow/contrib/operators/slack_webhook_operator.py 
b/airflow/contrib/operators/slack_webhook_operator.py
index 6169524..3b1bcfa 100644
--- a/airflow/contrib/operators/slack_webhook_operator.py
+++ b/airflow/contrib/operators/slack_webhook_operator.py
@@ -57,10 +57,12 @@ class SlackWebhookOperator(SimpleHttpOperator):
 :type link_names: bool
 :param proxy: Proxy to use to make the Slack webhook call
 :type proxy: str
+:param extra_options: Extra options for http hook
+:type extra_options: dict
 """
 
 template_fields = ['webhook_token', 'message', 'attachments', 'blocks', 
'channel',
-   'username', 'proxy', ]
+   'username', 'proxy', 'extra_options', ]
 
 @apply_defaults
 def __init__(self,
@@ -74,6 +76,7 @@ class SlackWebhookOperator(SimpleHttpOperator):
  icon_emoji=None,
  icon_url=None,
  link_names=False,
+ extra_options=None,
  proxy=None,
  *args,
  **kwargs):
@@ -92,6 +95,7 @@ class SlackWebhookOperator(SimpleHttpOperator):
 self.link_names = link_names
 self.proxy = proxy
 self.hook = None
+self.extra_options = extra_options
 
 def execute(self, context):
 """
@@ -108,6 +112,7 @@ class SlackWebhookOperator(SimpleHttpOperator):
 self.icon_emoji,
 self.icon_url,
 self.link_names,
-self.proxy
+self.proxy,
+self.extra_options
 )
 self.hook.execute()
diff --git a/tests/contrib/hooks/test_slack_webhook_hook.py 
b/tests/contrib/hooks/test_slack_webhook_hook.py
index 13663c5..fea2d82 100644
--- a/tests/contrib/hooks/test_slack_webhook_hook.py
+++ b/tests/contrib/hooks/test_slack_webhook_hook.py
@@ -40,7 +40,8 @@ class TestSlackWebhookHook(unittest.TestCase):
 'icon_emoji': ':hankey:',
 'icon_url': 'https://airflow.apache.org/_images/pin_large.png',
 'link_names': True,
-'proxy': 'https://my-horrible-proxy.proxyist.com:8080'
+'proxy': 'https://my-horrible-proxy.proxyist.com:8080',
+'extra_options': {"verify": True}
 }
 expected_message_dict = {
 'channel': _config['channel'],
diff --git a/tests/contrib/operators/test_slack_webhook_operator.py 
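
For illustration, a sketch of how the new `extra_options` parameter added above
could be used from a DAG; the connection id and certificate path here are
hypothetical, and `extra_options` is simply forwarded to the underlying HttpHook:

```
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

with DAG('slack_notify_example', start_date=datetime(2020, 6, 1),
         schedule_interval=None) as dag:
    notify = SlackWebhookOperator(
        task_id='notify_slack',
        http_conn_id='slack_webhook',  # hypothetical connection holding the webhook URL
        message='Pipeline finished',
        # e.g. point the HTTP call at a custom CA bundle instead of disabling SSL
        extra_options={'verify': '/etc/ssl/certs/internal-ca.pem'},
    )
```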

[GitHub] [airflow] benbenbang closed issue #9407: SSL Issue on K8S with SlackWebhookOperator

2020-06-23 Thread GitBox


benbenbang closed issue #9407:
URL: https://github.com/apache/airflow/issues/9407


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] kaxil commented on pull request #8992: [AIRFLOW-5391] Do not re-run skipped tasks when they are cleared (#7276)

2020-06-23 Thread GitBox


kaxil commented on pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#issuecomment-648282668


   Doc tests are failing 🤔 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-5391) Clearing a task skipped by BranchPythonOperator will cause the task to execute

2020-06-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-5391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17143100#comment-17143100
 ] 

ASF GitHub Bot commented on AIRFLOW-5391:
-

kaxil commented on pull request #8992:
URL: https://github.com/apache/airflow/pull/8992#issuecomment-648282668


   Doc tests are failing 🤔 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Clearing a task skipped by BranchPythonOperator will cause the task to execute
> --
>
> Key: AIRFLOW-5391
> URL: https://issues.apache.org/jira/browse/AIRFLOW-5391
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: 1.10.4
>Reporter: Qian Yu
>Assignee: Qian Yu
>Priority: Major
> Fix For: 2.0.0
>
>
> I tried this on 1.10.3 and 1.10.4; both have this issue:
> E.g. in this example from the docs, branch_a executed and branch_false was
> skipped because of the branching condition. However, if someone clears
> branch_false, it will cause branch_false to execute.
> !https://airflow.apache.org/_images/branch_good.png!
> This behaviour is understandable given how BranchPythonOperator is 
> implemented. BranchPythonOperator does not store its decision anywhere. It 
> skips its own downstream tasks in the branch at runtime. So there's currently 
> no way for branch_false to know it should be skipped without rerunning the 
> branching task.
> This is obviously counter-intuitive from the user's perspective. In this 
> example, users would not expect branch_false to execute when they clear it 
> because the branching task should have skipped it.
> There are a few ways to improve this:
> Option 1): Make downstream tasks skipped by BranchPythonOperator not 
> clearable without also clearing the upstream BranchPythonOperator. In this 
> example, if someone clears branch_false without clearing branching, the Clear 
> action should just fail with an error telling the user he needs to clear the 
> branching task as well.
> Option 2): Make BranchPythonOperator store the result of its skip condition 
> somewhere. Make downstream tasks check for this stored decision and skip 
> themselves if they should have been skipped by the condition. This probably 
> means the decision of BranchPythonOperator needs to be stored in the db.
>  
> [kevcampb|https://blog.diffractive.io/author/kevcampb/] attempted a
> workaround on his blog, and he acknowledged that his workaround is not
> perfect and a better permanent fix is needed:
> [https://blog.diffractive.io/2018/08/07/replacement-shortcircuitoperator-for-airflow/]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] aneesh-joseph edited a comment on pull request #8777: Add Production Helm chart support

2020-06-23 Thread GitBox


aneesh-joseph edited a comment on pull request #8777:
URL: https://github.com/apache/airflow/pull/8777#issuecomment-648229681


   @Siddharthk , for `3`, I added the config below to the `kubernetes` section 
of the `airflow.cfg` in the configmap
   
   ```
   run_as_user = {{ .Values.uid }}
   fs_group = {{ .Values.gid }}
   ```
   
   so that it runs as the correct airflow user instead of root.
   
   I have added this to the config map in a 
[PR](https://github.com/apache/airflow/pull/9371).
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] panas2567 commented on pull request #5288: AIRFLOW-4512 - New trigger rule for tasks - At least one successful

2020-06-23 Thread GitBox


panas2567 commented on pull request #5288:
URL: https://github.com/apache/airflow/pull/5288#issuecomment-648324441


   > Hello,
   > Is there any method to achieve the goal of atleast_one_success ?
   > As I would need this behaviour in one of my DAG.
   > Thanks,
   
   Create two intermediate tasks:
   _check_one_success_ with trigger rule one_success
   _check_all_done_ with trigger rule all_done
   Then join them like this:
   [task1, task2, ..., taskN] >> check_one_success >> task_with_trigger_rule_at_least_one_success
   [task1, task2, ..., taskN] >> check_all_done >> task_with_trigger_rule_at_least_one_success



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-4512) New trigger rule for tasks - At least one successful

2020-06-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17143169#comment-17143169
 ] 

ASF GitHub Bot commented on AIRFLOW-4512:
-

panas2567 commented on pull request #5288:
URL: https://github.com/apache/airflow/pull/5288#issuecomment-648324441


   > Hello,
   > Is there any method to achieve the goal of atleast_one_success ?
   > As I would need this behaviour in one of my DAG.
   > Thanks,
   
   Create two intermediate tasks:
   _check_one_success_ with trigger rule one_success
   _check_all_done_ with trigger rule all_done
   Then join them like this:
   [task1, task2, ..., taskN] >> check_one_success >> task_with_trigger_rule_at_least_one_success
   [task1, task2, ..., taskN] >> check_all_done >> task_with_trigger_rule_at_least_one_success



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> New trigger rule for tasks - At least one successful
> 
>
> Key: AIRFLOW-4512
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4512
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: core, dependencies
>Affects Versions: 1.10.3
>Reporter: Bharath Palaksha
>Assignee: Bharath Palaksha
>Priority: Minor
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> {{New trigger rule - *atleast_one_success*}}
> {{Trigger rules help in defining better dependencies on upstream tasks.
> There are a good number of rules currently defined, which are as below}}
>  * {{all_success}}: (default) all parents have succeeded
>  * {{all_failed}}: all parents are in a {{failed}} or {{upstream_failed}} 
> state
>  * {{all_done}}: all parents are done with their execution
>  * {{one_failed}}: fires as soon as at least one parent has failed, it does 
> not wait for all parents to be done
>  * {{one_success}}: fires as soon as at least one parent succeeds, it does 
> not wait for all parents to be done
>  * {{none_failed}}: all parents have not failed ({{failed}} or 
> {{upstream_failed}}) i.e. all parents have succeeded or been skipped
>  * {{none_skipped}}: no parent is in a {{skipped}} state, i.e. all parents 
> are in a {{success}}, {{failed}}, or {{upstream_failed}} state
>  * {{dummy}}: dependencies are just for show, trigger at will
>  
> There can be another rule added here, *atleast_one_success* - this waits for
> all parent tasks to be complete, checks if at least one parent is successful,
> and then triggers the current task. It differs from one_success in that it
> waits for all parents to be done.
> Consider a very common scenario in data pipelines where you have a number of
> parallel tasks generating some data. As a downstream task to all these
> generate tasks, you have a task to collate all the data into one collection,
> which has to run if any of the upstream generate tasks is successful and also
> has to wait for all of them to be done. one_success can't be used as it
> doesn't wait for the other tasks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] panas2567 edited a comment on pull request #5288: AIRFLOW-4512 - New trigger rule for tasks - At least one successful

2020-06-23 Thread GitBox


panas2567 edited a comment on pull request #5288:
URL: https://github.com/apache/airflow/pull/5288#issuecomment-648324441


   > Hello,
   > Is there any method to achieve the goal of atleast_one_success ?
   > As I would need this behaviour in one of my DAG.
   > Thanks,
   
   Create two intermediate tasks:
   _check_one_success_ with trigger rule one_success
   _check_all_done_ with trigger rule all_done
   Then join them like this:
   [task1, task2, ..., taskN] >> check_one_success >> task_with_trigger_rule_at_least_one_success
   [task1, task2, ..., taskN] >> check_all_done >> task_with_trigger_rule_at_least_one_success
   The task_with_trigger_rule_at_least_one_success has to have its trigger rule 
set to all_success (the default).
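   
   A rough sketch of this workaround (using plain DummyOperator tasks and 
illustrative task ids; not an official example):
   ```
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.utils.dates import days_ago

   with DAG('at_least_one_success_example', start_date=days_ago(1),
            schedule_interval=None) as dag:
       upstream = [DummyOperator(task_id=f'task{i}') for i in range(3)]
       # fires as soon as any upstream task succeeds
       check_one_success = DummyOperator(task_id='check_one_success',
                                         trigger_rule='one_success')
       # fires once every upstream task has finished, whatever its state
       check_all_done = DummyOperator(task_id='check_all_done',
                                      trigger_rule='all_done')
       # keeps the default trigger rule all_success, so it runs only when
       # both check tasks succeeded
       final = DummyOperator(
           task_id='task_with_trigger_rule_at_least_one_success')

       upstream >> check_one_success >> final
       upstream >> check_all_done >> final
   ```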



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] casassg commented on pull request #8962: [AIRFLOW-8057] [AIP-31] Add @task decorator

2020-06-23 Thread GitBox


casassg commented on pull request #8962:
URL: https://github.com/apache/airflow/pull/8962#issuecomment-648326231


   Rebased from latest master to see if integration tests are fixed.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (AIRFLOW-4512) New trigger rule for tasks - At least one successful

2020-06-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/AIRFLOW-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17143171#comment-17143171
 ] 

ASF GitHub Bot commented on AIRFLOW-4512:
-

panas2567 edited a comment on pull request #5288:
URL: https://github.com/apache/airflow/pull/5288#issuecomment-648324441


   > Hello,
   > Is there any method to achieve the goal of atleast_one_success ?
   > As I would need this behaviour in one of my DAG.
   > Thanks,
   
   Create two intermediate tasks:
   _check_one_success_ with trigger rule one_success
   _check_all_done_ with trigger rule all_done
   Then join them like this:
   [task1, task2, ..., taskN] >> check_one_success >> task_with_trigger_rule_at_least_one_success
   [task1, task2, ..., taskN] >> check_all_done >> task_with_trigger_rule_at_least_one_success
   The task_with_trigger_rule_at_least_one_success has to have its trigger rule 
set to all_success (the default).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> New trigger rule for tasks - At least one successful
> 
>
> Key: AIRFLOW-4512
> URL: https://issues.apache.org/jira/browse/AIRFLOW-4512
> Project: Apache Airflow
>  Issue Type: New Feature
>  Components: core, dependencies
>Affects Versions: 1.10.3
>Reporter: Bharath Palaksha
>Assignee: Bharath Palaksha
>Priority: Minor
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> {{New trigger rule - *atleast_one_success*}}
> {{Trigger rules help in defining better dependencies on upstream tasks.
> There are a good number of rules currently defined, which are as below}}
>  * {{all_success}}: (default) all parents have succeeded
>  * {{all_failed}}: all parents are in a {{failed}} or {{upstream_failed}} 
> state
>  * {{all_done}}: all parents are done with their execution
>  * {{one_failed}}: fires as soon as at least one parent has failed, it does 
> not wait for all parents to be done
>  * {{one_success}}: fires as soon as at least one parent succeeds, it does 
> not wait for all parents to be done
>  * {{none_failed}}: all parents have not failed ({{failed}} or 
> {{upstream_failed}}) i.e. all parents have succeeded or been skipped
>  * {{none_skipped}}: no parent is in a {{skipped}} state, i.e. all parents 
> are in a {{success}}, {{failed}}, or {{upstream_failed}} state
>  * {{dummy}}: dependencies are just for show, trigger at will
>  
> There can be another rule added here, *atleast_one_success* - this waits for
> all parent tasks to be complete, checks if at least one parent is successful,
> and then triggers the current task. It differs from one_success in that it
> waits for all parents to be done.
> Consider a very common scenario in data pipelines where you have a number of
> parallel tasks generating some data. As a downstream task to all these
> generate tasks, you have a task to collate all the data into one collection,
> which has to run if any of the upstream generate tasks is successful and also
> has to wait for all of them to be done. one_success can't be used as it
> doesn't wait for the other tasks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [airflow] mik-laj commented on issue #9418: Deprecated AI Platform Operators and Runtimes in Example DAG

2020-06-23 Thread GitBox


mik-laj commented on issue #9418:
URL: https://github.com/apache/airflow/issues/9418#issuecomment-648364194


   Unfortunately, the community has not developed procedures for storing the 
files necessary for testing. We don't run these tests automatically on CI yet. 
Each team that works on an integration must provide these files on their own. 
This should not be difficult based on the official guides for this service.
   https://cloud.google.com/ai-platform/training/docs/
   
   I have the following environment variables configured to run these tests. 
   ```
   GCP_PROJECT_ID=polidea-airflow
   
   GCP_MLENGINE_BUCKET_NAME_EPHEMERAL=polidea-airflow-tests-38
   GCP_MLENGINE_BUCKET_NAME_PERSISTENT=test-airflow-mlengine-persistent
   
   GCP_MLENGINE_MODEL_NAME=airflow_test_ci_model_name_19837
   
   
GCP_MLENGINE_PREDICTION_INPUT=gs://test-airflow-mlengine-persistent/prediction_input.json
   
GCP_MLENGINE_TRAINER_URI=gs://test-airflow-mlengine-persistent/trainer-0.0.0.tar.gz
   
   GCP_MLENGINE_DATAFLOW_STAGING=gs://polidea-airflow-tests-38/staging/
   GCP_MLENGINE_DATAFLOW_TMP=gs://polidea-airflow-tests-38/tmp/
   GCP_MLENGINE_JOB_DIR=gs://polidea-airflow-tests-38/job-dir
   
GCP_MLENGINE_PREDICTION_OUTPUT=gs://polidea-airflow-tests-38/prediction_output/
   
GCP_MLENGINE_SAVED_MODEL_PATH=gs://polidea-airflow-tests-38/job-dir/keras_export/
   ```
   I prepared copies of the necessary files for you and made them available in 
a public bucket.
   ```
   gs://airflow-polidea-googl-system-tests-resources-public
   ```
   I hope this information will help you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] mik-laj edited a comment on issue #9418: Deprecated AI Platform Operators and Runtimes in Example DAG

2020-06-23 Thread GitBox


mik-laj edited a comment on issue #9418:
URL: https://github.com/apache/airflow/issues/9418#issuecomment-648364194


   Unfortunately, the community has not developed procedures for storing the 
files necessary for testing. We don't run these tests automatically on CI yet. 
Each team that works on an integration must provide these files on their own. 
This should not be difficult based on the official guides for this service.
   https://cloud.google.com/ai-platform/training/docs/
   
   I have the following environment variables configured to run these tests. 
   ```
   GCP_PROJECT_ID=polidea-airflow
   
   GCP_MLENGINE_BUCKET_NAME_EPHEMERAL=polidea-airflow-tests-38
   GCP_MLENGINE_BUCKET_NAME_PERSISTENT=test-airflow-mlengine-persistent
   
   GCP_MLENGINE_MODEL_NAME=airflow_test_ci_model_name_19837
   
   
GCP_MLENGINE_PREDICTION_INPUT=gs://test-airflow-mlengine-persistent/prediction_input.json
   
GCP_MLENGINE_TRAINER_URI=gs://test-airflow-mlengine-persistent/trainer-0.0.0.tar.gz
   
   GCP_MLENGINE_DATAFLOW_STAGING=gs://polidea-airflow-tests-38/staging/
   GCP_MLENGINE_DATAFLOW_TMP=gs://polidea-airflow-tests-38/tmp/
   GCP_MLENGINE_JOB_DIR=gs://polidea-airflow-tests-38/job-dir
   
GCP_MLENGINE_PREDICTION_OUTPUT=gs://polidea-airflow-tests-38/prediction_output/
   
GCP_MLENGINE_SAVED_MODEL_PATH=gs://polidea-airflow-tests-38/job-dir/keras_export/
   ```
   I prepared copies of the necessary files for you and made them available in 
a public bucket. Please make a copy of this bucket if you want to work on this 
integration, because this bucket can be deleted at any time. 
   ```
   gs://airflow-polidea-googl-system-tests-resources-public
   ```
   I hope this information will help you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[airflow] 01/02: Move KubernetesPodOperator into providers package

2020-06-23 Thread dimberman
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2324175fab7aff0d35651825e48eb7d651c8a845
Author: Daniel Imberman 
AuthorDate: Tue Jun 23 11:49:51 2020 -0700

Move KubernetesPodOperator into providers package

This commit will make merging PRs simpler while also
maintaining existing paths.
---
 .../contrib/operators/kubernetes_pod_operator.py   | 276 +
 .../cncf/kubernetes/operators/kubernetes_pod.py}   |   0
 chart/charts/postgresql-6.3.12.tgz | Bin 0 -> 22754 bytes
 3 files changed, 12 insertions(+), 264 deletions(-)

diff --git a/airflow/contrib/operators/kubernetes_pod_operator.py 
b/airflow/contrib/operators/kubernetes_pod_operator.py
index b89a37f..382f965 100644
--- a/airflow/contrib/operators/kubernetes_pod_operator.py
+++ b/airflow/contrib/operators/kubernetes_pod_operator.py
@@ -17,272 +17,20 @@
 """Executes task in a Kubernetes POD"""
 import warnings
 
-import re
+from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import 
KubernetesPodOperator as K8sPodOp
 
-from airflow.exceptions import AirflowException
-from airflow.kubernetes import kube_client, pod_generator, pod_launcher
-from airflow.kubernetes.k8s_model import append_to_pod
-from airflow.kubernetes.pod import Resources
-from airflow.models import BaseOperator
-from airflow.utils.decorators import apply_defaults
-from airflow.utils.helpers import validate_key
-from airflow.utils.state import State
-from airflow.version import version as airflow_version
 
-
-class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-attributes
+class KubernetesPodOperator(K8sPodOp):
 """
-Execute a task in a Kubernetes Pod
-
-.. note::
-If you use `Google Kubernetes Engine 
`__, use
-:class:`~airflow.gcp.operators.kubernetes_engine.GKEPodOperator`, which
-simplifies the authorization process.
-
-:param image: Docker image you wish to launch. Defaults to hub.docker.com,
-but fully qualified URLS will point to custom repositories.
-:type image: str
-:param name: name of the pod in which the task will run, will be used 
(plus a random
-suffix) to generate a pod id (DNS-1123 subdomain, containing only 
[a-z0-9.-]).
-:type name: str
-:param cmds: entrypoint of the container. (templated)
-The docker images's entrypoint is used if this is not provided.
-:type cmds: list[str]
-:param arguments: arguments of the entrypoint. (templated)
-The docker image's CMD is used if this is not provided.
-:type arguments: list[str]
-:param image_pull_policy: Specify a policy to cache or always pull an 
image.
-:type image_pull_policy: str
-:param image_pull_secrets: Any image pull secrets to be given to the pod.
-   If more than one secret is required, provide a
-   comma separated list: secret_a,secret_b
-:type image_pull_secrets: str
-:param ports: ports for launched pod.
-:type ports: list[airflow.kubernetes.pod.Port]
-:param volume_mounts: volumeMounts for launched pod.
-:type volume_mounts: list[airflow.kubernetes.volume_mount.VolumeMount]
-:param volumes: volumes for launched pod. Includes ConfigMaps and 
PersistentVolumes.
-:type volumes: list[airflow.kubernetes.volume.Volume]
-:param labels: labels to apply to the Pod.
-:type labels: dict
-:param startup_timeout_seconds: timeout in seconds to startup the pod.
-:type startup_timeout_seconds: int
-:param name: name of the pod in which the task will run, will be used to
-generate a pod id (DNS-1123 subdomain, containing only [a-z0-9.-]).
-:type name: str
-:param env_vars: Environment variables initialized in the container. 
(templated)
-:type env_vars: dict
-:param secrets: Kubernetes secrets to inject in the container.
-They can be exposed as environment vars or files in a volume.
-:type secrets: list[airflow.kubernetes.secret.Secret]
-:param in_cluster: run kubernetes client with in_cluster configuration.
-:type in_cluster: bool
-:param cluster_context: context that points to kubernetes cluster.
-Ignored when in_cluster is True. If None, current-context is used.
-:type cluster_context: str
-:param get_logs: get the stdout of the container as logs of the tasks.
-:type get_logs: bool
-:param annotations: non-identifying metadata you can attach to the Pod.
-Can be a large range of data, and can include 
characters
-that are not permitted by labels.
-:type annotations: dict
-:param resources: A dict containing resources requests and limits.
-Possible keys are request_memory, request_cpu, limit_memory, limit_cpu,
- 

[airflow] 02/02: Monitor pods by labels instead of names (#6377)

2020-06-23 Thread dimberman
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit dd7caa6721129aab42b0f46a58d758115fc731f3
Author: Daniel Imberman 
AuthorDate: Sat May 16 14:13:58 2020 -0700

Monitor pods by labels instead of names (#6377)

* Monitor k8sPodOperator pods by labels

To prevent situations where the scheduler starts a
second k8sPodOperator pod after a restart, we now check
for existing pods using kubernetes labels

* Update airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py

Co-authored-by: Kaxil Naik 

* Update airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py

Co-authored-by: Kaxil Naik 

* add docs

* Update airflow/kubernetes/pod_launcher.py

Co-authored-by: Kaxil Naik 

Co-authored-by: Daniel Imberman 
Co-authored-by: Kaxil Naik 
(cherry picked from commit 8985df0bfcb5f2b2cd69a21b9814021f9f8ce953)
---
 airflow/executors/kubernetes_executor.py   |  34 +--
 airflow/kubernetes/pod_generator.py|  42 +++
 airflow/kubernetes/pod_launcher.py |  41 ++-
 .../cncf/kubernetes/operators/kubernetes_pod.py| 302 ++---
 kubernetes_tests/test_kubernetes_pod_operator.py   | 182 +++--
 tests/executors/test_kubernetes_executor.py|  17 +-
 6 files changed, 456 insertions(+), 162 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py 
b/airflow/executors/kubernetes_executor.py
index 8b5fdc1..2036f4f 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -33,6 +33,8 @@ from kubernetes.client.rest import ApiException
 from urllib3.exceptions import HTTPError, ReadTimeoutError
 
 from airflow.configuration import conf
+from airflow.kubernetes import pod_generator
+from airflow.kubernetes.pod_generator import MAX_POD_ID_LEN, PodGenerator
 from airflow.kubernetes.pod_launcher import PodLauncher
 from airflow.kubernetes.kube_client import get_kube_client
 from airflow.kubernetes.worker_configuration import WorkerConfiguration
@@ -45,7 +47,6 @@ from airflow import settings
 from airflow.exceptions import AirflowConfigException, AirflowException
 from airflow.utils.log.logging_mixin import LoggingMixin
 
-MAX_POD_ID_LEN = 253
 MAX_LABEL_LEN = 63
 
 
@@ -402,8 +403,8 @@ class AirflowKubernetesScheduler(LoggingMixin):
 namespace=self.namespace,
 worker_uuid=self.worker_uuid,
 pod_id=self._create_pod_id(dag_id, task_id),
-dag_id=self._make_safe_label_value(dag_id),
-task_id=self._make_safe_label_value(task_id),
+dag_id=pod_generator.make_safe_label_value(dag_id),
+task_id=pod_generator.make_safe_label_value(task_id),
 try_number=try_number,
 
execution_date=self._datetime_to_label_safe_datestring(execution_date),
 airflow_command=command
@@ -495,25 +496,6 @@ class AirflowKubernetesScheduler(LoggingMixin):
 return safe_pod_id
 
 @staticmethod
-def _make_safe_label_value(string):
-"""
-Valid label values must be 63 characters or less and must be empty or 
begin and
-end with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), 
underscores (_),
-dots (.), and alphanumerics between.
-
-If the label value is then greater than 63 chars once made safe, or 
differs in any
-way from the original value sent to this function, then we need to 
truncate to
-53chars, and append it with a unique hash.
-"""
-safe_label = 
re.sub(r'^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$', '', string)
-
-if len(safe_label) > MAX_LABEL_LEN or string != safe_label:
-safe_hash = hashlib.md5(string.encode()).hexdigest()[:9]
-safe_label = safe_label[:MAX_LABEL_LEN - len(safe_hash) - 1] + "-" 
+ safe_hash
-
-return safe_label
-
-@staticmethod
 def _create_pod_id(dag_id, task_id):
 safe_dag_id = 
AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars(
 dag_id)
@@ -599,8 +581,8 @@ class AirflowKubernetesScheduler(LoggingMixin):
 )
 for task in tasks:
 if (
-self._make_safe_label_value(task.dag_id) == dag_id and
-self._make_safe_label_value(task.task_id) == task_id and
+pod_generator.make_safe_label_value(task.dag_id) == dag_id 
and
+pod_generator.make_safe_label_value(task.task_id) == 
task_id and
 task.execution_date == ex_time
 ):
 self.log.info(
@@ -683,8 +665,8 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
 # pylint: disable=protected-access
 dict_string = (
 
"dag_id={},task_id={},e

[airflow] branch v1-10-test updated (33973ab -> dd7caa6)

2020-06-23 Thread dimberman
This is an automated email from the ASF dual-hosted git repository.

dimberman pushed a change to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git.


from 33973ab  Add extra options to fix SSL issue and be more flexible 
(#9409)
 new 2324175  Move KubernetesPodOperator into providers package
 new dd7caa6  Monitor pods by labels instead of names (#6377)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../contrib/operators/kubernetes_pod_operator.py   | 276 +--
 airflow/executors/kubernetes_executor.py   |  34 +--
 airflow/kubernetes/pod_generator.py|  42 +++
 airflow/kubernetes/pod_launcher.py |  41 ++-
 .../cncf/kubernetes/operators/kubernetes_pod.py}   | 302 ++---
 chart/charts/postgresql-6.3.12.tgz | Bin 0 -> 22754 bytes
 kubernetes_tests/test_kubernetes_pod_operator.py   | 182 +++--
 tests/executors/test_kubernetes_executor.py|  17 +-
 8 files changed, 468 insertions(+), 426 deletions(-)
 copy airflow/{contrib/operators/kubernetes_pod_operator.py => 
providers/cncf/kubernetes/operators/kubernetes_pod.py} (57%)
 create mode 100644 chart/charts/postgresql-6.3.12.tgz



[GitHub] [airflow] WesleyBatista opened a new issue #9490: Unable to retrieve logs after Out Of Memory Error

2020-06-23 Thread GitBox


WesleyBatista opened a new issue #9490:
URL: https://github.com/apache/airflow/issues/9490


   
   **Apache Airflow version**: 1.10.10
   
   **Environment**:
   
   - **Cloud provider or hardware configuration**:
   - **OS** (e.g. from /etc/os-release): `Debian GNU/Linux 9 (stretch)`
   - **Kernel** (e.g. `uname -a`): `Linux  4.9.0-3-amd64 #1 SMP 
Debian 4.9.30-2+deb9u5 (2017-09-19) x86_64 GNU/Linux`
   
   **What happened**:
   
   A task got executed and failed, but the execution logs are inaccessible from 
the UI.
   
   
![image](https://user-images.githubusercontent.com/755254/85449070-9f912480-b597-11ea-9f6e-9ebf2ede720a.png)
   
   ```
   $ cat /var/log/airflow///2020-06-15T01\:23\:00+00\:00/1.log
   ... 
   [2020-06-22 20:15:35,141] {logging_mixin.py:112} INFO - [2020-06-22 
20:15:35,091] {local_task_job.py:103} INFO - Task exited with return code -9
   ```
   
   **What you expected to happen**:
   
   Execution logs should be available on the UI.
   
   
   **How to reproduce it**:
   
   In our case it was an [OOM error](https://stackoverflow.com/a/18529453), but 
maybe any unhandled error can be used to reproduce a crash, or perhaps even 
issuing a `kill -9` on the process running the task could reproduce the issue.
   
   **Anything else we need to know**:
   
   1. I found this issue that helped me to understand the error: 
https://issues.apache.org/jira/browse/AIRFLOW-4922 
   
   2. Our current setup:
   - **machine1** running `airflow-webserver`, `airflow-scheduler`, 
`airflow-worker`
   - **machine2**  running `airflow-worker`
   
   The task is set to be executed on **machine2**
   3. These are the task instances currently on the airflow metadata database:
   
![bug-no-logs-airflow](https://user-images.githubusercontent.com/755254/85451797-876ed480-b59a-11ea-8feb-d38204a4135b.png)
   
   We can see in the image that the one with the longest duration, which 
eventually got killed by the OOM killer, is also the one without a `hostname` 
set.
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9331: add log endpoint

2020-06-23 Thread GitBox


ephraimbuddy commented on a change in pull request #9331:
URL: https://github.com/apache/airflow/pull/9331#discussion_r81257



##
File path: tests/api_connexion/endpoints/test_log_endpoint.py
##
@@ -14,25 +14,254 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import copy
+import logging.config
+import os
+import shutil
+import sys
+import tempfile
 import unittest
+from unittest import mock
+from unittest.mock import PropertyMock
 
-import pytest
+from itsdangerous.url_safe import URLSafeSerializer
 
+from airflow import DAG, settings
+from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
+from airflow.models import DagRun, TaskInstance
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils import timezone
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.types import DagRunType
 from airflow.www import app
+from tests.test_utils.config import conf_vars
+from tests.test_utils.db import clear_db_runs
 
 
 class TestGetLog(unittest.TestCase):
+DAG_ID = 'dag_for_testing_log_endpoint'
+DAG_ID_REMOVED = 'removed_dag_for_testing_log_endpoint'
+TASK_ID = 'task_for_testing_log_endpoint'
+TRY_NUMBER = 1
+
 @classmethod
-def setUpClass(cls) -> None:
-super().setUpClass()
-cls.app = app.create_app(testing=True)  # type:ignore
+def setUpClass(cls):
+settings.configure_orm()
+cls.session = settings.Session
+cls.app = app.create_app(testing=True)
 
 def setUp(self) -> None:
-self.client = self.app.test_client()  # type:ignore
+self.default_time = "2020-06-10T20:00:00+00:00"
+self.client = self.app.test_client()
+self.log_dir = tempfile.mkdtemp()
+# Make sure that the configure_logging is not cached
+self.old_modules = dict(sys.modules)
+self._prepare_log_files()
+self._configure_loggers()
+self._prepare_db()
+
+def _create_dagrun(self, session):
+dagrun_model = DagRun(
+dag_id=self.DAG_ID,
+run_id='TEST_DAG_RUN_ID',
+run_type=DagRunType.MANUAL.value,
+execution_date=timezone.parse(self.default_time),
+start_date=timezone.parse(self.default_time),
+external_trigger=True,
+)
+session.add(dagrun_model)
+session.commit()
+
+def _configure_loggers(self):
+# Create a custom logging configuration
+logging_config = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
+logging_config['handlers']['task']['base_log_folder'] = self.log_dir
+
+logging_config['handlers']['task']['filename_template'] = \
+'{{ ti.dag_id }}/{{ ti.task_id }}/' \
+'{{ ts | replace(":", ".") }}/{{ try_number }}.log'
+
+# Write the custom logging configuration to a file
+self.settings_folder = tempfile.mkdtemp()
+settings_file = os.path.join(self.settings_folder, 
"airflow_local_settings.py")
+new_logging_file = "LOGGING_CONFIG = {}".format(logging_config)
+with open(settings_file, 'w') as handle:
+handle.writelines(new_logging_file)
+sys.path.append(self.settings_folder)
+
+with conf_vars({('logging', 'logging_config_class'): 
'airflow_local_settings.LOGGING_CONFIG'}):
+self.app = app.create_app(testing=True)
+self.client = self.app.test_client()
+settings.configure_logging()
+
+def _prepare_db(self):
+dagbag = self.app.dag_bag  # pylint: disable=no-member
+dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
+dag.sync_to_db()
+dag_removed = DAG(self.DAG_ID_REMOVED, 
start_date=timezone.parse(self.default_time))
+dag_removed.sync_to_db()
+dagbag.bag_dag(dag, parent_dag=dag, root_dag=dag)
+with create_session() as session:
+self.ti = TaskInstance(
+task=DummyOperator(task_id=self.TASK_ID, dag=dag),
+execution_date=timezone.parse(self.default_time)
+)
+self.ti.try_number = 1
+self.ti_removed_dag = TaskInstance(
+task=DummyOperator(task_id=self.TASK_ID, dag=dag_removed),
+execution_date=timezone.parse(self.default_time)
+)
+self.ti_removed_dag.try_number = 1
+
+session.merge(self.ti)
+session.merge(self.ti_removed_dag)
+
+def _prepare_log_files(self):
+dir_path = f"{self.log_dir}/{self.DAG_ID}/{self.TASK_ID}/" \
+   f"{self.default_time.replace(':', '.')}/"
+os.makedirs(dir_path)
+with open(f"{dir_path}/1.log", "w+") as file:
+file.write("Log for testing.")
+file.flush()
 
-@pytest.mark.skip(reason="Not implemented yet")
-def test_should_response_200

[GitHub] [airflow] ephraimbuddy commented on a change in pull request #9431: Move API page limit and offset parameters to views as kwargs Arguments

2020-06-23 Thread GitBox


ephraimbuddy commented on a change in pull request #9431:
URL: https://github.com/apache/airflow/pull/9431#discussion_r444510988



##
File path: airflow/api_connexion/parameters.py
##
@@ -48,6 +46,21 @@ def format_datetime(value: str):
 )
 
 
+def check_limit(value: int):
+"""
+This checks the limit passed to view and raises BadRequest if
+limit exceed user configured value
+"""
+max_val = conf.getint("api", "maximum_page_limit")
+
+if value > max_val:

Review comment:
   Nice, I think it is a good approach. Let me work on it now.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[airflow] branch master updated: Replace deprecated wtforms HTMLString with markupsafe.MarkUp (#9487)

2020-06-23 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
 new 7c587dc  Replace deprecated wtforms HTMLString with markupsafe.MarkUp 
(#9487)
7c587dc is described below

commit 7c587dc486c4f637c681e2e8b61a12ebf99c0f14
Author: Kaxil Naik 
AuthorDate: Tue Jun 23 22:27:23 2020 +0100

Replace deprecated wtforms HTMLString with markupsafe.MarkUp (#9487)

    WTForms now uses `Markup` to escape strings and removed its internal
    class HTMLString on master. Details: https://github.com/wtforms/wtforms/pull/400

    That change previously broke Airflow for new users (in 2.3.0). However, at
    users' request they added `HTMLString` back in 2.3.1 as a thin wrapper that
    passes all args to `markupsafe.Markup`, for temporary backward compatibility
    with a deprecation warning. Details: https://github.com/wtforms/wtforms/issues/581
---
 airflow/www/widgets.py  |  5 +++--
 requirements/requirements-python3.6.txt | 20 ++--
 requirements/requirements-python3.7.txt | 20 ++--
 requirements/requirements-python3.8.txt | 24 
 requirements/setup-3.6.md5  |  2 +-
 requirements/setup-3.7.md5  |  2 +-
 requirements/setup-3.8.md5  |  2 +-
 setup.py|  1 +
 8 files changed, 39 insertions(+), 37 deletions(-)

diff --git a/airflow/www/widgets.py b/airflow/www/widgets.py
index 4d6582f..2f687cc 100644
--- a/airflow/www/widgets.py
+++ b/airflow/www/widgets.py
@@ -17,7 +17,8 @@
 # under the License.
 
 from flask_appbuilder.widgets import RenderTemplateWidget
-from wtforms.widgets import HTMLString, html_params
+from markupsafe import Markup
+from wtforms.widgets import html_params
 
 
 class AirflowModelListWidget(RenderTemplateWidget):
@@ -43,6 +44,6 @@ class AirflowDateTimePickerWidget:
 field.data = ""
 template = self.data_template
 
-return HTMLString(
+return Markup(
 template % {"text": html_params(type="text", value=field.data, 
**kwargs)}
 )
diff --git a/requirements/requirements-python3.6.txt 
b/requirements/requirements-python3.6.txt
index b7eb84d..8e0e6ad 100644
--- a/requirements/requirements-python3.6.txt
+++ b/requirements/requirements-python3.6.txt
@@ -72,9 +72,9 @@ beautifulsoup4==4.7.1
 billiard==3.6.3.0
 black==19.10b0
 blinker==1.4
-boto3==1.14.7
+boto3==1.14.8
 boto==2.49.0
-botocore==1.17.7
+botocore==1.17.8
 bowler==0.8.0
 cached-property==1.5.1
 cachetools==4.1.0
@@ -101,7 +101,7 @@ cryptography==2.9.2
 curlify==2.2.1
 cx-Oracle==7.3.0
 dask==2.19.0
-datadog==0.36.0
+datadog==0.37.0
 decorator==4.4.2
 defusedxml==0.6.0
 dill==0.3.2
@@ -120,8 +120,8 @@ email-validator==1.1.1
 entrypoints==0.3
 eventlet==0.25.2
 execnet==1.7.1
-facebook-business==7.0.2
-fastavro==0.23.4
+facebook-business==7.0.3
+fastavro==0.23.5
 filelock==3.0.12
 fissix==20.5.1
 flake8-colors==0.1.6
@@ -180,7 +180,7 @@ hmsclient==0.1.1
 httplib2==0.18.1
 humanize==0.5.1
 hvac==0.10.4
-identify==1.4.19
+identify==1.4.20
 idna-ssl==1.1.0
 idna==2.9
 ijson==2.6.1
@@ -201,7 +201,7 @@ jira==2.0.0
 jmespath==0.10.0
 json-merge-patch==0.2
 jsondiff==1.1.2
-jsonpatch==1.25
+jsonpatch==1.26
 jsonpickle==1.4.1
 jsonpointer==2.0
 jsonschema==3.2.0
@@ -239,7 +239,7 @@ networkx==2.4
 nodeenv==1.4.0
 nteract-scrapbook==0.4.1
 ntlm-auth==1.5.0
-numpy==1.18.5
+numpy==1.19.0
 oauthlib==2.1.0
 openapi-spec-validator==0.2.8
 oscrypto==1.2.0
@@ -380,14 +380,14 @@ uritemplate==3.0.1
 urllib3==1.25.9
 vertica-python==0.10.4
 vine==1.3.0
-virtualenv==20.0.23
+virtualenv==20.0.24
 watchtower==0.7.3
 wcwidth==0.2.4
 websocket-client==0.57.0
 wrapt==1.12.1
 xmltodict==0.12.0
 yamllint==1.23.0
-yandexcloud==0.41.0
+yandexcloud==0.42.0
 yarl==1.4.2
 zdesk==2.7.1
 zict==2.0.0
diff --git a/requirements/requirements-python3.7.txt 
b/requirements/requirements-python3.7.txt
index 8d70d17..06d4761 100644
--- a/requirements/requirements-python3.7.txt
+++ b/requirements/requirements-python3.7.txt
@@ -72,9 +72,9 @@ beautifulsoup4==4.7.1
 billiard==3.6.3.0
 black==19.10b0
 blinker==1.4
-boto3==1.14.7
+boto3==1.14.8
 boto==2.49.0
-botocore==1.17.7
+botocore==1.17.8
 bowler==0.8.0
 cached-property==1.5.1
 cachetools==4.1.0
@@ -100,7 +100,7 @@ cryptography==2.9.2
 curlify==2.2.1
 cx-Oracle==7.3.0
 dask==2.19.0
-datadog==0.36.0
+datadog==0.37.0
 decorator==4.4.2
 defusedxml==0.6.0
 dill==0.3.2
@@ -119,8 +119,8 @@ email-validator==1.1.1
 entrypoints==0.3
 eventlet==0.25.2
 execnet==1.7.1
-facebook-business==7.0.2
-fastavro==0.23.4
+facebook-business==7.0.3
+fastavro==0.23.5
 filelock==3.0.12
 fissix==20.5.1
 flake8-colors==0.1.6
@@ -179,7 +179,7 @@ hmsclient==0.1.1
 httplib2==0.18.1
 humanize==0.5.1
 hvac==0.10.4
-identify==1.4.19
+identify==1.4.20
 idna==2.9
 ijson==2.6.1
 imagesize==1.2.0
@@ -197,7 +197,7 @@ jira==2.0

[GitHub] [airflow] kaxil merged pull request #9487: Replace deprecated wtforms HTMLString with markupsafe.MarkUp

2020-06-23 Thread GitBox


kaxil merged pull request #9487:
URL: https://github.com/apache/airflow/pull/9487


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[airflow] branch master updated: Remove need of datetime.timezone in test_views.py (#9479)

2020-06-23 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
 new 9997aff  Remove need of datetime.timezone in test_views.py (#9479)
9997aff is described below

commit 9997aff10bf4c1bfbde38fa65d7961ec431f75dc
Author: Kaxil Naik 
AuthorDate: Tue Jun 23 22:27:09 2020 +0100

Remove need of datetime.timezone in test_views.py (#9479)
---
 tests/www/test_views.py | 12 ++--
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index 1cfbbc8..70cb0c6 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -29,7 +29,7 @@ import tempfile
 import unittest
 import urllib
 from contextlib import contextmanager
-from datetime import datetime as dt, timedelta, timezone as tz
+from datetime import datetime as dt, timedelta
 from typing import Any, Dict, Generator, List, NamedTuple
 from unittest import mock
 from unittest.mock import PropertyMock
@@ -2626,7 +2626,7 @@ class TestDagRunModelView(TestBase):
 
 dr = self.session.query(models.DagRun).one()
 
-self.assertEqual(dr.execution_date, dt(2018, 7, 6, 5, 4, 3, 
tzinfo=tz.utc))
+self.assertEqual(dr.execution_date, timezone.datetime(2018, 7, 6, 5, 
4, 3))
 
 def test_create_dagrun_execution_date_with_timezone_edt(self):
 data = {
@@ -2642,7 +2642,7 @@ class TestDagRunModelView(TestBase):
 
 dr = self.session.query(models.DagRun).one()
 
-self.assertEqual(dr.execution_date, dt(2018, 7, 6, 5, 4, 3, 
tzinfo=tz(timedelta(hours=-4
+self.assertEqual(dr.execution_date, timezone.datetime(2018, 7, 6, 9, 
4, 3))
 
 def test_create_dagrun_execution_date_with_timezone_pst(self):
 data = {
@@ -2658,7 +2658,7 @@ class TestDagRunModelView(TestBase):
 
 dr = self.session.query(models.DagRun).one()
 
-self.assertEqual(dr.execution_date, dt(2018, 7, 6, 5, 4, 3, 
tzinfo=tz(timedelta(hours=-8
+self.assertEqual(dr.execution_date, timezone.datetime(2018, 7, 6, 13, 
4, 3))
 
 @conf_vars({("core", "default_timezone"): "America/Toronto"})
 def test_create_dagrun_execution_date_without_timezone_default_edt(self):
@@ -2675,7 +2675,7 @@ class TestDagRunModelView(TestBase):
 
 dr = self.session.query(models.DagRun).one()
 
-self.assertEqual(dr.execution_date, dt(2018, 7, 6, 5, 4, 3, 
tzinfo=tz(timedelta(hours=-4
+self.assertEqual(dr.execution_date, timezone.datetime(2018, 7, 6, 9, 
4, 3))
 
 def test_create_dagrun_execution_date_without_timezone_default_utc(self):
 data = {
@@ -2691,7 +2691,7 @@ class TestDagRunModelView(TestBase):
 
 dr = self.session.query(models.DagRun).one()
 
-self.assertEqual(dr.execution_date, dt(2018, 7, 6, 5, 4, 3, 
tzinfo=tz.utc))
+self.assertEqual(dr.execution_date, dt(2018, 7, 6, 5, 4, 3, 
tzinfo=timezone.TIMEZONE))
 
 def test_create_dagrun_valid_conf(self):
 conf_value = dict(Valid=True)



  1   2   >