ecodina opened a new issue, #61916:
URL: https://github.com/apache/airflow/issues/61916

   ### Apache Airflow version
   
   3.1.7
   
   ### If "Other Airflow 3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   The triggerer fails every few hours with an OSError: "Too many open files". On it I run `ExternalTaskSensor` as well as a custom trigger (shown below).

   I thought it could be related to #56366, but my custom trigger does not use the `cleanup` method. I've also seen similar issues for the worker (#51624) and the dag processor (#49887).

   I have been investigating and see that the number of entries in `/proc/7/fd` always increases, whereas `/proc/24/fd` does close files / sockets correctly. From what I've seen, my trigger code runs as PID 24 (I used `os.getpid()` to verify it), so PID 7 is probably the parent process:
   
   ```
   root@99ee11a02435:/opt/airflow# ps aux
   USER         PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
   default        1  0.0  0.0   2336  1024 ?        Ss   16:59   0:00 /usr/bin/dumb-init -- /entrypoint triggerer --skip-serve-logs
   default        7  2.3  2.7 386360 221004 ?       Ssl  16:59   0:16 /usr/python/bin/python3.12 /home/airflow/.local/bin/airflow triggerer --skip-serve-logs
   default       24  0.2  1.8 359924 153816 ?       Sl   17:01   0:01 /usr/python/bin/python3.12 /home/airflow/.local/bin/airflow triggerer --skip-serve-logs
   ```
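
   For reference, a minimal sketch of how the open-fd counts can be watched over time inside the container (PIDs 7 and 24 are the ones from the `ps aux` output above; adjust them as needed):

   ```python
   import os
   import time

   # PIDs taken from the `ps aux` output above; adjust for your container.
   PIDS = [7, 24]

   while True:
       for pid in PIDS:
           try:
               # Each entry in /proc/<pid>/fd is one open file descriptor.
               n_fds = len(os.listdir(f"/proc/{pid}/fd"))
               print(f"PID {pid}: {n_fds} open file descriptors")
           except FileNotFoundError:
               print(f"PID {pid}: process not found")
       time.sleep(60)
   ```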
   
   <details>
   
   <summary>What my custom trigger does</summary>
   
   My trigger basically connects to a Redis database and waits for a certain key to change to a certain status.
   
   ```python
   class MyTrigger(BaseTrigger):

       ...

       async def check_slurm_state(self, redis_conn: Redis):
           """
           Checks the Slurm job's state on the Redis database every *self.polling_interval*.
           """
           finished_ok = False
           final_message = ""

           while True:
               state = await self.get_sacct_output(redis_conn)

               # We check if the state is in SACCT_RUNNING in case it is stuck in a completed state
               if (
                   self.last_known_state == state["state"]
                   and state["state"] in SACCT_RUNNING
                   or state["state"] == "UNKNOWN"
               ):
                   await asyncio.sleep(self.polling_interval)
                   continue

               # The state has changed!
               self.log.ainfo(f"Job has changed to status {state['state']}")  # I've also tried self.log.info
               await self.store_state(redis_conn, state["state"])
               is_finished, finished_ok, final_message = await self.parse_state_change(
                   state["state"], state["reason"]
               )

               self.last_known_state = state["state"]

               if not is_finished:
                   await asyncio.sleep(self.polling_interval)
               else:
                   break

           return finished_ok, final_message

       async def run(self):
           redis_hook = RedisHook(redis_conn_id="my_redis_conn")
           conn = await redis_hook.aget_connection(redis_hook.redis_conn_id)

           redis_client = Redis(
               host=conn.host,
               port=conn.port,
               username=conn.login,
               password=None
               if str(conn.password).lower() in ["none", "false", ""]
               else conn.password,
               db=conn.extra_dejson.get("db"),
               max_connections=5,
               decode_responses=True,
           )

           async with redis_client:
               ...
               finished_ok, final_message = await self.check_slurm_state(redis_client)

           ...

           yield TriggerEvent({"finished_ok": finished_ok, "final_message": final_message})
   ```
   
   </details>
   
   
   When I saw this problem, I thought it might be due to logging. We use a custom FileTaskHandler, but we have configured the triggerer **not** to use it by setting the environment variable `AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS` to "":
   
   ```bash
   default@99ee11a02435:/opt/airflow$ airflow info
   Apache Airflow
   version                | 3.1.7
   executor               | LocalExecutor
   task_logging_handler   | airflow.utils.log.file_task_handler.FileTaskHandler
   sql_alchemy_conn       | postgresql+psycopg2://db_editor:****@db:5432/airflow
   dags_folder            | /opt/airflow/dags
   plugins_folder         | /opt/airflow/plugins
   base_log_folder        | /opt/airflow/logs
   remote_base_log_folder |
   ```
   
   ### What you think should happen instead?
   
   Files / sockets should be closed by PID 7 when they are no longer needed.
   
   ### How to reproduce
   
   Run Airflow in Docker and create two DAGs: a "parent" DAG and a "child" DAG with an `ExternalTaskSensor` in deferrable mode (a minimal sketch of the child DAG is included below).

   Access the container and check how many `fd`s there are for each PID:
   
   ```
   ls /proc/7/fd | wc -l
   ls /proc/24/fd | wc -l
   ```
   
   You'll see that the number of `fd`s for PID 24 increases when the `ExternalTaskSensor` starts and decreases when it finishes.
   You'll also see that the number of `fd`s for PID 7 increases when the `ExternalTaskSensor` starts, but never decreases.
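
   For illustration, a minimal sketch of the "child" DAG (the DAG ids, task ids, and schedule are just placeholders; it assumes the "parent" DAG is called `parent_dag` and has a task `parent_task`):

   ```python
   import datetime

   from airflow.providers.standard.sensors.external_task import ExternalTaskSensor
   from airflow.sdk import DAG

   with DAG(
       dag_id="child_dag",
       start_date=datetime.datetime(2024, 1, 1),
       schedule="@hourly",
   ):
       # Deferrable mode hands the polling over to the triggerer,
       # which is where the fd count keeps growing.
       ExternalTaskSensor(
           task_id="wait_for_parent",
           external_dag_id="parent_dag",
           external_task_id="parent_task",
           deferrable=True,
           poke_interval=30,
       )
   ```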
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-common-compat==1.13.0
   apache-airflow-providers-common-io==1.7.1
   apache-airflow-providers-common-sql==1.30.4
   apache-airflow-providers-ftp==3.14.1
   apache-airflow-providers-git==0.2.2
   apache-airflow-providers-http==5.6.4
   apache-airflow-providers-imap==3.10.3
   apache-airflow-providers-keycloak==0.5.1
   apache-airflow-providers-postgres==6.5.3
   apache-airflow-providers-redis==4.4.2
   apache-airflow-providers-sftp==5.7.0
   apache-airflow-providers-smtp==2.4.2
   apache-airflow-providers-ssh==4.3.1
   apache-airflow-providers-standard==1.11.0
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   - We use Docker Swarm.
   - We use Airflow's Docker image, on top of which we install our own provider (which contains our trigger) as well as some other providers.
   - We use Python 3.12.
   - The logs volume is mounted as an NFS volume with options `nfsvers=4.2,rw,noatime,nocto,actimeo=5,nolock`.
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

