[ 
https://issues.apache.org/jira/browse/AIRFLOW-5447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16930761#comment-16930761
 ] 

Daniel Imberman commented on AIRFLOW-5447:
------------------------------------------

OK, so I've broken down the currently running threads in the hope that this helps us out:
 

Thread 1: attempting to put a new task in the task_queue

 
{code:java}
Thread 0x7f0c13c7d700
  File "/usr/local/airflow/.local/bin/airflow", line 32, in <module>
    args.func(args)
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", 
line 74, in wrapper
    return f(*args, **kwargs)
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/bin/cli.py", 
line 1013, in scheduler
    job.run()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py",
 line 213, in run
    self._execute()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 1350, in _execute
    self._execute_helper()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 1439, in _execute_helper
    self.executor.heartbeat()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/executors/base_executor.py",
 line 132, in heartbeat
    self.trigger_tasks(open_slots)
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/executors/base_executor.py",
 line 156, in trigger_tasks
    executor_config=simple_ti.executor_config)
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
 line 767, in execute_async
    self.task_queue.put((key, command, kube_executor_config))
  File "<string>", line 2, in put
  File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 819, in 
_callmethod
    kind, result = conn.recv()
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 250, in 
recv
    buf = self._recv_bytes()
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 407, in 
_recv_bytes
    buf = self._recv(4)
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 379, in 
_recv
    chunk = read(handle, remaining)
  File "<string>", line 1, in <module>
  File "<string>", line 5, in <module>
{code}
 

Thread 2: re-reading plugins files
{code:java}
Thread 0x7f0c01c31700
  File "/usr/local/lib/python3.7/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
Thread 0x7f0bff430700
  File 
"/usr/local/lib/python3.7/multiprocessing/managers.py", line 201, in 
handle_request
    result = func(c, *args, **kwds)
  File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 422, in 
accept_connection
    self.serve_client(c)
  File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 234, in 
serve_client
    request = recv()
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 251, in 
recv
    return _ForkingPickler.loads(buf.getbuffer())
  File "<frozen importlib._bootstrap>", line 202, in _lock_unlock_module
  File "<frozen importlib._bootstrap>", line 98, in acquire
  File "/usr/local/lib/python3.7/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 178, in 
accepter
    c = self.listener.accept()
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 453, in 
accept
    c = self._listener.accept()
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 598, in 
accept
    s, self._last_accepted = self._socket.accept()
  File "/usr/local/lib/python3.7/socket.py", line 212, in accept
    fd, addr = self._accept()
  File "/usr/local/airflow/.local/bin/airflow", line 21, in <module>
    from airflow import configuration
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/__init__.py", 
line 94, in <module>
    operators._integrate_plugins()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/operators/__init__.py",
 line 104, in _integrate_plugins
    from airflow.plugins_manager import operators_modules
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
{code}
 

Thread 3: thread manager server
{code:java}
Thread 0x7f0c13c7d700
  File "<frozen importlib._bootstrap_external>", line 728, 
in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/plugins_manager.py",
 line 142, in <module>
    m = imp.load_source(namespace, filepath)
  File "/usr/local/lib/python3.7/imp.py", line 171, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 696, in _load
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/usr/local/airflow/plugins/parameterized_dags/__init__.py", line 4, in 
<module>
    from airflow.www.app import csrf
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/www/app.py", 
line 37, in <module>
    from airflow.www.blueprints import routes
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/www/blueprints.py",
 line 24, in <module>
    from airflow import jobs
  File "<frozen importlib._bootstrap>", line 1035, in _handle_fromlist
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/__init__.py",
 line 21, in <module>
    from airflow.jobs.base_job import BaseJob  # noqa: F401
  File "<frozen importlib._bootstrap>", line 983, in _find_and_load
  File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py",
 line 48, in <module>
    class BaseJob(Base, LoggingMixin):
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py",
 line 82, in BaseJob
    executor=executors.get_default_executor(),
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/executors/__init__.py",
 line 48, in get_default_executor
    DEFAULT_EXECUTOR = _get_executor(executor_name)
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/executors/__init__.py",
 line 86, in _get_executor
    return KubernetesExecutor()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
 line 650, in __init__
    self._manager = multiprocessing.Manager()
  File "/usr/local/lib/python3.7/multiprocessing/context.py", line 56, in 
Manager
    m.start()
  File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 563, in 
start
    self._process.start()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 112, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.7/multiprocessing/context.py", line 277, in 
_Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 20, in 
__init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 74, in 
_launch
    code = process_obj._bootstrap()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in 
_bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 597, in 
_run_server
    server.serve_forever()
  File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 165, in 
serve_forever
    self.stop_event.wait(1)
  File "/usr/local/lib/python3.7/threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
  File "/usr/local/lib/python3.7/threading.py", line 300, in wait
    gotit = waiter.acquire(True, timeout)
  File "<string>", line 1, in <module>
  File "<string>", line 5, in <module>
{code}
Thread 4: a second(?) thread manager server
{code:java}
Thread 0x7f0c01c31700
  File "/usr/local/lib/python3.7/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 201, in 
handle_request
    result = func(c, *args, **kwds)
  File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 422, in 
accept_connection
    self.serve_client(c)
  File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 234, in 
serve_client
    request = recv()
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 250, in 
recv
    buf = self._recv_bytes()
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 407, in 
_recv_bytes
    buf = self._recv(4)
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 379, in 
_recv
    chunk = read(handle, remaining)
Thread 0x7f0bff430700
  File "/usr/local/lib/python3.7/threading.py", line 890, in _bootstrap
    self._bootstrap_inner()
  File "/usr/local/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.7/threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 178, in 
accepter
    c = self.listener.accept()
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 453, in 
accept
    c = self._listener.accept()
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 598, in 
accept
    s, self._last_accepted = self._socket.accept()
  File "/usr/local/lib/python3.7/socket.py", line 212, in accept
    fd, addr = self._accept()
Thread 0x7f0c13c7d700
  File 
"/usr/local/airflow/.local/bin/airflow", line 32, in <module>
    args.func(args)
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", 
line 74, in wrapper
    return f(*args, **kwargs)
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/bin/cli.py", 
line 1013, in scheduler
    job.run()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py",
 line 213, in run
    self._execute()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 1350, in _execute
    self._execute_helper()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 1374, in _execute_helper
    self.executor.start()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
 line 756, in start
    self.kube_client, self.worker_uuid
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
 line 412, in __init__
    self._manager = multiprocessing.Manager()
  File "/usr/local/lib/python3.7/multiprocessing/context.py", line 56, in 
Manager
    m.start()
  File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 563, in 
start
    self._process.start()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 112, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.7/multiprocessing/context.py", line 277, in 
_Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 20, in 
__init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 74, in 
_launch
    code = process_obj._bootstrap()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in 
_bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 597, in 
_run_server
    server.serve_forever()
  File "/usr/local/lib/python3.7/multiprocessing/managers.py", line 165, in 
serve_forever
    self.stop_event.wait(1)
  File "/usr/local/lib/python3.7/threading.py", line 552, in wait
    signaled = self._cond.wait(timeout)
  File "/usr/local/lib/python3.7/threading.py", line 300, in wait
    gotit = waiter.acquire(True, timeout)
  File "<string>", line 1, in <module>
  File "<string>", line 5, in <module>
{code}
Thread 5: Jobwatcher reading k8s stream
{code:java}
Thread 0x7f0c13c7d700
  File "/usr/local/airflow/.local/bin/airflow", line 32, in <module>
    args.func(args)
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", 
line 74, in wrapper
    return f(*args, **kwargs)
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/bin/cli.py", 
line 1013, in scheduler
    job.run()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py",
 line 213, in run
    self._execute()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 1350, in _execute
    self._execute_helper()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 1374, in _execute_helper
    self.executor.start()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
 line 756, in start
    self.kube_client, self.worker_uuid
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
 line 415, in __init__
    self.kube_watcher = self._make_kube_watcher()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
 line 421, in _make_kube_watcher
    watcher.start()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 112, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.7/multiprocessing/context.py", line 223, in 
_Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/lib/python3.7/multiprocessing/context.py", line 277, in 
_Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 20, in 
__init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 74, in 
_launch
    code = process_obj._bootstrap()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in 
_bootstrap
    self.run()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
 line 325, in run
    self.worker_uuid, self.kube_config)
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/contrib/executors/kubernetes_executor.py",
 line 349, in _run
    **kwargs):
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/watch/watch.py",
 line 144, in stream
    for line in iter_resp_lines(resp):
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/kubernetes/watch/watch.py",
 line 48, in iter_resp_lines
    for seg in resp.read_chunked(decode_content=False):
  File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 704, 
in read_chunked
    self._update_chunk_length()
  File "/usr/local/lib/python3.7/site-packages/urllib3/response.py", line 636, 
in _update_chunk_length
    line = self._fp.fp.readline()
  File "/usr/local/lib/python3.7/socket.py", line 589, in readinto
    return self._sock.recv_into(b)
  File "/usr/local/lib/python3.7/site-packages/urllib3/contrib/pyopenssl.py", 
line 304, in recv_into
    return self.connection.recv_into(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/OpenSSL/SSL.py", line 1821, in 
recv_into
    result = _lib.SSL_read(self._ssl, buf, nbytes)
  File "<string>", line 1, in <module>
  File "<string>", line 5, in <module>
{code}
Thread 6: reading DAG files
{code:java}
Thread 0x7f0c13c7d700
  File "/usr/local/airflow/.local/bin/airflow", line 32, in <module>
    args.func(args)
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/cli.py", 
line 74, in wrapper
    return f(*args, **kwargs)
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/bin/cli.py", 
line 1013, in scheduler
    job.run()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/base_job.py",
 line 213, in run
    self._execute()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 1350, in _execute
    self._execute_helper()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py",
 line 1380, in _execute_helper
    self.processor_agent.start()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/dag_processing.py",
 line 543, in start
    self._process.start()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 112, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.7/multiprocessing/context.py", line 223, in 
_Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/lib/python3.7/multiprocessing/context.py", line 277, in 
_Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 20, in 
__init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.7/multiprocessing/popen_fork.py", line 74, in 
_launch
    code = process_obj._bootstrap()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in 
_bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/dag_processing.py",
 line 613, in _run_processor_manager
    processor_manager.start()
  File 
"/usr/local/airflow/.local/lib/python3.7/site-packages/airflow/utils/dag_processing.py",
 line 872, in start
    self._signal_conn.send(dag_parsing_stat)
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 206, in 
send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 404, in 
_send_bytes
    self._send(header + buf)
  File "/usr/local/lib/python3.7/multiprocessing/connection.py", line 368, in 
_send
    n = write(self._handle, buf)
  File "<string>", line 1, in <module>
  File "<string>", line 5, in <module>
{code}
 

> KubernetesExecutor hangs on task queueing
> -----------------------------------------
>
>                 Key: AIRFLOW-5447
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5447
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: executor-kubernetes
>    Affects Versions: 1.10.4, 1.10.5
>         Environment: Kubernetes version v1.14.3, Airflow version 1.10.4-1.10.5
>            Reporter: Henry Cohen
>            Assignee: Daniel Imberman
>            Priority: Blocker
>
> Starting in 1.10.4, and continuing in 1.10.5, when using the 
> KubernetesExecutor, with the webserver and scheduler running in the 
> kubernetes cluster, tasks are scheduled, but when added to the task queue, 
> the executor process hangs indefinitely. Based on log messages, it appears to 
> be stuck at this line 
> https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/executors/kubernetes_executor.py#L761



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to