uranusjr commented on a change in pull request #16110: URL: https://github.com/apache/airflow/pull/16110#discussion_r649381846
##########
File path: airflow/utils/process_utils.py
##########
@@ -79,7 +86,7 @@ def signal_procs(sig):
             else:
                 raise
 
-    if pgid == os.getpgid(0):
+    if IS_WINDOWS and pgid == os.getpid() or not IS_WINDOWS and pgid == os.getpgid(0):

Review comment:
I think this needs some parentheses? At least for readability.

##########
File path: setup.cfg
##########
@@ -156,6 +156,9 @@ install_requires =
     typing-extensions>=3.7.4;python_version<"3.8"
     unicodecsv>=0.14.1
     werkzeug~=1.0, >=1.0.1
+    # needed for generating virtual python environments when running tasks
+    virtualenv>=20.4.3
+    psycopg2>=2.8.6

Review comment:
Why is psycopg needed?

##########
File path: airflow/utils/process_utils.py
##########
@@ -155,11 +162,13 @@ def execute_interactive(cmd: List[str], **kwargs):
     """
     log.info("Executing cmd: %s", " ".join(shlex.quote(c) for c in cmd))
 
-    old_tty = termios.tcgetattr(sys.stdin)
-    tty.setraw(sys.stdin.fileno())
+    if not IS_WINDOWS:
+        old_tty = termios.tcgetattr(sys.stdin)
+        tty.setraw(sys.stdin.fileno())
+
+        # open pseudo-terminal to interact with subprocess
+        master_fd, slave_fd = pty.openpty()

Review comment:
I don’t think the function works after the patch (at least `slave_fd` would become undefined on Windows). Since this function is only used to run database commands, maybe we should mark this function as POSIX-only, and implement a separate function for Windows. (Would it be enough to simply bridge stdin/out/err to the subprocess?)
##########
File path: airflow/utils/timeout.py
##########
@@ -31,20 +34,34 @@ def __init__(self, seconds=1, error_message='Timeout'):
         self.seconds = seconds
         self.error_message = error_message + ', PID: ' + str(os.getpid())
 
-    def handle_timeout(self, signum, frame):  # pylint: disable=unused-argument
+    def handle_timeout(self, *args):  # pylint: disable=unused-argument
         """Logs information and raises AirflowTaskTimeout."""
         self.log.error("Process timed out, PID: %s", str(os.getpid()))
         raise AirflowTaskTimeout(self.error_message)
 
     def __enter__(self):
         try:
-            signal.signal(signal.SIGALRM, self.handle_timeout)
-            signal.setitimer(signal.ITIMER_REAL, self.seconds)
+            if IS_WINDOWS:
+                if hasattr(self, TIMER_THREAD_ATTR) and getattr(self, TIMER_THREAD_ATTR) is not None:
+                    getattr(self, TIMER_THREAD_ATTR).cancel()
+                timer = Timer(self.seconds, self.handle_timeout)
+                setattr(self, TIMER_THREAD_ATTR, timer)
+                timer.start()

Review comment:
The exception message below (about the current context) doesn’t apply to the thread-based implementation, so we should move the `if IS_WINDOWS:` block out of `try:`.
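A rough sketch of the suggested restructuring follows. `TimeoutSketch` is a hypothetical stand-in for the class in the diff, `IS_WINDOWS = os.name == "nt"` is an assumed definition of the PR's flag, and the `except ValueError` clause and its warning text are assumptions about the handler the comment refers to. `TimeoutError` is used in place of `AirflowTaskTimeout` so the snippet runs without Airflow installed.

```python
import logging
import os
import signal
import threading

log = logging.getLogger(__name__)
IS_WINDOWS = os.name == "nt"  # assumption: how the PR's flag might be defined


class TimeoutSketch:
    """Hypothetical stand-in for the timeout context manager in the diff."""

    def __init__(self, seconds=1, error_message="Timeout"):
        self.seconds = seconds
        self.error_message = error_message + ", PID: " + str(os.getpid())
        self._timer = None  # plain instance attribute, unused on POSIX

    def handle_timeout(self, *args):
        # Note: when called from a Timer thread, this exception is raised in
        # that thread and does not interrupt the thread inside the with-block.
        raise TimeoutError(self.error_message)

    def __enter__(self):
        if IS_WINDOWS:
            # Thread-based branch sits outside the try block, because the
            # "current context" warning below only concerns signal handling.
            if self._timer is not None:
                self._timer.cancel()
            self._timer = threading.Timer(self.seconds, self.handle_timeout)
            self._timer.start()
            return self
        try:
            signal.signal(signal.SIGALRM, self.handle_timeout)
            signal.setitimer(signal.ITIMER_REAL, self.seconds)
        except ValueError:
            # signal.signal raises ValueError outside the main thread.
            log.warning("timeout can't be used in the current context", exc_info=True)
        return self

    def __exit__(self, exc_type, exc, tb):
        if IS_WINDOWS:
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None
        else:
            signal.setitimer(signal.ITIMER_REAL, 0)  # disarm the POSIX timer
        return False
```

With this layout, a failure in the Windows branch would surface as its own error instead of being reported with a message about signal context.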
##########
File path: airflow/utils/timeout.py
##########
@@ -31,20 +34,34 @@ def __init__(self, seconds=1, error_message='Timeout'):
         self.seconds = seconds
         self.error_message = error_message + ', PID: ' + str(os.getpid())
 
-    def handle_timeout(self, signum, frame):  # pylint: disable=unused-argument
+    def handle_timeout(self, *args):  # pylint: disable=unused-argument
         """Logs information and raises AirflowTaskTimeout."""
         self.log.error("Process timed out, PID: %s", str(os.getpid()))
         raise AirflowTaskTimeout(self.error_message)
 
     def __enter__(self):
         try:
-            signal.signal(signal.SIGALRM, self.handle_timeout)
-            signal.setitimer(signal.ITIMER_REAL, self.seconds)
+            if IS_WINDOWS:
+                if hasattr(self, TIMER_THREAD_ATTR) and getattr(self, TIMER_THREAD_ATTR) is not None:
+                    getattr(self, TIMER_THREAD_ATTR).cancel()
+                timer = Timer(self.seconds, self.handle_timeout)
+                setattr(self, TIMER_THREAD_ATTR, timer)
+                timer.start()

Review comment:
BTW why do we need to use `has|get|set_attr` here? Why not put them on the instance instead? They would just be unused on POSIX. Or we can do something like

```python
_timeout = ContextManager[None]

class _timeout_windows(_timeout):
    ...  # Implementation for Windows.

class _timeout_posix(_timeout):
    ...  # Implementation for POSIX.

if IS_WINDOWS:
    timeout: Type[_timeout] = _timeout_windows
else:
    timeout = _timeout_posix
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org