SledgeHammer01 commented on code in PR #41073: URL: https://github.com/apache/airflow/pull/41073#discussion_r1696195558
########## airflow/hooks/subprocess.py: ########## @@ -61,41 +64,68 @@ def run_command( or stdout """ self.log.info("Tmp dir root location: %s", gettempdir()) - with contextlib.ExitStack() as stack: - if cwd is None: - cwd = stack.enter_context(TemporaryDirectory(prefix="airflowtmp")) - - def pre_exec(): - # Restore default signal disposition and invoke setsid - for sig in ("SIGPIPE", "SIGXFZ", "SIGXFSZ"): - if hasattr(signal, sig): - signal.signal(getattr(signal, sig), signal.SIG_DFL) - os.setsid() - - self.log.info("Running command: %s", command) - - self.sub_process = Popen( - command, - stdout=PIPE, - stderr=STDOUT, - cwd=cwd, - env=env if env or env == {} else os.environ, - preexec_fn=pre_exec, - ) - - self.log.info("Output:") - line = "" - if self.sub_process is None: - raise RuntimeError("The subprocess should be created here and is None!") - if self.sub_process.stdout is not None: - for raw_line in iter(self.sub_process.stdout.readline, b""): - line = raw_line.decode(output_encoding, errors="backslashreplace").rstrip() - self.log.info("%s", line) - - self.sub_process.wait() - - self.log.info("Command exited with return code %s", self.sub_process.returncode) - return_code: int = self.sub_process.returncode + + safe_cleanup = False + + try: + with contextlib.ExitStack() as stack: + if cwd is None: + # TemporaryDirectory will call shutil.rmtree() internally when the context exits. On + # Windows, shutil.rmtree() is unreliable and there is a race condition, where even after + # self.sub_process.wait(), the process will still be holding onto the directory causing + # an exception. The work-around is to call shutil.rmtree() in a retry loop. If we're not + # running under Windows, shutil.rmtree() is reliable and no retry loop is needed. + + cwd = stack.enter_context(TemporaryDirectory(prefix="airflowtmp")) + safe_cleanup = IS_WINDOWS + + def pre_exec(): + # Restore default signal disposition and invoke setsid + for sig in ("SIGPIPE", "SIGXFZ", "SIGXFSZ"): + if hasattr(signal, sig): + signal.signal(getattr(signal, sig), signal.SIG_DFL) + os.setsid() + + self.log.info("Running command: %s", command) + + self.sub_process = Popen( + command, + stdout=PIPE, + stderr=STDOUT, + cwd=cwd, + env=env if env or env == {} else os.environ, + preexec_fn=pre_exec if not IS_WINDOWS else None, + ) + + self.log.info("Output:") + line = "" + if self.sub_process is None: + raise RuntimeError("The subprocess should be created here and is None!") + if self.sub_process.stdout is not None: + for raw_line in iter(self.sub_process.stdout.readline, b""): + line = raw_line.decode(output_encoding, errors="backslashreplace").rstrip() + self.log.info("%s", line) + + self.sub_process.wait() + self.log.info("Command exited with return code %s", self.sub_process.returncode) + + return_code: int = self.sub_process.returncode Review Comment: @uranusjr I updated to a simplified approach. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org