vizeit commented on issue #39717:
URL: https://github.com/apache/airflow/issues/39717#issuecomment-2210232607

   I ran a few tests with the new version 2.9.2 and have the following details, along with the logs.
   
   **Configuration**
   
   > Airflow version: 2.9.2
   > Compute: GKE
   > Executor: CeleryKubernetesExecutor
   > AIRFLOW__CORE__PARALLELISM: 160
   > AIRFLOW__SCHEDULER__MAX_TIS_PER_QUERY: 0
   > AIRFLOW__CELERY__WORKER_CONCURRENCY: 60
   > Worker replicas: 4
   > Scheduler replicas: 2
   > AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT: 3600
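   
   As a quick sanity check of the raw capacity these settings imply, here is my own back-of-the-envelope arithmetic (illustrative only, not something Airflow reports directly):
   
   ````python
   # Rough capacity math for the configuration above (illustrative only).
   worker_replicas = 4
   worker_concurrency = 60   # AIRFLOW__CELERY__WORKER_CONCURRENCY
   core_parallelism = 160    # AIRFLOW__CORE__PARALLELISM (per scheduler, per the docs)
   
   celery_slots = worker_replicas * worker_concurrency  # 240 Celery worker slots
   print(f"Celery slots: {celery_slots}, parallelism cap per scheduler: {core_parallelism}")
   # With hundreds of mapped task instances per run, far more TIs can be queued
   # than can run at once, so some inevitably wait for a free slot.
   ````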
   
   I am running multiple instances of a DAG with dynamic task mapping that expands into hundreds of tasks. The logs show that the task gets scheduled and queued (at 2024-07-05T00:01:59.683) but is not executed within the task queued timeout period, resulting in the reported error (at 2024-07-05T01:02:09.431).
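   
   For context, the pattern of the DAG is roughly the following. This is a simplified sketch, not the actual DAG; only the group/task names mirror the `dynamic-map-group.supplier.agent` task instance in the logs below, and the item count is a placeholder:
   
   ````python
   # Simplified sketch of the mapped-task pattern (placeholder logic).
   from datetime import datetime
   
   from airflow.decorators import dag, task, task_group
   
   
   @dag(dag_id="dynamic-map-group", start_date=datetime(2024, 7, 1), schedule=None, catchup=False)
   def dynamic_map_group():
   
       @task
       def list_items():
           # Placeholder: the real upstream task returns several hundred elements.
           return [{"item_id": i} for i in range(500)]
   
       @task_group
       def supplier(items):
           @task
           def agent(item: dict):
               # Placeholder work for each mapped task instance.
               return item["item_id"]
   
           agent.expand(item=items)
   
       supplier(list_items())
   
   
   dynamic_map_group()
   ````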
   
   ````  
   {
     "textPayload": "\t<TaskInstance: dynamic-map-group.supplier.agent 
manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [scheduled]>\u001b[0m",
     "insertId": "5fqi9x47wvvla4jh",
     "resource": {
       "type": "k8s_container",
       "labels": {
         "container_name": "airflow-scheduler",
         "namespace_name": "mynamespacedevdev",
         "location": "us-central1",
         "project_id": "mygcp-project",
         "cluster_name": "mygkecluster",
         "pod_name": "mygkecluster-scheduler-6b77fc67d-8z6gs"
       }
     },
     "timestamp": "2024-07-05T00:01:50.234024733Z",
     "severity": "INFO",
     "labels": {
       "k8s-pod/release": "mygkecluster",
       "k8s-pod/component": "scheduler",
       "k8s-pod/pod-template-hash": "6b77fc67d",
       "compute.googleapis.com/resource_name": 
"gk3-mygkecluster-nap-c58g4osx-19a59e8c-pxv6",
       "k8s-pod/app": "airflow"
     },
     "logName": "projects/mygcp-project/logs/stdout",
     "receiveTimestamp": "2024-07-05T00:01:52.886420247Z"
   }
   
   {
     "textPayload": "[2024-07-05 00:01:59,683: INFO/ForkPoolWorker-49] Running 
<TaskInstance: dynamic-map-group.supplier.agent 
manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [queued]> on host 
mygkecluster-worker-1.mygkecluster-worker.mynamespacedevdev.svc.cluster.local",
     "insertId": "4o989t3huassb59y",
     "resource": {
       "type": "k8s_container",
       "labels": {
         "container_name": "airflow-worker",
         "project_id": "mygcp-project",
         "location": "us-central1",
         "pod_name": "mygkecluster-worker-1",
         "namespace_name": "mynamespacedevdev",
         "cluster_name": "mygkecluster"
       }
     },
     "timestamp": "2024-07-05T00:01:59.683635856Z",
     "severity": "INFO",
     "labels": {
       "k8s-pod/apps_kubernetes_io/pod-index": "1",
       "k8s-pod/release": "mygkecluster",
       "k8s-pod/component": "worker",
       "k8s-pod/app": "airflow",
       "k8s-pod/statefulset_kubernetes_io/pod-name": "mygkecluster-worker-1",
       "k8s-pod/controller-revision-hash": "mygkecluster-worker-87b575989",
       "compute.googleapis.com/resource_name": 
"gk3-mygkecluster-nap-c58g4osx-19a59e8c-kdqm"
     },
     "logName": "projects/mygcp-project/logs/stderr",
     "receiveTimestamp": "2024-07-05T00:02:03.578550371Z"
   }
   
   {
     "textPayload": "[\u001b[34m2024-07-05T01:01:58.407+0000\u001b[0m] 
{\u001b[34mtask_context_logger.py:\u001b[0m91} WARNING\u001b[0m - Marking task 
instance <TaskInstance: dynamic-map-group.supplier.agent 
manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [queued]> stuck in 
queued as failed. If the task instance has available retries, it will be 
retried.\u001b[0m",
     "insertId": "11k0z0jmz77mlcu6",
     "resource": {
       "type": "k8s_container",
       "labels": {
         "container_name": "airflow-scheduler",
         "project_id": "mygcp-project",
         "namespace_name": "mynamespacedevdev",
         "cluster_name": "mygkecluster",
         "location": "us-central1",
         "pod_name": "mygkecluster-scheduler-6b77fc67d-8z6gs"
       }
     },
     "timestamp": "2024-07-05T01:01:58.409116538Z",
     "severity": "INFO",
     "labels": {
       "k8s-pod/release": "mygkecluster",
       "k8s-pod/app": "airflow",
       "k8s-pod/component": "scheduler",
       "compute.googleapis.com/resource_name": 
"gk3-mygkecluster-nap-c58g4osx-19a59e8c-pxv6",
       "k8s-pod/pod-template-hash": "6b77fc67d"
     },
     "logName": "projects/mygcp-project/logs/stdout",
     "receiveTimestamp": "2024-07-05T01:02:02.907406580Z"
   }
   
   {
     "textPayload": "[\u001b[34m2024-07-05T01:02:09.431+0000\u001b[0m] 
{\u001b[34mtask_context_logger.py:\u001b[0m91} ERROR\u001b[0m - Executor 
reports task instance <TaskInstance: dynamic-map-group.supplier.agent 
manual__2024-07-04T23:59:12.475712+00:00 map_index=342 [queued]> finished 
(failed) although the task says it's queued. (Info: None) Was the task killed 
externally?\u001b[0m",
     "insertId": "pldcpv17g4ggyycu",
     "resource": {
       "type": "k8s_container",
       "labels": {
         "project_id": "mygcp-project",
         "location": "us-central1",
         "pod_name": "mygkecluster-scheduler-6b77fc67d-8z6gs",
         "cluster_name": "mygkecluster",
         "container_name": "airflow-scheduler",
         "namespace_name": "mynamespacedevdev"
       }
     },
     "timestamp": "2024-07-05T01:02:09.431825344Z",
     "severity": "INFO",
     "labels": {
       "k8s-pod/app": "airflow",
       "compute.googleapis.com/resource_name": 
"gk3-mygkecluster-nap-c58g4osx-19a59e8c-pxv6",
       "k8s-pod/component": "scheduler",
       "k8s-pod/release": "mygkecluster",
       "k8s-pod/pod-template-hash": "6b77fc67d"
     },
     "logName": "projects/mygcp-project/logs/stdout",
     "receiveTimestamp": "2024-07-05T01:02:12.928364383Z"
   }
   ````  
   I believe increasing the **AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT** value will fix the error. However, the root cause is still unknown: why did the worker not process the task for an entire hour? An interesting observation is that the task failing with this error is always from the first instance of the DAG, e.g. if I have 8 instances of the DAG running, the error shows up in the 1st instance. This indicates that tasks from all the instances are being picked up and processed, but somehow a task from the 1st instance never gets executed. It may come down to overall throughput: with higher throughput, tasks would never stay in the queue that long, but I am not sure yet how to increase it. I have enough CPU and memory for the worker replicas as well as the schedulers. Any ideas?
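   
   In the meantime, here is the kind of ad-hoc check I have been running from a Python shell on a scheduler pod to see how long task instances actually sit in queued (a rough sketch against the 2.9 ORM; adjust as needed):
   
   ````python
   # Hedged sketch: list task instances currently in "queued" and how long
   # they have been waiting, using the Airflow 2.9 metadata models.
   from airflow.models import TaskInstance
   from airflow.utils import timezone
   from airflow.utils.session import create_session
   from airflow.utils.state import TaskInstanceState
   
   with create_session() as session:
       queued = (
           session.query(TaskInstance)
           .filter(TaskInstance.state == TaskInstanceState.QUEUED)
           .all()
       )
       now = timezone.utcnow()
       for ti in queued:
           waited = (now - ti.queued_dttm).total_seconds() if ti.queued_dttm else None
           print(ti.dag_id, ti.task_id, ti.map_index, ti.run_id, waited)
   ````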

