Repository: incubator-airflow
Updated Branches:
  refs/heads/master 7961ee8f0 -> 1d93d1b54


[AIRFLOW-2661] fix config dags_volume_subpath and logs_volume_subpath

Make sure you have checked _all_ steps below.

### JIRA
- [x] My PR addresses the following [Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW/)
issues and references them in the PR title. For
example, "\[AIRFLOW-XXX\] My Airflow PR"
    - https://issues.apache.org/jira/browse/AIRFLOW-2661
    - In case you are fixing a typo in the
documentation you can prepend your commit with
\[AIRFLOW-XXX\], code changes always need a JIRA
issue.

### Description
- [x] Here are some details about my PR, including
screenshots of any UI changes:
Changes the use of `logs_volume_subpath` and
`dags_volume_subpath`, which are
now passed into the construction of the worker
pod's volumeMounts instead of
the volumes section (where subPath is not valid).

### Tests
- [x] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:
Unit tests have been added, but I'm not sure how to
add integration tests
for this without breaking the other minikube tests.

### Commits
- [x] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit
message](http://chris.beams.io/posts/git-commit/)":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not
"adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

### Documentation
- [x] In case of new functionality, my PR adds
documentation that describes how to use it.

No new functionality added

### Code Quality
- [x] Passes `git diff upstream/master -u -- "*.py" | flake8 --diff`

Closes #3537 from r4vi/AIRFLOW-2661


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1d93d1b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1d93d1b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1d93d1b5

Branch: refs/heads/master
Commit: 1d93d1b5499c8061c17f9250cb4729532c61a109
Parents: 7961ee8
Author: Ravi Kotecha <kotecha.r...@gmail.com>
Authored: Fri Jun 22 16:37:46 2018 +0200
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Fri Jun 22 16:37:46 2018 +0200

----------------------------------------------------------------------
 .../contrib/kubernetes/worker_configuration.py  | 41 +++++++------
 .../executors/test_kubernetes_executor.py       | 62 ++++++++++++++++++++
 2 files changed, 86 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d93d1b5/airflow/contrib/kubernetes/worker_configuration.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index 2654085..784bb77 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -106,34 +106,31 @@ class WorkerConfiguration(LoggingMixin):
         dags_volume_name = 'airflow-dags'
         logs_volume_name = 'airflow-logs'
 
-        def _construct_volume(name, claim, subpath=None):
-            vo = {
+        def _construct_volume(name, claim):
+            volume = {
                 'name': name
             }
             if claim:
-                vo['persistentVolumeClaim'] = {
+                volume['persistentVolumeClaim'] = {
                     'claimName': claim
                 }
-                if subpath:
-                    vo['subPath'] = subpath
             else:
-                vo['emptyDir'] = {}
-            return vo
+                volume['emptyDir'] = {}
+            return volume
 
         volumes = [
             _construct_volume(
                 dags_volume_name,
-                self.kube_config.dags_volume_claim,
-                self.kube_config.dags_volume_subpath
+                self.kube_config.dags_volume_claim
             ),
             _construct_volume(
                 logs_volume_name,
-                self.kube_config.logs_volume_claim,
-                self.kube_config.logs_volume_subpath
+                self.kube_config.logs_volume_claim
             )
         ]
 
         dag_volume_mount_path = ""
+
         if self.kube_config.dags_volume_claim:
             dag_volume_mount_path = self.worker_airflow_dags
         else:
@@ -141,15 +138,25 @@ class WorkerConfiguration(LoggingMixin):
                 self.worker_airflow_dags,
                 self.kube_config.git_subpath
             )
-
-        volume_mounts = [{
+        dags_volume_mount = {
             'name': dags_volume_name,
             'mountPath': dag_volume_mount_path,
-            'readOnly': True
-        }, {
+            'readOnly': True,
+        }
+        if self.kube_config.dags_volume_subpath:
+            dags_volume_mount['subPath'] = self.kube_config.dags_volume_subpath
+
+        logs_volume_mount = {
             'name': logs_volume_name,
-            'mountPath': self.worker_airflow_logs
-        }]
+            'mountPath': self.worker_airflow_logs,
+        }
+        if self.kube_config.dags_volume_subpath:
+            logs_volume_mount['subPath'] = self.kube_config.logs_volume_subpath
+
+        volume_mounts = [
+            dags_volume_mount,
+            logs_volume_mount
+        ]
 
         # Mount the airflow.cfg file via a configmap the user has specified
         if self.kube_config.airflow_configmap:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d93d1b5/tests/contrib/executors/test_kubernetes_executor.py
----------------------------------------------------------------------
diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py
index 0a38920..963efcb 100644
--- a/tests/contrib/executors/test_kubernetes_executor.py
+++ b/tests/contrib/executors/test_kubernetes_executor.py
@@ -14,6 +14,7 @@
 #
 
 import unittest
+import mock
 import re
 import string
 import random
@@ -21,6 +22,7 @@ from datetime import datetime
 
 try:
     from airflow.contrib.executors.kubernetes_executor import AirflowKubernetesScheduler
+    from airflow.contrib.kubernetes.worker_configuration import WorkerConfiguration
 except ImportError:
     AirflowKubernetesScheduler = None
 
@@ -72,5 +74,65 @@ class TestAirflowKubernetesScheduler(unittest.TestCase):
         self.assertEquals(datetime_obj, new_datetime_obj)
 
 
+class TestKubernetesWorkerConfiguration(unittest.TestCase):
+    """
+    Tests that if dags_volume_subpath/logs_volume_subpath configuration
+    options are passed to worker pod config
+    """
+    def setUp(self):
+        if AirflowKubernetesScheduler is None:
+            self.skipTest("kubernetes python package is not installed")
+
+        self.pod = mock.patch(
+            'airflow.contrib.kubernetes.worker_configuration.Pod'
+        )
+        self.resources = mock.patch(
+            'airflow.contrib.kubernetes.worker_configuration.Resources'
+        )
+        self.secret = mock.patch(
+            'airflow.contrib.kubernetes.worker_configuration.Secret'
+        )
+
+        for patcher in [self.pod, self.resources, self.secret]:
+            self.mock_foo = patcher.start()
+            self.addCleanup(patcher.stop)
+
+        self.kube_config = mock.MagicMock()
+        self.kube_config.airflow_home = '/'
+        self.kube_config.airflow_dags = 'dags'
+        self.kube_config.airflow_dags = 'logs'
+        self.kube_config.dags_volume_subpath = None
+        self.kube_config.logs_volume_subpath = None
+
+    def test_worker_configuration_no_subpaths(self):
+        worker_config = WorkerConfiguration(self.kube_config)
+        volumes, volume_mounts = worker_config.init_volumes_and_mounts()
+        for volume_or_mount in volumes + volume_mounts:
+            if volume_or_mount['name'] != 'airflow-config':
+                self.assertNotIn(
+                    'subPath', volume_or_mount,
+                    "subPath shouldn't be defined"
+                )
+
+    def test_worker_with_subpaths(self):
+        self.kube_config.dags_volume_subpath = 'dags'
+        self.kube_config.logs_volume_subpath = 'logs'
+        worker_config = WorkerConfiguration(self.kube_config)
+        volumes, volume_mounts = worker_config.init_volumes_and_mounts()
+
+        for volume in volumes:
+            self.assertNotIn(
+                'subPath', volume,
+                "subPath isn't valid configuration for a volume"
+            )
+
+        for volume_mount in volume_mounts:
+            if volume_mount['name'] != 'airflow-config':
+                self.assertIn(
+                    'subPath', volume_mount,
+                    "subPath should've been passed to volumeMount configuration"
+                )
+
+
 if __name__ == '__main__':
     unittest.main()

Reply via email to