potiuk commented on pull request #13835:
URL: https://github.com/apache/airflow/pull/13835#issuecomment-765329747
```
self = <Task(KubernetesPodOperator): test>, context = {'dag': <DAG: dag>,
'task': <Task(KubernetesPodOperator): test>, 'ti': <TaskInstance:
adhoc_airflow.test 2016-01-01 01:00:00+01:00 [None]>, 'ts':
'2016-01-01T01:00:00+01:00'}
def execute(self, context) -> Optional[str]:
try:
if self.in_cluster is not None:
client = kube_client.get_kube_client(
in_cluster=self.in_cluster,
cluster_context=self.cluster_context,
config_file=self.config_file,
)
else:
client = kube_client.get_kube_client(
cluster_context=self.cluster_context,
config_file=self.config_file
)
self.pod = self.create_pod_request_obj()
self.namespace = self.pod.metadata.namespace
self.client = client
# Add combination of labels to uniquely identify a running pod
labels = self.create_labels_for_pod(context)
label_selector = self._get_pod_identifying_label_string(labels)
self.namespace = self.pod.metadata.namespace
pod_list = client.list_namespaced_pod(self.namespace,
label_selector=label_selector)
if len(pod_list.items) > 1 and self.reattach_on_restart:
raise AirflowException(
'More than one pod running with labels: '
'{label_selector}'.format(label_selector=label_selector)
)
launcher = pod_launcher.PodLauncher(kube_client=client,
extract_xcom=self.do_xcom_push)
if len(pod_list.items) == 1:
try_numbers_match = self._try_numbers_match(context,
pod_list.items[0])
final_state, result = self.handle_pod_overlap(
> labels, try_numbers_match, launcher, pod_list.items[0]
)
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:336:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Task(KubernetesPodOperator): test>, labels = {'dag_id': 'dag',
'execution_date': '2016-01-01T0100000100-a2f50a31f', 'task_id': 'test',
'try_number': '1'}, try_numbers_match = False, launcher =
<airflow.kubernetes.pod_launcher.PodLauncher object at 0x7eff6f133110>
pod = {'api_version': None,
'kind': None,
'metadata': {'annotations': None,
'cluster_name': None,
...rt',
'reason': None,
'start_time': datetime.datetime(2021, 1, 22, 10, 59, 14,
tzinfo=tzutc())}}
def handle_pod_overlap(
self, labels: dict, try_numbers_match: bool, launcher: Any, pod:
k8s.V1Pod
) -> Tuple[State, Optional[str]]:
"""
In cases where the Scheduler restarts while a KubernetesPodOperator
task is running,
this function will either continue to monitor the existing pod or
launch a new pod
based on the `reattach_on_restart` parameter.
:param labels: labels used to determine if a pod is repeated
:type labels: dict
:param try_numbers_match: do the try numbers match? Only needed for
logging purposes
:type try_numbers_match: bool
:param launcher: PodLauncher
:param pod_list: list of pods found
"""
if try_numbers_match:
log_line = f"found a running pod with labels {labels} and the
same try_number."
else:
log_line = f"found a running pod with labels {labels} but a
different try_number."
# In case of failed pods, should reattach the first time, but only
once
# as the task will have already failed.
if self.reattach_on_restart and not
pod.metadata.labels.get("already_checked"):
log_line += " Will attach to this pod and monitor instead of
starting new one"
self.log.info(log_line)
self.pod = pod
> final_state, result = self.monitor_launched_pod(launcher, pod)
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:376:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Task(KubernetesPodOperator): test>, launcher =
<airflow.kubernetes.pod_launcher.PodLauncher object at 0x7eff6f133110>
pod = {'api_version': None,
'kind': None,
'metadata': {'annotations': None,
'cluster_name': None,
...rt',
'reason': None,
'start_time': datetime.datetime(2021, 1, 22, 10, 59, 14,
tzinfo=tzutc())}}
def monitor_launched_pod(self, launcher, pod) -> Tuple[State,
Optional[str]]:
"""
Monitors a pod to completion that was created by a previous
KubernetesPodOperator
:param launcher: pod launcher that will manage launching and
monitoring pods
:param pod: podspec used to find pod using k8s API
:return:
"""
try:
(final_state, result) = launcher.monitor_pod(pod,
get_logs=self.get_logs)
finally:
if self.is_delete_operator_pod:
launcher.delete_pod(pod)
if final_state != State.SUCCESS:
if self.log_events_on_failure:
for event in launcher.read_pod_events(pod).items:
self.log.error("Pod Event: %s - %s", event.reason,
event.message)
self.patch_already_checked(self.pod)
> raise AirflowException(f'Pod returned a failure: {final_state}')
E airflow.exceptions.AirflowException: Pod returned a failure:
failed
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:524:
AirflowException
During handling of the above exception, another exception occurred:
self =
<kubernetes_tests.test_kubernetes_pod_operator.TestKubernetesPodOperatorSystem
testMethod=test_reattach_failing_pod_once>
def test_reattach_failing_pod_once(self):
from airflow.utils.state import State
client = kube_client.get_kube_client(in_cluster=False)
name = "test"
namespace = "default"
k = KubernetesPodOperator(
namespace='default',
image="ubuntu:16.04",
cmds=["bash", "-cx"],
arguments=["exit 1"],
labels={"foo": "bar"},
name="test",
task_id=name,
in_cluster=False,
do_xcom_push=False,
is_delete_operator_pod=False,
termination_grace_period=0,
)
context = create_context(k)
with
mock.patch("airflow.kubernetes.pod_launcher.PodLauncher.monitor_pod") as
monitor_mock:
monitor_mock.return_value = (State.SUCCESS, None)
k.execute(context)
name = k.pod.metadata.name
pod = client.read_namespaced_pod(name=name, namespace=namespace)
while pod.status.phase != "Failed":
pod = client.read_namespaced_pod(name=name,
namespace=namespace)
with pytest.raises(AirflowException):
> k.execute(context)
kubernetes_tests/test_kubernetes_pod_operator.py:991:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:346: in execute
self.patch_already_checked(self.pod)
airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py:504: in
patch_already_checked
self.client.patch_namespaced_pod(pod.metadata.name,
pod.metadata.namespace, body)
.build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py:16004:
in patch_namespaced_pod
(data) = self.patch_namespaced_pod_with_http_info(name, namespace, body,
**kwargs) # noqa: E501
.build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py:16109:
in patch_namespaced_pod_with_http_info
collection_formats=collection_formats)
.build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/api_client.py:345:
in call_api
_preload_content, _request_timeout)
.build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/api_client.py:176:
in __call_api
_request_timeout=_request_timeout)
.build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/api_client.py:404:
in request
body=body)
.build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/rest.py:298:
in PATCH
body=body)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <kubernetes.client.rest.RESTClientObject object at 0x7eff6f1b3ad0>,
method = 'PATCH', url =
'https://0.0.0.0:19090/api/v1/namespaces/default/pods/test-38fdc44bef524fe5a18143d4155c56be',
query_params = []
headers = {'Accept': 'application/json', 'Content-Type':
'application/strategic-merge-patch+json', 'User-Agent':
'OpenAPI-Generator/11.0.0/python'}
body = {'metadata': {'creationTimestamp': '2021-01-22T10:59:14+00:00',
'labels': {'airflow_version': '2.1.0.dev0',
'already_c...c6a871f3236940006ed31091de355578492ed140a39c', 'lastState': {},
...}], 'hostIP': '172.18.0.2', 'phase': 'Failed', ...}}, post_params = {},
_preload_content = True
_request_timeout = None
def request(self, method, url, query_params=None, headers=None,
body=None, post_params=None, _preload_content=True,
_request_timeout=None):
"""Perform requests.
:param method: http request method
:param url: http request url
:param query_params: query parameters in the url
:param headers: http request headers
:param body: request json body, for `application/json`
:param post_params: request post parameters,
`application/x-www-form-urlencoded`
and `multipart/form-data`
:param _preload_content: if False, the urllib3.HTTPResponse object
will
be returned without reading/decoding
response
data. Default is True.
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
"""
method = method.upper()
assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT',
'PATCH', 'OPTIONS']
if post_params and body:
raise ValueError(
"body parameter cannot be used with post_params parameter."
)
post_params = post_params or {}
headers = headers or {}
timeout = None
if _request_timeout:
if isinstance(_request_timeout, (int, ) if six.PY3 else (int,
long)): # noqa: E501,F821
timeout = urllib3.Timeout(total=_request_timeout)
elif (isinstance(_request_timeout, tuple) and
len(_request_timeout) == 2):
timeout = urllib3.Timeout(
connect=_request_timeout[0], read=_request_timeout[1])
if 'Content-Type' not in headers:
headers['Content-Type'] = 'application/json'
try:
# For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE`
if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']:
if query_params:
url += '?' + urlencode(query_params)
if re.search('json', headers['Content-Type'], re.IGNORECASE):
if headers['Content-Type'] ==
'application/json-patch+json':
if not isinstance(body, list):
headers['Content-Type'] = \
'application/strategic-merge-patch+json'
request_body = None
if body is not None:
request_body = json.dumps(body)
r = self.pool_manager.request(
method, url,
body=request_body,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
elif headers['Content-Type'] ==
'application/x-www-form-urlencoded': # noqa: E501
r = self.pool_manager.request(
method, url,
fields=post_params,
encode_multipart=False,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
elif headers['Content-Type'] == 'multipart/form-data':
# must del headers['Content-Type'], or the correct
# Content-Type which generated by urllib3 will be
# overwritten.
del headers['Content-Type']
r = self.pool_manager.request(
method, url,
fields=post_params,
encode_multipart=True,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
# Pass a `string` parameter directly in the body to support
# other content types than Json when `body` argument is
# provided in serialized form
elif isinstance(body, str):
request_body = body
r = self.pool_manager.request(
method, url,
body=request_body,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
else:
# Cannot generate the request from given parameters
msg = """Cannot prepare a request message for provided
arguments. Please check that your arguments
match
declared content type."""
raise ApiException(status=0, reason=msg)
# For `GET`, `HEAD`
else:
r = self.pool_manager.request(method, url,
fields=query_params,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
except urllib3.exceptions.SSLError as e:
msg = "{0}\n{1}".format(type(e).__name__, str(e))
raise ApiException(status=0, reason=msg)
if _preload_content:
r = RESTResponse(r)
# In the python 3, the response.data is bytes.
# we need to decode it to string.
if six.PY3:
r.data = r.data.decode('utf8')
# log response body
logger.debug("response body: %s", r.data)
if not 200 <= r.status <= 299:
> raise ApiException(http_resp=r)
E kubernetes.client.rest.ApiException: (409)
E Reason: Conflict
E HTTP response headers: HTTPHeaderDict({'Cache-Control':
'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Fri, 22 Jan
2021 10:59:22 GMT', 'Content-Length': '358'})
E HTTP response body:
{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Operation
cannot be fulfilled on pods \"test-38fdc44bef524fe5a18143d4155c56be\": the
object has been modified; please apply your changes to the latest version and
try
again","reason":"Conflict","details":{"name":"test-38fdc44bef524fe5a18143d4155c56be","kind":"pods"},"code":409}
.build/.kubernetes_venv_3.7/lib/python3.7/site-packages/kubernetes/client/rest.py:231:
ApiException
---------------------------------------------------------------------------------------------------------------------------------------------
Captured stdout call
----------------------------------------------------------------------------------------------------------------------------------------------
[2021-01-22 11:59:14,860] {pod_launcher.py:176} INFO - Event:
test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
[2021-01-22 11:59:14,860] {pod_launcher.py:113} WARNING - Pod not yet
started: test-38fdc44bef524fe5a18143d4155c56be
[2021-01-22 11:59:15,869] {pod_launcher.py:176} INFO - Event:
test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
[2021-01-22 11:59:15,869] {pod_launcher.py:113} WARNING - Pod not yet
started: test-38fdc44bef524fe5a18143d4155c56be
[2021-01-22 11:59:16,878] {pod_launcher.py:176} INFO - Event:
test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
[2021-01-22 11:59:16,878] {pod_launcher.py:113} WARNING - Pod not yet
started: test-38fdc44bef524fe5a18143d4155c56be
[2021-01-22 11:59:17,884] {pod_launcher.py:176} INFO - Event:
test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
[2021-01-22 11:59:17,885] {pod_launcher.py:113} WARNING - Pod not yet
started: test-38fdc44bef524fe5a18143d4155c56be
[2021-01-22 11:59:18,898] {pod_launcher.py:176} INFO - Event:
test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
[2021-01-22 11:59:18,899] {pod_launcher.py:113} WARNING - Pod not yet
started: test-38fdc44bef524fe5a18143d4155c56be
[2021-01-22 11:59:19,903] {pod_launcher.py:176} INFO - Event:
test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
[2021-01-22 11:59:19,903] {pod_launcher.py:113} WARNING - Pod not yet
started: test-38fdc44bef524fe5a18143d4155c56be
[2021-01-22 11:59:20,906] {pod_launcher.py:176} INFO - Event:
test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
[2021-01-22 11:59:20,906] {pod_launcher.py:113} WARNING - Pod not yet
started: test-38fdc44bef524fe5a18143d4155c56be
[2021-01-22 11:59:21,910] {pod_launcher.py:176} INFO - Event:
test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
[2021-01-22 11:59:21,910] {pod_launcher.py:286} ERROR - Event with job id
test-38fdc44bef524fe5a18143d4155c56be Failed
[2021-01-22 11:59:21,936] {pod_launcher.py:136} INFO - + exit 1
[2021-01-22 11:59:22,953] {pod_launcher.py:176} INFO - Event:
test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
[2021-01-22 11:59:22,954] {pod_launcher.py:286} ERROR - Event with job id
test-38fdc44bef524fe5a18143d4155c56be Failed
[2021-01-22 11:59:22,956] {pod_launcher.py:176} INFO - Event:
test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
[2021-01-22 11:59:22,956] {pod_launcher.py:286} ERROR - Event with job id
test-38fdc44bef524fe5a18143d4155c56be Failed
-----------------------------------------------------------------------------------------------------------------------------------------------
Captured log call
-----------------------------------------------------------------------------------------------------------------------------------------------
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176
Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
WARNING airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod
not yet started: test-38fdc44bef524fe5a18143d4155c56be
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176
Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
WARNING airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod
not yet started: test-38fdc44bef524fe5a18143d4155c56be
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176
Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
WARNING airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod
not yet started: test-38fdc44bef524fe5a18143d4155c56be
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176
Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
WARNING airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod
not yet started: test-38fdc44bef524fe5a18143d4155c56be
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176
Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
WARNING airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod
not yet started: test-38fdc44bef524fe5a18143d4155c56be
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176
Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
WARNING airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod
not yet started: test-38fdc44bef524fe5a18143d4155c56be
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176
Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Pending
WARNING airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:113 Pod
not yet started: test-38fdc44bef524fe5a18143d4155c56be
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176
Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
ERROR airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:286
Event with job id test-38fdc44bef524fe5a18143d4155c56be Failed
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:136 +
exit 1
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176
Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
ERROR airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:286
Event with job id test-38fdc44bef524fe5a18143d4155c56be Failed
INFO airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:176
Event: test-38fdc44bef524fe5a18143d4155c56be had an event of type Failed
ERROR airflow.kubernetes.pod_launcher.PodLauncher:pod_launcher.py:286
Event with job id test-38fdc44bef524fe5a18143d4155c56be Failed
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org