This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c2e235a016 Remove subprocess.PIPE usage by using a temp file (#22654)
0c2e235a016 is described below

commit 0c2e235a0160df70d1be896471e42aa2a2d1fee1
Author: Chamikara Jayalath <chamikar...@gmail.com>
AuthorDate: Wed Aug 10 17:52:42 2022 -0700

    Remove subprocess.PIPE usage by using a temp file (#22654)
    
    * Remove subprocess.PIPE usage by using a temp file
    
    * Remove context manager usage
    
    * Fix yapf
---
 sdks/python/apache_beam/utils/subprocess_server.py | 29 ++++++++++++++++------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index 769a5f4fe05..e11c8ffd0e9 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -114,17 +114,25 @@ class SubprocessServer(object):
         cmd = [arg.replace('{{PORT}}', str(port)) for arg in self._cmd]
       endpoint = 'localhost:%s' % port
       _LOGGER.info("Starting service with %s", str(cmd).replace("',", "'"))
+
+      stdout_file = tempfile.NamedTemporaryFile(delete=False)
+      self._stdout_file_name = stdout_file.name
       self._process = subprocess.Popen(
-          cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+          cmd, stdout=stdout_file, stderr=subprocess.STDOUT)
 
       # Emit the output of this command as info level logging.
       def log_stdout():
-        line = self._process.stdout.readline()
-        while line:
-          # The log obtained from stdout is bytes, decode it into string.
-          # Remove newline via rstrip() to not print an empty line.
-          _LOGGER.info(line.decode(errors='backslashreplace').rstrip())
-          line = self._process.stdout.readline()
+        stdout_file = open(self._stdout_file_name, 'rb')
+        # Tailing the file forever.
+        while True:
+          line = stdout_file.readline()
+          if line:
+            # The log obtained from stdout is bytes, decode it into string.
+            # Remove newline via rstrip() to not print an empty line.
+            _LOGGER.info(line.decode(errors='backslashreplace').rstrip())
+          else:
+            # Will try again to read after a small sleep.
+            time.sleep(0.1)
 
       t = threading.Thread(target=log_stdout)
       t.daemon = True
@@ -147,6 +155,13 @@ class SubprocessServer(object):
       if self._process.poll() is None:
         self._process.kill()
       self._process = None
+      if self._stdout_file_name and os.path.exists(self._stdout_file_name):
+        try:
+          os.unlink(self._stdout_file_name)
+        except Exception as e:
+          logging.error((
+              'Could not remove temporary file %s due to %r' %
+              (self._stdout_file_name, e)))
 
   def local_temp_dir(self, **kwargs):
     return tempfile.mkdtemp(dir=self._local_temp_root, **kwargs)

Reply via email to