[GitHub] [airflow] ashb commented on a change in pull request #6627: [AIRFLOW-5931] Use os.fork when appropriate to speed up task execution.
ashb commented on a change in pull request #6627: [AIRFLOW-5931] Use os.fork when appropriate to speed up task execution. URL: https://github.com/apache/airflow/pull/6627#discussion_r351053566 ## File path: tests/dags/test_on_kill.py ## @@ -25,6 +25,11 @@ class DummyWithOnKill(DummyOperator): def execute(self, context): +import os Review comment: This wasn't an issue; I was just making doubly sure that the tests created more processes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] ashb commented on a change in pull request #6627: [AIRFLOW-5931] Use os.fork when appropriate to speed up task execution.
ashb commented on a change in pull request #6627: [AIRFLOW-5931] Use os.fork when appropriate to speed up task execution. URL: https://github.com/apache/airflow/pull/6627#discussion_r350315455 ## File path: airflow/task/task_runner/standard_task_runner.py ## @@ -17,28 +17,94 @@ # specific language governing permissions and limitations # under the License. +import os + import psutil +from setproctitle import setproctitle from airflow.task.task_runner.base_task_runner import BaseTaskRunner from airflow.utils.helpers import reap_process_group +CAN_FORK = hasattr(os, 'fork') + class StandardTaskRunner(BaseTaskRunner): """ Runs the raw Airflow task by invoking through the Bash shell. """ def __init__(self, local_task_job): super().__init__(local_task_job) +self._rc = None def start(self): -self.process = self.run_command() +if CAN_FORK and not self.run_as_user: +self.process = self._start_by_fork() +else: +self.process = self._start_by_exec() + +def _start_by_exec(self): +subprocess = self.run_command() +return psutil.Process(subprocess.pid) + +def _start_by_fork(self): +pid = os.fork() +if pid: +self.log.info("Started process %d to run task", pid) +return psutil.Process(pid) +else: +from airflow.bin.cli import get_parser +from airflow.logging_config import configure_logging +import signal +import airflow.settings as settings + +signal.signal(signal.SIGINT, signal.SIG_DFL) +signal.signal(signal.SIGTERM, signal.SIG_DFL) +# Start a new process group +os.setpgid(0, 0) + +configure_logging() + +# Force a new SQLAlchemy session. We can't share open DB handles +# between process. The cli code will re-create this as part of its +# normal startup +settings.engine.pool.dispose() +settings.engine.dispose() -def return_code(self): -return self.process.poll() +parser = get_parser() +args = parser.parse_args(self._command[1:]) Review comment: Yes. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] ashb commented on a change in pull request #6627: [AIRFLOW-5931] Use os.fork when appropriate to speed up task execution.
ashb commented on a change in pull request #6627: [AIRFLOW-5931] Use os.fork when appropriate to speed up task execution. URL: https://github.com/apache/airflow/pull/6627#discussion_r349531258 ## File path: airflow/task/task_runner/standard_task_runner.py ## @@ -17,28 +17,89 @@ # specific language governing permissions and limitations # under the License. +import os + import psutil +from setproctitle import setproctitle from airflow.task.task_runner.base_task_runner import BaseTaskRunner from airflow.utils.helpers import reap_process_group +CAN_FORK = hasattr(os, 'fork') + class StandardTaskRunner(BaseTaskRunner): """ Runs the raw Airflow task by invoking through the Bash shell. """ def __init__(self, local_task_job): super().__init__(local_task_job) +self._rc = None def start(self): -self.process = self.run_command() +if CAN_FORK and not self.run_as_user: +self.process = self._start_by_fork() +else: +self.process = self._start_by_exec() + +def _start_by_exec(self): +subprocess = self.run_command() +return psutil.Process(subprocess.pid) + +def _start_by_fork(self): +pid = os.fork() +if pid: +self.log.info("Started process %d to run task", pid) +return psutil.Process(pid) +else: +from airflow.bin.cli import get_parser +import signal +import airflow.settings as settings + +signal.signal(signal.SIGINT, signal.SIG_DFL) +signal.signal(signal.SIGTERM, signal.SIG_DFL) +# Start a new process group +os.setpgid(0, 0) + +# Force a new SQLAlchemy session. We can't share open DB handles between process. 
+settings.engine.pool.dispose() +settings.engine.pool.recreate() + +parser = get_parser() +args = parser.parse_args(self._command[1:]) +setproctitle( +"airflow task runner: {0.dag_id} {0.task_id} {0.execution_date} {0.job_id}".format(args) +) +try: +args.func(args) +os._exit(0) +except Exception: +os._exit(1) + +def return_code(self, timeout=0): +# We call this multiple times, but we can only wait on the process once +if self._rc is not None or not self.process: +return self._rc + +try: +self._rc = self.process.wait(timeout=timeout) +self.process = None +except psutil.TimeoutExpired: +pass -def return_code(self): -return self.process.poll() +return self._rc def terminate(self): -if self.process and psutil.pid_exists(self.process.pid): -reap_process_group(self.process.pid, self.log) +if self.process: +if self.process.is_running(): +reap_process_group(self.process.pid, self.log) Review comment: This is just so that `os.killpg` behaves itself, not for anything more than that. We've already got a Cgroup task runner (not that I know what state it is in!) Errr. This is not on the line I thought it was. Yeah, pgroups might not be perfect, but this isn't a change to the existing behaviour. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] ashb commented on a change in pull request #6627: [AIRFLOW-5931] Use os.fork when appropriate to speed up task execution.
ashb commented on a change in pull request #6627: [AIRFLOW-5931] Use os.fork when appropriate to speed up task execution. URL: https://github.com/apache/airflow/pull/6627#discussion_r349531258 ## File path: airflow/task/task_runner/standard_task_runner.py ## @@ -17,28 +17,89 @@ # specific language governing permissions and limitations # under the License. +import os + import psutil +from setproctitle import setproctitle from airflow.task.task_runner.base_task_runner import BaseTaskRunner from airflow.utils.helpers import reap_process_group +CAN_FORK = hasattr(os, 'fork') + class StandardTaskRunner(BaseTaskRunner): """ Runs the raw Airflow task by invoking through the Bash shell. """ def __init__(self, local_task_job): super().__init__(local_task_job) +self._rc = None def start(self): -self.process = self.run_command() +if CAN_FORK and not self.run_as_user: +self.process = self._start_by_fork() +else: +self.process = self._start_by_exec() + +def _start_by_exec(self): +subprocess = self.run_command() +return psutil.Process(subprocess.pid) + +def _start_by_fork(self): +pid = os.fork() +if pid: +self.log.info("Started process %d to run task", pid) +return psutil.Process(pid) +else: +from airflow.bin.cli import get_parser +import signal +import airflow.settings as settings + +signal.signal(signal.SIGINT, signal.SIG_DFL) +signal.signal(signal.SIGTERM, signal.SIG_DFL) +# Start a new process group +os.setpgid(0, 0) + +# Force a new SQLAlchemy session. We can't share open DB handles between process. 
+settings.engine.pool.dispose() +settings.engine.pool.recreate() + +parser = get_parser() +args = parser.parse_args(self._command[1:]) +setproctitle( +"airflow task runner: {0.dag_id} {0.task_id} {0.execution_date} {0.job_id}".format(args) +) +try: +args.func(args) +os._exit(0) +except Exception: +os._exit(1) + +def return_code(self, timeout=0): +# We call this multiple times, but we can only wait on the process once +if self._rc is not None or not self.process: +return self._rc + +try: +self._rc = self.process.wait(timeout=timeout) +self.process = None +except psutil.TimeoutExpired: +pass -def return_code(self): -return self.process.poll() +return self._rc def terminate(self): -if self.process and psutil.pid_exists(self.process.pid): -reap_process_group(self.process.pid, self.log) +if self.process: +if self.process.is_running(): +reap_process_group(self.process.pid, self.log) Review comment: This is just so that `os.killpg` behaves itself, not for anything more than that. We've already got a Cgroup task runner (not that I know what state it is in!) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [airflow] ashb commented on a change in pull request #6627: [AIRFLOW-5931] Use os.fork when appropriate to speed up task execution.
ashb commented on a change in pull request #6627: [AIRFLOW-5931] Use os.fork when appropriate to speed up task execution. URL: https://github.com/apache/airflow/pull/6627#discussion_r349217376 ## File path: airflow/task/task_runner/standard_task_runner.py ## @@ -17,28 +17,69 @@ # specific language governing permissions and limitations # under the License. +import os + import psutil +from setproctitle import setproctitle from airflow.task.task_runner.base_task_runner import BaseTaskRunner from airflow.utils.helpers import reap_process_group +CAN_FORK = hasattr(os, 'fork') + class StandardTaskRunner(BaseTaskRunner): """ Runs the raw Airflow task by invoking through the Bash shell. """ def __init__(self, local_task_job): super().__init__(local_task_job) +self._rc = None def start(self): -self.process = self.run_command() +if CAN_FORK and not self.run_as_user: +self.process = self._start_by_fork() +else: +self.process = self._start_by_exec() -def return_code(self): -return self.process.poll() +def _start_by_exec(self): +subprocess = self.run_command() +return psutil.Process(subprocess.pid) -def terminate(self): -if self.process and psutil.pid_exists(self.process.pid): -reap_process_group(self.process.pid, self.log) +def _start_by_fork(self): +pid = os.fork() +if pid: Review comment: Could do, but for Python it doesn't matter so much - either it returns a pid, 0 in the child, or throws an error (C can return 0, pid or -1 on error, but Python converts that to an exception for us). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services