[ https://issues.apache.org/jira/browse/AIRFLOW-2770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16722734#comment-16722734 ]
ASF GitHub Bot commented on AIRFLOW-2770: ----------------------------------------- feng-tao closed pull request #4319: [AIRFLOW-2770] Read `dags_in_image` config value as a boolean URL: https://github.com/apache/incubator-airflow/pull/4319 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index a9473178c1..8dc7db5e09 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -605,6 +605,10 @@ namespace = default # The name of the Kubernetes ConfigMap Containing the Airflow Configuration (this file) airflow_configmap = +# For docker image already contains DAGs, this is set to `True`, and the worker will search for dags in dags_folder, +# otherwise use git sync or dags volume claim to mount DAGs +dags_in_image = False + # For either git sync or volume mounted DAGs, the worker will look in this subpath for DAGs dags_volume_subpath = diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index f9d9ddb0fc..9342ce0a3e 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -137,6 +137,10 @@ def __init__(self): self.kubernetes_section, 'worker_service_account_name') self.image_pull_secrets = conf.get(self.kubernetes_section, 'image_pull_secrets') + # NOTE: user can build the dags into the docker image directly, + # this will set to True if so + self.dags_in_image = conf.getboolean(self.kubernetes_section, 'dags_in_image') + # NOTE: `git_repo` and `git_branch` must be specified together as a pair # The http URL of the git repository to clone from self.git_repo = 
conf.get(self.kubernetes_section, 'git_repo') @@ -204,10 +208,12 @@ def __init__(self): self._validate() def _validate(self): - if not self.dags_volume_claim and (not self.git_repo or not self.git_branch): + if not self.dags_volume_claim and not self.dags_in_image \ + and (not self.git_repo or not self.git_branch): raise AirflowConfigException( 'In kubernetes mode the following must be set in the `kubernetes` ' - 'config section: `dags_volume_claim` or `git_repo and git_branch`') + 'config section: `dags_volume_claim` or `git_repo and git_branch` ' + 'or `dags_in_image`') class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin, object): diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index f857cbc237..58cf9cbd20 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -38,7 +38,7 @@ def __init__(self, kube_config): def _get_init_containers(self, volume_mounts): """When using git to retrieve the DAGs, use the GitSync Init Container""" # If we're using volume claims to mount the dags, no init container is needed - if self.kube_config.dags_volume_claim: + if self.kube_config.dags_volume_claim or self.kube_config.dags_in_image: return [] # Otherwise, define a git-sync init container @@ -128,32 +128,19 @@ def _construct_volume(name, claim): return volume volumes = [ - _construct_volume( - dags_volume_name, - self.kube_config.dags_volume_claim - ), _construct_volume( logs_volume_name, self.kube_config.logs_volume_claim ) ] - dag_volume_mount_path = "" - - if self.kube_config.dags_volume_claim: - dag_volume_mount_path = self.worker_airflow_dags - else: - dag_volume_mount_path = os.path.join( - self.worker_airflow_dags, - self.kube_config.git_subpath + if not self.kube_config.dags_in_image: + volumes.append( + _construct_volume( + dags_volume_name, + self.kube_config.dags_volume_claim + ) ) - dags_volume_mount = { - 'name': 
dags_volume_name, - 'mountPath': dag_volume_mount_path, - 'readOnly': True, - } - if self.kube_config.dags_volume_subpath: - dags_volume_mount['subPath'] = self.kube_config.dags_volume_subpath logs_volume_mount = { 'name': logs_volume_name, @@ -163,10 +150,28 @@ def _construct_volume(name, claim): logs_volume_mount['subPath'] = self.kube_config.logs_volume_subpath volume_mounts = [ - dags_volume_mount, logs_volume_mount ] + if not self.kube_config.dags_in_image: + dag_volume_mount_path = "" + + if self.kube_config.dags_volume_claim: + dag_volume_mount_path = self.worker_airflow_dags + else: + dag_volume_mount_path = os.path.join( + self.worker_airflow_dags, + self.kube_config.git_subpath + ) + dags_volume_mount = { + 'name': dags_volume_name, + 'mountPath': dag_volume_mount_path, + 'readOnly': True, + } + if self.kube_config.dags_volume_subpath: + dags_volume_mount['subPath'] = self.kube_config.dags_volume_subpath + volume_mounts.append(dags_volume_mount) + # Mount the airflow.cfg file via a configmap the user has specified if self.kube_config.airflow_configmap: config_volume_name = 'airflow-config' diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml index 93a6364f86..759c7d637f 100644 --- a/scripts/ci/kubernetes/kube/configmaps.yaml +++ b/scripts/ci/kubernetes/kube/configmaps.yaml @@ -178,6 +178,7 @@ data: worker_container_image_pull_policy = IfNotPresent worker_dags_folder = /tmp/dags delete_worker_pods = True + dags_in_image = False git_repo = https://github.com/apache/incubator-airflow.git git_branch = master git_subpath = airflow/example_dags/ diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py index 1307e500cf..8f06f8f3a3 100644 --- a/tests/contrib/executors/test_kubernetes_executor.py +++ b/tests/contrib/executors/test_kubernetes_executor.py @@ -109,6 +109,7 @@ def setUp(self): self.kube_config.airflow_dags = 'logs' 
self.kube_config.dags_volume_subpath = None self.kube_config.logs_volume_subpath = None + self.kube_config.dags_in_image = False def test_worker_configuration_no_subpaths(self): worker_config = WorkerConfiguration(self.kube_config) @@ -155,6 +156,61 @@ def test_worker_environment_when_dags_folder_specified(self): self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER']) + def test_worker_pvc_dags(self): + # Tests persistence volume config created when `dags_volume_claim` is set + self.kube_config.dags_volume_claim = 'airflow-dags' + + worker_config = WorkerConfiguration(self.kube_config) + volumes, volume_mounts = worker_config.init_volumes_and_mounts() + + dag_volume = [volume for volume in volumes if volume['name'] == 'airflow-dags'] + dag_volume_mount = [mount for mount in volume_mounts if mount['name'] == 'airflow-dags'] + + self.assertEqual('airflow-dags', dag_volume[0]['persistentVolumeClaim']['claimName']) + self.assertEqual(1, len(dag_volume_mount)) + + def test_worker_git_dags(self): + # Tests persistence volume config created when `git_repo` is set + self.kube_config.dags_volume_claim = None + self.kube_config.dags_folder = '/usr/local/airflow/dags' + self.kube_config.worker_dags_folder = '/usr/local/airflow/dags' + + self.kube_config.git_sync_container_repository = 'gcr.io/google-containers/git-sync-amd64' + self.kube_config.git_sync_container_tag = 'v2.0.5' + self.kube_config.git_sync_container = 'gcr.io/google-containers/git-sync-amd64:v2.0.5' + self.kube_config.git_sync_init_container_name = 'git-sync-clone' + self.kube_config.git_subpath = '' + + worker_config = WorkerConfiguration(self.kube_config) + volumes, volume_mounts = worker_config.init_volumes_and_mounts() + + init_container = worker_config._get_init_containers(volume_mounts)[0] + + dag_volume = [volume for volume in volumes if volume['name'] == 'airflow-dags'] + dag_volume_mount = [mount for mount in volume_mounts if mount['name'] == 'airflow-dags'] + + self.assertTrue('emptyDir' in 
dag_volume[0]) + self.assertEqual('/usr/local/airflow/dags/', dag_volume_mount[0]['mountPath']) + + self.assertEqual('git-sync-clone', init_container['name']) + self.assertEqual('gcr.io/google-containers/git-sync-amd64:v2.0.5', init_container['image']) + + def test_worker_container_dags(self): + # Tests that the 'airflow-dags' persistence volume is NOT created when `dags_in_image` is set + self.kube_config.dags_in_image = True + + worker_config = WorkerConfiguration(self.kube_config) + volumes, volume_mounts = worker_config.init_volumes_and_mounts() + + dag_volume = [volume for volume in volumes if volume['name'] == 'airflow-dags'] + dag_volume_mount = [mount for mount in volume_mounts if mount['name'] == 'airflow-dags'] + + init_containers = worker_config._get_init_containers(volume_mounts) + + self.assertEqual(0, len(dag_volume)) + self.assertEqual(0, len(dag_volume_mount)) + self.assertEqual(0, len(init_containers)) + class TestKubernetesExecutor(unittest.TestCase): """ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > kubernetes: add support for dag folder in the docker image > ---------------------------------------------------------- > > Key: AIRFLOW-2770 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2770 > Project: Apache Airflow > Issue Type: Improvement > Reporter: Rurui Ye > Assignee: Rurui Ye > Priority: Critical > Fix For: 1.10.2 > > > Currently the kube executor needs to be provided with dags_volume_claim or a git repo in > the config file, but if the user has built the DAGs into their docker image, they > don't need to provide these two options, and they can manage their DAG > version by managing the docker image version.
> So I suppose we can add a new configuration such as > kube.config.dag_folder_path along with dags_volume_claim and git repo. With > this config, we can run the worker just from the DAGs in the docker image. -- This message was sent by Atlassian JIRA (v7.6.3#76005)