This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 0c2e235a016 Remove subprocess.PIPE usage by using a temp file (#22654)
0c2e235a016 is described below

commit 0c2e235a0160df70d1be896471e42aa2a2d1fee1
Author: Chamikara Jayalath <chamikar...@gmail.com>
AuthorDate: Wed Aug 10 17:52:42 2022 -0700

    Remove subprocess.PIPE usage by using a temp file (#22654)

    * Remove subprocess.PIPE usage by using a temp file

    * Remove context manager usage

    * Fix yapf
---
 sdks/python/apache_beam/utils/subprocess_server.py | 29 ++++++++++++++++------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index 769a5f4fe05..e11c8ffd0e9 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -114,17 +114,25 @@ class SubprocessServer(object):
       cmd = [arg.replace('{{PORT}}', str(port)) for arg in self._cmd]
     endpoint = 'localhost:%s' % port
     _LOGGER.info("Starting service with %s", str(cmd).replace("',", "'"))
+
+    stdout_file = tempfile.NamedTemporaryFile(delete=False)
+    self._stdout_file_name = stdout_file.name
     self._process = subprocess.Popen(
-        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+        cmd, stdout=stdout_file, stderr=subprocess.STDOUT)
 
     # Emit the output of this command as info level logging.
     def log_stdout():
-      line = self._process.stdout.readline()
-      while line:
-        # The log obtained from stdout is bytes, decode it into string.
-        # Remove newline via rstrip() to not print an empty line.
-        _LOGGER.info(line.decode(errors='backslashreplace').rstrip())
-        line = self._process.stdout.readline()
+      stdout_file = open(self._stdout_file_name, 'rb')
+      # Tailing the file forever.
+      while True:
+        line = stdout_file.readline()
+        if line:
+          # The log obtained from stdout is bytes, decode it into string.
+          # Remove newline via rstrip() to not print an empty line.
+          _LOGGER.info(line.decode(errors='backslashreplace').rstrip())
+        else:
+          # Will try again to read after a small sleep.
+          time.sleep(0.1)
 
     t = threading.Thread(target=log_stdout)
     t.daemon = True
@@ -147,6 +155,13 @@ class SubprocessServer(object):
       if self._process.poll() is None:
         self._process.kill()
       self._process = None
+    if self._stdout_file_name and os.path.exists(self._stdout_file_name):
+      try:
+        os.unlink(self._stdout_file_name)
+      except Exception as e:
+        logging.error((
+            'Could not remove temporary file %s due to %r' %
+            (self._stdout_file_name, e)))
 
   def local_temp_dir(self, **kwargs):
     return tempfile.mkdtemp(dir=self._local_temp_root, **kwargs)