[ 
https://issues.apache.org/jira/browse/AIRFLOW-3501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16720414#comment-16720414
 ] 

ASF GitHub Bot commented on AIRFLOW-3501:
-----------------------------------------

kppullin closed pull request #4307: [AIRFLOW-3501] k8s executor - Support 
loading dags from image.
URL: https://github.com/apache/incubator-airflow/pull/4307
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (it would otherwise not be shown, due to how GitHub displays merged fork PRs):

diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index a9473178c1..2024bfc34e 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -624,6 +624,10 @@ git_user =
 git_password =
 git_subpath =
 
+# If True, use the dags that exist in the docker container instead of pulling
+# from git or a dag volume claim.
+use_container_dags =
+
 # For cloning DAGs from git repositories into volumes: 
https://github.com/kubernetes/git-sync
 git_sync_container_repository = gcr.io/google-containers/git-sync-amd64
 git_sync_container_tag = v2.0.5
diff --git a/airflow/contrib/executors/kubernetes_executor.py 
b/airflow/contrib/executors/kubernetes_executor.py
index 6c1bd222b9..d4ac7f0dd7 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -150,6 +150,10 @@ def __init__(self):
         self.git_user = conf.get(self.kubernetes_section, 'git_user')
         self.git_password = conf.get(self.kubernetes_section, 'git_password')
 
+        # If True, use the dags that exist in the docker container instead of 
pulling
+        # from git or a dag volume claim.
+        self.use_container_dags = conf.get(self.kubernetes_section, 
'use_container_dags')
+
         # NOTE: The user may optionally use a volume claim to mount a PV 
containing
         # DAGs directly
         self.dags_volume_claim = conf.get(self.kubernetes_section, 
'dags_volume_claim')
@@ -204,10 +208,13 @@ def __init__(self):
         self._validate()
 
     def _validate(self):
-        if not self.dags_volume_claim and (not self.git_repo or not 
self.git_branch):
+        if not self.dags_volume_claim \
+                and (not self.git_repo or not self.git_branch) \
+                and not self.use_container_dags:
             raise AirflowConfigException(
                 'In kubernetes mode the following must be set in the 
`kubernetes` '
-                'config section: `dags_volume_claim` or `git_repo and 
git_branch`')
+                'config section: `dags_volume_claim` or `use_container_dags` '
+                'or `git_repo and git_branch`')
 
 
 class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object):
diff --git a/airflow/contrib/kubernetes/worker_configuration.py 
b/airflow/contrib/kubernetes/worker_configuration.py
index f857cbc237..a98d083b72 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -37,8 +37,8 @@ def __init__(self, kube_config):
 
     def _get_init_containers(self, volume_mounts):
         """When using git to retrieve the DAGs, use the GitSync Init 
Container"""
-        # If we're using volume claims to mount the dags, no init container is 
needed
-        if self.kube_config.dags_volume_claim:
+        # If we're using container dags or a volume claim to mount the dags, 
no init container is needed
+        if self.kube_config.dags_volume_claim or 
self.kube_config.use_container_dags:
             return []
 
         # Otherwise, define a git-sync init container
@@ -128,33 +128,12 @@ def _construct_volume(name, claim):
             return volume
 
         volumes = [
-            _construct_volume(
-                dags_volume_name,
-                self.kube_config.dags_volume_claim
-            ),
             _construct_volume(
                 logs_volume_name,
                 self.kube_config.logs_volume_claim
             )
         ]
 
-        dag_volume_mount_path = ""
-
-        if self.kube_config.dags_volume_claim:
-            dag_volume_mount_path = self.worker_airflow_dags
-        else:
-            dag_volume_mount_path = os.path.join(
-                self.worker_airflow_dags,
-                self.kube_config.git_subpath
-            )
-        dags_volume_mount = {
-            'name': dags_volume_name,
-            'mountPath': dag_volume_mount_path,
-            'readOnly': True,
-        }
-        if self.kube_config.dags_volume_subpath:
-            dags_volume_mount['subPath'] = self.kube_config.dags_volume_subpath
-
         logs_volume_mount = {
             'name': logs_volume_name,
             'mountPath': self.worker_airflow_logs,
@@ -162,10 +141,34 @@ def _construct_volume(name, claim):
         if self.kube_config.logs_volume_subpath:
             logs_volume_mount['subPath'] = self.kube_config.logs_volume_subpath
 
-        volume_mounts = [
-            dags_volume_mount,
-            logs_volume_mount
-        ]
+        volume_mounts = [logs_volume_mount]
+
+        # Build the dags volume if not using container dags
+        if not self.kube_config.use_container_dags:
+            dags_volume = _construct_volume(
+                dags_volume_name,
+                self.kube_config.dags_volume_claim
+            )
+            volumes.append(dags_volume)
+
+            dag_volume_mount_path = ""
+
+            if self.kube_config.dags_volume_claim:
+                dag_volume_mount_path = self.worker_airflow_dags
+            else:
+                dag_volume_mount_path = os.path.join(
+                    self.worker_airflow_dags,
+                    self.kube_config.git_subpath
+                )
+            dags_volume_mount = {
+                'name': dags_volume_name,
+                'mountPath': dag_volume_mount_path,
+                'readOnly': True,
+            }
+            if self.kube_config.dags_volume_subpath:
+                dags_volume_mount['subPath'] = 
self.kube_config.dags_volume_subpath
+
+            volume_mounts.append(dags_volume_mount)
 
         # Mount the airflow.cfg file via a configmap the user has specified
         if self.kube_config.airflow_configmap:
diff --git a/tests/contrib/executors/test_kubernetes_executor.py 
b/tests/contrib/executors/test_kubernetes_executor.py
index 1307e500cf..37b64cb277 100644
--- a/tests/contrib/executors/test_kubernetes_executor.py
+++ b/tests/contrib/executors/test_kubernetes_executor.py
@@ -109,6 +109,7 @@ def setUp(self):
         self.kube_config.airflow_dags = 'logs'
         self.kube_config.dags_volume_subpath = None
         self.kube_config.logs_volume_subpath = None
+        self.kube_config.use_container_dags = None
 
     def test_worker_configuration_no_subpaths(self):
         worker_config = WorkerConfiguration(self.kube_config)
@@ -155,6 +156,61 @@ def 
test_worker_environment_when_dags_folder_specified(self):
 
         self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
 
+    def test_worker_pvc_dags(self):
+        # Tests persistence volume config created when `dags_volume_claim` is 
set
+        self.kube_config.dags_volume_claim = 'airflow-dags'
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        volumes, volume_mounts = worker_config.init_volumes_and_mounts()
+
+        dag_volume = [volume for volume in volumes if volume['name'] == 
'airflow-dags']
+        dag_volume_mount = [mount for mount in volume_mounts if mount['name'] 
== 'airflow-dags']
+
+        self.assertEqual('airflow-dags', 
dag_volume[0]['persistentVolumeClaim']['claimName'])
+        self.assertEqual(1, len(dag_volume_mount))
+
+    def test_worker_git_dags(self):
+        # Tests persistence volume config created when `git_repo` is set
+        self.kube_config.dags_volume_claim = None
+        self.kube_config.dags_folder = '/usr/local/airflow/dags'
+        self.kube_config.worker_dags_folder = '/usr/local/airflow/dags'
+
+        self.kube_config.git_sync_container_repository = 
'gcr.io/google-containers/git-sync-amd64'
+        self.kube_config.git_sync_container_tag = 'v2.0.5'
+        self.kube_config.git_sync_container = 
'gcr.io/google-containers/git-sync-amd64:v2.0.5'
+        self.kube_config.git_sync_init_container_name = 'git-sync-clone'
+        self.kube_config.git_subpath = ''
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        volumes, volume_mounts = worker_config.init_volumes_and_mounts()
+
+        init_container = worker_config._get_init_containers(volume_mounts)[0]
+
+        dag_volume = [volume for volume in volumes if volume['name'] == 
'airflow-dags']
+        dag_volume_mount = [mount for mount in volume_mounts if mount['name'] 
== 'airflow-dags']
+
+        self.assertTrue('emptyDir' in dag_volume[0])
+        self.assertEqual('/usr/local/airflow/dags/', 
dag_volume_mount[0]['mountPath'])
+
+        self.assertEqual('git-sync-clone', init_container['name'])
+        self.assertEqual('gcr.io/google-containers/git-sync-amd64:v2.0.5', 
init_container['image'])
+
+    def test_worker_container_dags(self):
+        # Tests persistence volume config created when `use_container_dags` is 
set
+        self.kube_config.use_container_dags = True
+
+        worker_config = WorkerConfiguration(self.kube_config)
+        volumes, volume_mounts = worker_config.init_volumes_and_mounts()
+
+        dag_volume = [volume for volume in volumes if volume['name'] == 
'airflow-dags']
+        dag_volume_mount = [mount for mount in volume_mounts if mount['name'] 
== 'airflow-dags']
+
+        init_containers = worker_config._get_init_containers(volume_mounts)
+
+        self.assertEqual(0, len(dag_volume))
+        self.assertEqual(0, len(dag_volume_mount))
+        self.assertEqual(0, len(init_containers))
+
 
 class TestKubernetesExecutor(unittest.TestCase):
     """


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add config option to load dags in an image with the kubernetes executor.
> ------------------------------------------------------------------------
>
>                 Key: AIRFLOW-3501
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3501
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: kubernetes
>            Reporter: Kevin Pullin
>            Priority: Major
>
> Currently the airflow kubernetes executor forces loading dags either from a 
> volume claim or an init container.  There should be an option to bypass these 
> settings and instead use dags packaged into the running image.
> The motivation for this change is to allow for an airflow image to be built 
> and released via a CI/CD pipeline upon a new commit to a dag repository.  For 
> example, given a new git commit to a dag repo, a CI/CD server can build an 
> airflow docker image, run tests against the current dags, and finally push 
> the entire bundle as a single, complete, well-known unit to kubernetes.
> There's no need to worry that a git init container will fail, having to have 
> a separate pipeline to update dags on a shared volume, etc.  And if issues 
> arise from an update, the configuration can be easily rolled back to the 
> prior version of the image.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to