Repository: incubator-airflow Updated Branches: refs/heads/master 7961ee8f0 -> 1d93d1b54
[AIRFLOW-2661] fix config dags_volume_subpath and logs_volume_subpath Make sure you have checked _all_ steps below. ### JIRA - [x] My PR addresses the following [Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR" - https://issues.apache.org/jira/browse/AIRFLOW-2661 - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a JIRA issue. ### Description - [x] Here are some details about my PR, including screenshots of any UI changes: Changes the use of `logs_volume_subpath` and `dags_volume_subpath` which are now passed into the construction of the worker pod's volumeMounts instead of the volume section (where subPath is not valid). ### Tests - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: Unit tests have been added but I'm not sure how to add integration tests for this without breaking the other minikube tests ### Commits - [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 2. Subject is limited to 50 characters 3. Subject does not end with a period 4. Subject uses the imperative mood ("add", not "adding") 5. Body wraps at 72 characters 6. Body explains "what" and "why", not "how" ### Documentation - [x] In case of new functionality, my PR adds documentation that describes how to use it. 
No new functionality added ### Code Quality - [x] Passes `git diff upstream/master -u -- "*.py" | flake8 --diff` Closes #3537 from r4vi/AIRFLOW-2661 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1d93d1b5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1d93d1b5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1d93d1b5 Branch: refs/heads/master Commit: 1d93d1b5499c8061c17f9250cb4729532c61a109 Parents: 7961ee8 Author: Ravi Kotecha <kotecha.r...@gmail.com> Authored: Fri Jun 22 16:37:46 2018 +0200 Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com> Committed: Fri Jun 22 16:37:46 2018 +0200 ---------------------------------------------------------------------- .../contrib/kubernetes/worker_configuration.py | 41 +++++++------ .../executors/test_kubernetes_executor.py | 62 ++++++++++++++++++++ 2 files changed, 86 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d93d1b5/airflow/contrib/kubernetes/worker_configuration.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index 2654085..784bb77 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -106,34 +106,31 @@ class WorkerConfiguration(LoggingMixin): dags_volume_name = 'airflow-dags' logs_volume_name = 'airflow-logs' - def _construct_volume(name, claim, subpath=None): - vo = { + def _construct_volume(name, claim): + volume = { 'name': name } if claim: - vo['persistentVolumeClaim'] = { + volume['persistentVolumeClaim'] = { 'claimName': claim } - if subpath: - vo['subPath'] = subpath else: - vo['emptyDir'] = {} - return vo + volume['emptyDir'] = {} + return 
volume volumes = [ _construct_volume( dags_volume_name, - self.kube_config.dags_volume_claim, - self.kube_config.dags_volume_subpath + self.kube_config.dags_volume_claim ), _construct_volume( logs_volume_name, - self.kube_config.logs_volume_claim, - self.kube_config.logs_volume_subpath + self.kube_config.logs_volume_claim ) ] dag_volume_mount_path = "" + if self.kube_config.dags_volume_claim: dag_volume_mount_path = self.worker_airflow_dags else: @@ -141,15 +138,25 @@ class WorkerConfiguration(LoggingMixin): self.worker_airflow_dags, self.kube_config.git_subpath ) - - volume_mounts = [{ + dags_volume_mount = { 'name': dags_volume_name, 'mountPath': dag_volume_mount_path, - 'readOnly': True - }, { + 'readOnly': True, + } + if self.kube_config.dags_volume_subpath: + dags_volume_mount['subPath'] = self.kube_config.dags_volume_subpath + + logs_volume_mount = { 'name': logs_volume_name, - 'mountPath': self.worker_airflow_logs - }] + 'mountPath': self.worker_airflow_logs, + } + if self.kube_config.dags_volume_subpath: + logs_volume_mount['subPath'] = self.kube_config.logs_volume_subpath + + volume_mounts = [ + dags_volume_mount, + logs_volume_mount + ] # Mount the airflow.cfg file via a configmap the user has specified if self.kube_config.airflow_configmap: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d93d1b5/tests/contrib/executors/test_kubernetes_executor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py index 0a38920..963efcb 100644 --- a/tests/contrib/executors/test_kubernetes_executor.py +++ b/tests/contrib/executors/test_kubernetes_executor.py @@ -14,6 +14,7 @@ # import unittest +import mock import re import string import random @@ -21,6 +22,7 @@ from datetime import datetime try: from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler + from 
airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration except ImportError: AirflowKubernetesScheduler = None @@ -72,5 +74,65 @@ class TestAirflowKubernetesScheduler(unittest.TestCase): self.assertEquals(datetime_obj, new_datetime_obj) +class TestKubernetesWorkerConfiguration(unittest.TestCase): + """ + Tests that if dags_volume_subpath/logs_volume_subpath configuration + options are passed to worker pod config + """ + def setUp(self): + if AirflowKubernetesScheduler is None: + self.skipTest("kubernetes python package is not installed") + + self.pod = mock.patch( + 'airflow.contrib.kubernetes.worker_configuration.Pod' + ) + self.resources = mock.patch( + 'airflow.contrib.kubernetes.worker_configuration.Resources' + ) + self.secret = mock.patch( + 'airflow.contrib.kubernetes.worker_configuration.Secret' + ) + + for patcher in [self.pod, self.resources, self.secret]: + self.mock_foo = patcher.start() + self.addCleanup(patcher.stop) + + self.kube_config = mock.MagicMock() + self.kube_config.airflow_home = '/' + self.kube_config.airflow_dags = 'dags' + self.kube_config.airflow_dags = 'logs' + self.kube_config.dags_volume_subpath = None + self.kube_config.logs_volume_subpath = None + + def test_worker_configuration_no_subpaths(self): + worker_config = WorkerConfiguration(self.kube_config) + volumes, volume_mounts = worker_config.init_volumes_and_mounts() + for volume_or_mount in volumes + volume_mounts: + if volume_or_mount['name'] != 'airflow-config': + self.assertNotIn( + 'subPath', volume_or_mount, + "subPath shouldn't be defined" + ) + + def test_worker_with_subpaths(self): + self.kube_config.dags_volume_subpath = 'dags' + self.kube_config.logs_volume_subpath = 'logs' + worker_config = WorkerConfiguration(self.kube_config) + volumes, volume_mounts = worker_config.init_volumes_and_mounts() + + for volume in volumes: + self.assertNotIn( + 'subPath', volume, + "subPath isn't valid configuration for a volume" + ) + + for volume_mount in volume_mounts: 
+ if volume_mount['name'] != 'airflow-config': + self.assertIn( + 'subPath', volume_mount, + "subPath should've been passed to volumeMount configuration" + ) + + if __name__ == '__main__': unittest.main()