ephraimbuddy opened a new issue #12418:
URL: https://github.com/apache/airflow/issues/12418
AttributeError is raised when a dictionary is passed to executor_config
with the Kubernetes executor.
**Apache Airflow version**: 2.0
**Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
Client Version: version.Info{Major:"1", Minor:"19", GitVersion:"v1.19.2",
GitCommit:"f5743093fd1c663cb0cbc89748f730662345d44d", GitTreeState:"clean",
BuildDate:"2020-09-16T13:41:02Z", GoVersion:"go1.15", Compiler:"gc",
Platform:"linux/amd64"}
Server Version: version.Info{Major:"1", Minor:"19", GitVersion:"v1.19.1",
GitCommit:"206bcadf021e76c27513500ca24182692aabd17e", GitTreeState:"clean",
BuildDate:"2020-09-14T07:30:52Z", GoVersion:"go1.15", Compiler:"gc",
Platform:"linux/amd64"}
**Environment**:
- **OS** (e.g. from /etc/os-release): ubuntu
**What happened**:
While running the example DAG
https://github.com/apache/airflow/blob/master/airflow/example_dags/example_kubernetes_executor.py
with the Kubernetes executor, the scheduler raises an exception when it tries
to run `three_task`. The log and traceback are below:
```
[2020-11-17 18:12:08,510] {base_executor.py:79} INFO - Adding to queue:
['airflow', 'tasks', 'run', 'example_kubernetes_executor', 'three_task',
'2020-11-17T18:11:10.769947+00:00', '--local', '--pool', 'default_pool',
'--subdir', '/usr/local/airflow/dags/example_kubernetes_executor.py']
[2020-11-17 18:12:08,523] {kubernetes_executor.py:543} INFO - Add task
TaskInstanceKey(dag_id='example_kubernetes_executor', task_id='three_task',
execution_date=datetime.datetime(2020, 11, 17, 18, 11, 10, 769947,
tzinfo=Timezone('UTC')), try_number=1) with command ['airflow', 'tasks', 'run',
'example_kubernetes_executor', 'three_task',
'2020-11-17T18:11:10.769947+00:00', '--local', '--pool', 'default_pool',
'--subdir', '/usr/local/airflow/dags/example_kubernetes_executor.py'] with
executor_config {'KubernetesExecutor': {'request_memory': '128Mi',
'limit_memory': '128Mi', 'tolerations': [{'key': 'dedicated', 'operator':
'Equal', 'value': 'airflow'}], 'affinity': {'podAntiAffinity':
{'requiredDuringSchedulingIgnoredDuringExecution': [{'topologyKey':
'kubernetes.io/hostname', 'labelSelector': {'matchExpressions': [{'key': 'app',
'operator': 'In', 'values': ['airflow']}]}}]}}}}
/usr/local/lib/python3.7/site-packages/airflow/kubernetes/pod_generator.py:197
DeprecationWarning: Using a dictionary for the executor_config is deprecated
and
will soon be removed.please use a `kubernetes.client.models.V1Pod` class
with a
"pod_override" key instead.
[2020-11-17 18:12:08,573] {kubernetes_executor.py:280} INFO - Kubernetes job
is (TaskInstanceKey(dag_id='example_kubernetes_executor', task_id='three_task',
execution_date=datetime.datetime(2020, 11, 17, 18, 11, 10, 769947,
tzinfo=Timezone('UTC')), try_number=1), ['airflow', 'tasks', 'run',
'example_kubernetes_executor', 'three_task',
'2020-11-17T18:11:10.769947+00:00', '--local', '--pool', 'default_pool',
'--subdir', '/usr/local/airflow/dags/example_kubernetes_executor.py'],
{'api_version': 'v1',
'kind': 'Pod',
'metadata': {'annotations': None,
'cluster_name': None,
'creation_timestamp': None,
'deletion_grace_period_seconds': None,
'deletion_timestamp': None,
'finalizers': None,
'generate_name': None,
'generation': None,
'initializers': None,
'labels': None,
'managed_fields': None,
'name': None,
'namespace': None,
'owner_references': None,
'resource_version': None,
'self_link': None,
'uid': None},
'spec': {'active_deadline_seconds': None,
'affinity': {'podAntiAffinity':
{'requiredDuringSchedulingIgnoredDuringExecution': [{'labelSelector':
{'matchExpressions': [{'key': 'app',
'operator': 'In',
'values':
['airflow']}]},
'topologyKey': 'kubernetes.io/hostname'}]}},
'automount_service_account_token': None,
'containers': [{'args': [],
'command': [],
'env': [],
'env_from': [],
'image': None,
'image_pull_policy': None,
'lifecycle': None,
'liveness_probe': None,
'name': 'base',
'ports': [],
'readiness_probe': None,
'resources': {'limits': {'memory': '128Mi'},
'requests': {'memory': '128Mi'}},
'security_context': None,
'stdin': None,
'stdin_once': None,
'termination_message_path': None,
'termination_message_policy': None,
'tty': None,
'volume_devices': None,
'volume_mounts': [],
'working_dir': None}],
'dns_config': None,
'dns_policy': None,
'enable_service_links': None,
'host_aliases': None,
'host_ipc': None,
'host_network': False,
'host_pid': None,
'hostname': None,
'image_pull_secrets': [],
'init_containers': None,
'node_name': None,
'node_selector': None,
'preemption_policy': None,
'priority': None,
'priority_class_name': None,
'readiness_gates': None,
'restart_policy': None,
'runtime_class_name': None,
'scheduler_name': None,
'security_context': None,
'service_account': None,
'service_account_name': None,
'share_process_namespace': None,
'subdomain': None,
'termination_grace_period_seconds': None,
'tolerations': [{'key': 'dedicated',
'operator': 'Equal',
'value': 'airflow'}],
'volumes': []},
'status': None}, None)
[2020-11-17 18:12:08,594] {scheduler_job.py:1301} ERROR - Exception when
executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File
"/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line
1283, in _execute
self._run_scheduler_loop()
File
"/usr/local/lib/python3.7/site-packages/astronomer/airflow/version_check/plugin.py",
line 29, in run_before
fn(*args, **kwargs)
File
"/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line
1387, in _run_scheduler_loop
self.executor.heartbeat()
File
"/usr/local/lib/python3.7/site-packages/airflow/executors/base_executor.py",
line 159, in heartbeat
self.sync()
File
"/usr/local/lib/python3.7/site-packages/airflow/executors/kubernetes_executor.py",
line 603, in sync
self.kube_scheduler.run_next(task)
File
"/usr/local/lib/python3.7/site-packages/airflow/executors/kubernetes_executor.py",
line 313, in run_next
self.launcher.run_pod_async(pod,
**self.kube_config.kube_client_request_args)
File
"/usr/local/lib/python3.7/site-packages/airflow/kubernetes/pod_launcher.py",
line 90, in run_pod_async
pod_mutation_hook(pod)
File "/usr/local/airflow/config/airflow_local_settings.py", line 26, in
pod_mutation_hook
pod.spec.affinity = pod.spec.affinity.to_dict().update({})
AttributeError: 'dict' object has no attribute 'to_dict'
```
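The failing statement can be reproduced without a cluster. The sketch below is only an illustration: `make_pod` and `failing_hook` are hypothetical names, and `SimpleNamespace` stands in for the real pod object, which is not imported here. It also shows a second problem in the same statement: `dict.update()` returns `None`, so even with an object that has `to_dict()` the assignment would set the affinity to `None`.

```python
from types import SimpleNamespace


def make_pod(affinity):
    # Stand-in for the pod passed to pod_mutation_hook (assumption: only
    # pod.spec.affinity matters for this illustration).
    return SimpleNamespace(spec=SimpleNamespace(affinity=affinity))


def failing_hook(pod):
    # Same statement as the one at line 26 of airflow_local_settings.py
    # in the traceback above.
    pod.spec.affinity = pod.spec.affinity.to_dict().update({})


affinity_dict = {"podAntiAffinity": {}}

try:
    failing_hook(make_pod(affinity_dict))
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(error_message)

# Separate observation: dict.update() mutates in place and returns None,
# so assigning its result would discard the affinity entirely.
update_result = dict(affinity_dict).update({})
print(update_result)
```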
When the affinity in the example DAG is given as a Kubernetes object instead
of a dictionary, the log shows the affinity arriving as a string, and the error is:
```
[2020-11-17 19:39:55,850] {kubernetes_executor.py:543} INFO - Add task
TaskInstanceKey(dag_id='example_kubernetes_executor', task_id='three_task',
execution_date=datetime.datetime(2020, 11, 17, 19, 23, 35, 400362,
tzinfo=Timezone('UTC')), try_number=1) with command ['airflow', 'tasks', 'run',
'example_kubernetes_executor', 'three_task',
'2020-11-17T19:23:35.400362+00:00', '--local', '--pool', 'default_pool',
'--subdir', '/usr/local/airflow/dags/example_kubernetes_executor.py'] with
executor_config {'KubernetesExecutor': {'request_memory': '128Mi',
'limit_memory': '128Mi', 'tolerations': [{'key': 'dedicated', 'operator':
'Equal', 'value': 'airflow'}], 'affinity':
"{'preferred_during_scheduling_ignored_during_execution': None,\n
'required_during_scheduling_ignored_during_execution': [{'label_selector':
{'match_expressions': [{'key': 'app',\n
'operator': 'in',\n
'values': ['airflow']}],\n
'match_labels': None},\n
'namespaces': None,\n
'topology_key': 'kubernetes.io/hostname'}]}"}}
/usr/local/lib/python3.7/site-packages/airflow/kubernetes/pod_generator.py:197
DeprecationWarning: Using a dictionary for the executor_config is deprecated
and
will soon be removed.please use a `kubernetes.client.models.V1Pod` class
with a
"pod_override" key instead.
[2020-11-17 19:39:55,921] {kubernetes_executor.py:543} INFO - Add task
TaskInstanceKey(dag_id='example_kubernetes_executor', task_id='four_task',
execution_date=datetime.datetime(2020, 11, 17, 19, 23, 35, 400362,
tzinfo=Timezone('UTC')), try_number=1) with command ['airflow', 'tasks', 'run',
'example_kubernetes_executor', 'four_task', '2020-11-17T19:23:35.400362+00:00',
'--local', '--pool', 'default_pool', '--subdir',
'/usr/local/airflow/dags/example_kubernetes_executor.py'] with executor_config
{'KubernetesExecutor': {'labels': {'foo': 'bar'}}}
[2020-11-17 19:39:55,923] {kubernetes_executor.py:543} INFO - Add task
TaskInstanceKey(dag_id='example_kubernetes_executor', task_id='two_task',
execution_date=datetime.datetime(2020, 11, 17, 19, 23, 35, 400362,
tzinfo=Timezone('UTC')), try_number=1) with command ['airflow', 'tasks', 'run',
'example_kubernetes_executor', 'two_task', '2020-11-17T19:23:35.400362+00:00',
'--local', '--pool', 'default_pool', '--subdir',
'/usr/local/airflow/dags/example_kubernetes_executor.py'] with executor_config
{'KubernetesExecutor': {'image': 'newproj:0.2'}}
[2020-11-17 19:39:55,928] {kubernetes_executor.py:280} INFO - Kubernetes job
is (TaskInstanceKey(dag_id='example_kubernetes_executor', task_id='three_task',
execution_date=datetime.datetime(2020, 11, 17, 19, 23, 35, 400362,
tzinfo=Timezone('UTC')), try_number=1), ['airflow', 'tasks', 'run',
'example_kubernetes_executor', 'three_task',
'2020-11-17T19:23:35.400362+00:00', '--local', '--pool', 'default_pool',
'--subdir', '/usr/local/airflow/dags/example_kubernetes_executor.py'],
{'api_version': 'v1',
'kind': 'Pod',
'metadata': {'annotations': None,
'cluster_name': None,
'creation_timestamp': None,
'deletion_grace_period_seconds': None,
'deletion_timestamp': None,
'finalizers': None,
'generate_name': None,
'generation': None,
'initializers': None,
'labels': None,
'managed_fields': None,
'name': None,
'namespace': None,
'owner_references': None,
'resource_version': None,
'self_link': None,
'uid': None},
'spec': {'active_deadline_seconds': None,
'affinity':
"{'preferred_during_scheduling_ignored_during_execution': "
'None,\n'
"
'required_during_scheduling_ignored_during_execution': "
"[{'label_selector': {'match_expressions': [{'key': "
"'app',\n"
'
'
"'operator': 'in',\n"
'
'
"'values': ['airflow']}],\n"
'
'
"'match_labels': None},\n"
'
'
"'namespaces': None,\n"
'
'
"'topology_key': 'kubernetes.io/hostname'}]}",
'automount_service_account_token': None,
'containers': [{'args': [],
'command': [],
'env': [],
'env_from': [],
'image': None,
'image_pull_policy': None,
'lifecycle': None,
'liveness_probe': None,
'name': 'base',
'ports': [],
'readiness_probe': None,
'resources': {'limits': {'memory': '128Mi'},
'requests': {'memory': '128Mi'}},
'security_context': None,
'stdin': None,
'stdin_once': None,
'termination_message_path': None,
'termination_message_policy': None,
'tty': None,
'volume_devices': None,
'volume_mounts': [],
'working_dir': None}],
'dns_config': None,
'dns_policy': None,
'enable_service_links': None,
'host_aliases': None,
'host_ipc': None,
'host_network': False,
'host_pid': None,
'hostname': None,
'image_pull_secrets': [],
'init_containers': None,
'node_name': None,
'node_selector': None,
'preemption_policy': None,
'priority': None,
'priority_class_name': None,
'readiness_gates': None,
'restart_policy': None,
'runtime_class_name': None,
'scheduler_name': None,
'security_context': None,
'service_account': None,
'service_account_name': None,
'share_process_namespace': None,
'subdomain': None,
'termination_grace_period_seconds': None,
'tolerations': [{'key': 'dedicated',
'operator': 'Equal',
'value': 'airflow'}],
'volumes': []},
'status': None}, None)
[2020-11-17 19:39:55,951] {scheduler_job.py:1301} ERROR - Exception when
executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File
"/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line
1283, in _execute
self._run_scheduler_loop()
File
"/usr/local/lib/python3.7/site-packages/astronomer/airflow/version_check/plugin.py",
line 29, in run_before
fn(*args, **kwargs)
File
"/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line
1387, in _run_scheduler_loop
self.executor.heartbeat()
File
"/usr/local/lib/python3.7/site-packages/airflow/executors/base_executor.py",
line 159, in heartbeat
self.sync()
File
"/usr/local/lib/python3.7/site-packages/airflow/executors/kubernetes_executor.py",
line 603, in sync
self.kube_scheduler.run_next(task)
File
"/usr/local/lib/python3.7/site-packages/airflow/executors/kubernetes_executor.py",
line 313, in run_next
self.launcher.run_pod_async(pod,
**self.kube_config.kube_client_request_args)
File
"/usr/local/lib/python3.7/site-packages/airflow/kubernetes/pod_launcher.py",
line 90, in run_pod_async
pod_mutation_hook(pod)
File "/usr/local/airflow/config/airflow_local_settings.py", line 26, in
pod_mutation_hook
pod.spec.affinity = pod.spec.affinity.to_dict().update({})
```
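In the two logs above the hook receives the affinity first as a dictionary and then as a string. A more tolerant hook could be sketched as follows. This is a sketch under assumptions, not a fix taken from Airflow: `affinity_as_dict` and `FakeAffinity` are hypothetical, the pod is again a `SimpleNamespace` stand-in, and whether a real pod spec accepts a plain dict for its affinity is not checked here.

```python
from types import SimpleNamespace


def affinity_as_dict(affinity):
    """Hypothetical helper: return the affinity as a plain dict, or None."""
    if affinity is None:
        return None
    if isinstance(affinity, dict):
        # Legacy dictionary executor_config: copy so the input is untouched.
        return dict(affinity)
    to_dict = getattr(affinity, "to_dict", None)
    if callable(to_dict):
        # Model-style object exposing to_dict(), as the original hook assumed.
        return to_dict()
    # Strings (as in the second log) and anything else are rejected explicitly.
    raise TypeError(f"unsupported affinity type: {type(affinity).__name__}")


def pod_mutation_hook(pod):
    current = affinity_as_dict(pod.spec.affinity)
    if current is not None:
        current.update({})  # placeholder for real overrides; returns None
        pod.spec.affinity = current


class FakeAffinity:
    """Stand-in for a model object with to_dict(); not the Kubernetes class."""

    def to_dict(self):
        return {"pod_anti_affinity": None}


def make_pod(affinity):
    return SimpleNamespace(spec=SimpleNamespace(affinity=affinity))


dict_pod = make_pod({"podAntiAffinity": {}})
pod_mutation_hook(dict_pod)

object_pod = make_pod(FakeAffinity())
pod_mutation_hook(object_pod)

try:
    pod_mutation_hook(make_pod("{'topology_key': 'kubernetes.io/hostname'}"))
    string_error = None
except TypeError as exc:
    string_error = str(exc)
```

Raising a `TypeError` for the string case keeps the failure explicit instead of letting it surface as an `AttributeError` inside the scheduler loop.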
Because the exception is raised inside the scheduler loop, it also prevents
other tasks from creating pods.
**What you expected to happen**:
I expected the example DAG to run without errors.
**How to reproduce it**:
Run this example DAG
https://github.com/apache/airflow/blob/master/airflow/example_dags/example_kubernetes_executor.py
on a Kubernetes cluster with the Kubernetes executor.