mcarter-twosigma opened a new issue, #61964:
URL: https://github.com/apache/airflow/issues/61964

   ### Apache Airflow version
   
   2.11.X
   
   ### If "Other Airflow 3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
On Apache Airflow 2.10.5 (a version not selectable in this issue template), when using LocalExecutor with AIP-44 (Internal API) enabled via AIRFLOW_ENABLE_AIP_44=true and database_access_isolation=True, the Internal API is not properly activated in task supervisor processes. As a result, InternalApiConfig.get_use_internal_api() returns False, which triggers the reconfiguration of SQLAlchemy to use NullPool instead of QueuePool at airflow/cli/commands/task_command.py:476.
   
   This defeats the purpose of AIP-44 and has serious consequences:
   1. Task supervisors use direct database access (not via Internal API)
   2. SQLAlchemy uses NullPool, which causes memory leaks with certain database 
drivers (notably PostgreSQL with libpq 16)
   3. The behavior is inconsistent with other Airflow components that properly 
activate the Internal API
   
   When database_access_isolation=True and AIRFLOW_ENABLE_AIP_44=true:
   - InternalApiConfig.set_use_internal_api() is never called in task 
supervisor processes
   - InternalApiConfig.get_use_internal_api() returns False at 
task_command.py:469
   - The ORM is reconfigured to use NullPool at task_command.py:476
   - Task supervisors end up using direct database access with NullPool (the worst of both worlds); the failing check is sketched below
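   
   For reference, a minimal paraphrase of the failing check (reconstructed from the behaviour described above; not a verbatim copy of the 2.10.5 source):
   ```
   # Paraphrased guard around airflow/cli/commands/task_command.py:469-476
   if not InternalApiConfig.get_use_internal_api():
       # Forked supervisors take this branch because set_use_internal_api()
       # was never called, so the ORM is rebuilt with NullPool.
       settings.reconfigure_orm(disable_connection_pool=True)
   ```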
   
   Root Cause
   
   The issue occurs in the execution flow when LocalExecutor forks to create 
task supervisor processes:
   
   Normal execution path (e.g., airflow tasks run from CLI):
   
   1. __main__.py:main() is called
   2. configure_internal_api() is called at line 60
   3. If database_access_isolation=True, it calls InternalApiConfig.set_use_internal_api() at line 75
   4. task_run() is called
   5. The check at task_command.py:469 correctly sees get_use_internal_api() return True (this path is sketched below)
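   
   Paraphrased, the normal path looks roughly like this (simplified sketch of the CLI entrypoint in airflow/__main__.py for 2.10.5; not verbatim, details elided):
   ```
   # Simplified paraphrase of airflow/__main__.py (2.10.5); not verbatim.
   def main():
       ...  # argument parsing elided
       configure_internal_api(...)  # line ~60: when database_access_isolation
                                    # is True, this calls
                                    # InternalApiConfig.set_use_internal_api()
                                    # at line ~75
       args.func(args)              # dispatches to task_run()
   ```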
   
   LocalExecutor fork path (the problematic path):
   
   1. The LocalExecutor worker forks at airflow/executors/local_executor.py:117
   2. The child process directly parses the args and calls args.func(args) at line 142
   3. This skips __main__.py:configure_internal_api() entirely
   4. task_run() is called with InternalApiConfig._use_internal_api still at its default of False
   5. The check at task_command.py:469 sees get_use_internal_api() return False
   6. This triggers settings.reconfigure_orm(disable_connection_pool=True) at line 476
   
   Note: There is a call to InternalApiConfig.set_use_internal_api() in 
StandardTaskRunner._start_by_fork() at 
airflow/task/task_runner/standard_task_runner.py:79, but this happens in the 
task runner child process, not the task supervisor process where the check at 
line 469 occurs.
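   
   To make the process relationships explicit, the hierarchy looks roughly like this (my understanding of 2.10.5):
   ```
   LocalExecutor worker
   └── task supervisor    <- forked at local_executor.py:117; the check at
       │                     task_command.py:469 runs here, and
       │                     set_use_internal_api() is never called
       └── task runner    <- forked by StandardTaskRunner._start_by_fork(),
                             which does call set_use_internal_api(), but only
                             in this innermost process
   ```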
   
   Impact
   
   1. Memory leak: With NullPool and PostgreSQL libpq 16, each task supervisor process leaks ~22 MB/hour (a measurement sketch follows this list)
   2. Scalability issues: With 96 parallel tasks, the per-process leaks quickly exhaust container memory limits, causing OOM kills
   3. AIP-44 broken for LocalExecutor: The feature doesn't work as designed for one of the most common executors
   4. Security concern: Untrusted components (task supervisors running user code) have direct database access when they shouldn't
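   
   A minimal sketch of one way to confirm the per-process leak (measure_supervisor_rss is a hypothetical helper, not Airflow code; it assumes psutil is installed):
   ```
   import time
   
   import psutil  # third-party: pip install psutil
   
   
   def measure_supervisor_rss(pid: int, interval_s: float = 60.0, samples: int = 10) -> None:
       """Periodically print the RSS of a task supervisor process (illustrative)."""
       proc = psutil.Process(pid)
       for _ in range(samples):
           rss_mb = proc.memory_info().rss / (1024 * 1024)
           print(f"PID {pid}: RSS = {rss_mb:.1f} MB")
           time.sleep(interval_s)
   ```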
   
   Proposed Fix
   
   In airflow/executors/local_executor.py, the _execute_work_in_fork() method 
should check database_access_isolation and call 
InternalApiConfig.set_use_internal_api() before invoking the task command:
   ```
   def _execute_work_in_fork(self, command: CommandType) -> TaskInstanceState:
       pid = os.fork()
       if pid:
           # In parent, wait for the child
           pid, ret = os.waitpid(pid, 0)
           return TaskInstanceState.SUCCESS if ret == 0 else TaskInstanceState.FAILED

       from airflow.sentry import Sentry

       ret = 1
       try:
           import signal

           from airflow.cli.cli_parser import get_parser
           from airflow.configuration import conf  # Add this import

           signal.signal(signal.SIGINT, signal.SIG_DFL)
           signal.signal(signal.SIGTERM, signal.SIG_DFL)
           signal.signal(signal.SIGUSR2, signal.SIG_DFL)

           # Add this block to configure the Internal API if needed
           if conf.getboolean("core", "database_access_isolation", fallback=False):
               from airflow.api_internal.internal_api_call import InternalApiConfig

               # Point the SQL connection at none:// to prevent direct DB access,
               # whether it was configured via the environment or via airflow.cfg
               os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = "none://"
               conf.set("database", "sql_alchemy_conn", "none://")

               InternalApiConfig.set_use_internal_api("LocalExecutor task supervisor")

           parser = get_parser()
           # [1:] - remove "airflow" from the start of the command
           args = parser.parse_args(command[1:])
           args.shut_down_logging = False

           setproctitle(f"airflow task supervisor: {command}")

           args.func(args)
           ret = 0
           return TaskInstanceState.SUCCESS
       except Exception as e:
           self.log.exception("Failed to execute task %s.", e)
           return TaskInstanceState.FAILED
       finally:
           Sentry.flush()
           logging.shutdown()
           os._exit(ret)
   ``` 
   
   This mirrors the logic in __main__.py:configure_internal_api() but applies 
it to the forked task supervisor process.
   
   Workaround
   
   As a temporary workaround, we've patched task_command.py:476-478 to skip the 
NullPool reconfiguration:
   ```
   if not InternalApiConfig.get_use_internal_api():
       # PATCH: Skip NullPool to avoid memory leak
       # Original: settings.reconfigure_orm(disable_connection_pool=True)
       pass  # Keep existing QueuePool from initial ORM configuration
   ```
   
   and configured SQLAlchemy connection pooling in airflow.cfg:
   ```
   [database]
   sql_alchemy_pool_enabled = True
   sql_alchemy_pool_size = 5
   sql_alchemy_max_overflow = 0
   sql_alchemy_pool_recycle = 3600
   sql_alchemy_pool_pre_ping = True
   ```
   
   This eliminates the memory leak but doesn't properly implement AIP-44.
   
   ### What you think should happen instead?
   
   When database_access_isolation=True and AIRFLOW_ENABLE_AIP_44=true:
   - Task supervisor processes should call 
InternalApiConfig.set_use_internal_api() during initialization
   - InternalApiConfig.get_use_internal_api() should return True at 
task_command.py:469
   - The ORM should NOT be reconfigured to use NullPool
   - Task supervisors should either:
     - Use the Internal API for database access, OR
     - Use QueuePool if direct database access is needed
   
   ### How to reproduce
   
   1. Set up Airflow 2.10.5 with PostgreSQL database
   2. Configure in airflow.cfg:
   ```
   [core]
   executor = LocalExecutor
   database_access_isolation = True
   ```
   3. Set environment variable:
   ```
   export AIRFLOW_ENABLE_AIP_44=true
   ```
   4. Start Airflow scheduler
   5. Trigger a DAG with a simple task
   6. Add debug logging to task_command.py:469:
   ```
   print(f"PID {os.getpid()}: get_use_internal_api() = 
{InternalApiConfig.get_use_internal_api()}")
   ```
   7. Observe that task supervisor processes print False
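   
   With the bug present, the supervisor processes print output along these lines (PID illustrative):
   ```
   PID 12345: get_use_internal_api() = False
   ```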
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Versions of Apache Airflow Providers
   
   N/A
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   Environment:
   - Kubernetes
   - Executor: LocalExecutor
   - Python: 3.11+
   - Database: PostgreSQL with libpq 16
   
   ### Anything else?
   
   - Related to AIP-44: 
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-44+Airflow+Internal+API
   - The memory leak with NullPool and libpq 16 is a separate issue but is 
triggered by this bug
   - Other executors may have similar issues if they fork processes without 
going through __main__.py
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

