This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d6de9aa927 [Prism] Fix two kinds of errors when using json log format. (#36020)
7d6de9aa927 is described below

commit 7d6de9aa927bca566f00f7cddb373b9e92484be9
Author: Shunping Huang <[email protected]>
AuthorDate: Tue Sep 2 16:30:51 2025 -0400

    [Prism] Fix two kinds of errors when using json log format. (#36020)
    
    * Fix two kinds of errors when using json log format.
    
    * Refactor log filter logic. Override expansion service logger name.
    
    * Fix lints
    
    * Address the issues found by gemini.
---
 .../prism/internal/engine/elementmanager.go        | 23 +++++++++++++++++++++-
 .../beam/runners/prism/internal/environments.go    |  8 +++++++-
 sdks/go/pkg/beam/runners/prism/internal/execute.go |  6 +++++-
 sdks/go/pkg/beam/runners/prism/internal/stage.go   | 13 ++++++++++++
 .../apache_beam/runners/portability/job_server.py  |  7 ++++++-
 .../runners/portability/prism_runner.py            | 13 ++++++------
 sdks/python/apache_beam/transforms/external.py     |  3 ++-
 sdks/python/apache_beam/utils/subprocess_server.py | 23 ++++++++++++++++------
 8 files changed, 79 insertions(+), 17 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 2ddd7bbc5c1..8c8b71ca414 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -89,6 +89,14 @@ type PColInfo struct {
        KeyDec      func(io.Reader) []byte
 }
 
+func (info PColInfo) LogValue() slog.Value {
+       return slog.GroupValue(
+               slog.String("GlobalID", info.GlobalID),
+               slog.String("WindowCoder", info.WindowCoder.String()),
+               // Do not attempt to log functions, or it will result in JSON marshaling error.
+       )
+}
+
 // WinCoderType indicates what kind of coder
 // the window is using. There are only 3
 // valid single window encodings.
@@ -110,6 +118,19 @@ const (
        WinCustom
 )
 
+func (wct WinCoderType) String() string {
+       switch wct {
+       case WinGlobal:
+               return "WinGlobal"
+       case WinInterval:
+               return "WinInterval"
+       case WinCustom:
+               return "WinCustom"
+       default:
+               return fmt.Sprintf("Unknown(%d)", wct)
+       }
+}
+
 // ToData recodes the elements with their approprate windowed value header.
 func (es elements) ToData(info PColInfo) [][]byte {
        var ret [][]byte
@@ -338,7 +359,7 @@ func (rb RunBundle) LogValue() slog.Value {
        return slog.GroupValue(
                slog.String("ID", rb.BundleID),
                slog.String("stage", rb.StageID),
-               slog.Time("watermark", rb.Watermark.ToTime()))
+               slog.Any("watermark", rb.Watermark))
 }
 
 // Bundles is the core execution loop. It produces a sequences of bundles able to be executed.
diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go b/sdks/go/pkg/beam/runners/prism/internal/environments.go
index 4589d2a6ce6..1f852e0862f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/environments.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go
@@ -24,6 +24,7 @@ import (
        "os"
        "os/exec"
        "slices"
+       "strconv"
        "time"
 
        fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
@@ -259,7 +260,12 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock
                                defer rc.Close()
                                var buf bytes.Buffer
                                stdcopy.StdCopy(&buf, &buf, rc)
-                               logger.Info("container being killed", slog.Any("cause", context.Cause(ctx)), slog.String("containerLog", buf.String()))
+                               logger.Info("container being killed", slog.Any("cause", context.Cause(ctx)))
+                               msgs, err := strconv.Unquote(buf.String())
+                               if err != nil {
+                                       msgs = buf.String()
+                               }
+                               logger.Debug("container log", "log", msgs)
                        }
                        // Can't use command context, since it's already canceled here.
                        if err := cli.ContainerKill(bgctx, containerID, ""); err != nil {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 5b277923d29..e9edbe62c81 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -360,7 +360,11 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
                case rb, ok := <-bundles:
                        if !ok {
                                err := eg.Wait()
-                               j.Logger.Debug("pipeline done!", slog.String("job", j.String()), slog.Any("error", err), slog.Any("topo", topo))
+                               var topoAttrs []any
+                               for _, s := range topo {
+                                       topoAttrs = append(topoAttrs, slog.Any(s.ID, s))
+                               }
+                               j.Logger.Debug("pipeline done!", slog.String("job", j.String()), slog.Any("error", err), slog.Group("topo", topoAttrs...))
                                return err
                        }
                        eg.Go(func() error {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 97300cb1122..101d7a8dc0f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -108,6 +108,19 @@ func clampTick(dur time.Duration) time.Duration {
        }
 }
 
+func (s *stage) LogValue() slog.Value {
+       var outAttrs []any
+       for k, v := range s.OutputsToCoders {
+               outAttrs = append(outAttrs, slog.Any(k, v))
+       }
+       return slog.GroupValue(
+               slog.String("ID", s.ID),
+               slog.Any("transforms", s.transforms),
+               slog.Any("inputInfo", s.inputInfo),
+               slog.Group("outputInfo", outAttrs...),
+       )
+}
+
 func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) (err error) {
        if s.baseProgTick.Load() == nil {
                s.baseProgTick.Store(minimumProgTick)
diff --git a/sdks/python/apache_beam/runners/portability/job_server.py b/sdks/python/apache_beam/runners/portability/job_server.py
index f4163cf864b..0d98de6bdf3 100644
--- a/sdks/python/apache_beam/runners/portability/job_server.py
+++ b/sdks/python/apache_beam/runners/portability/job_server.py
@@ -18,6 +18,7 @@
 # pytype: skip-file
 
 import atexit
+import logging
 import shutil
 import signal
 import tempfile
@@ -102,6 +103,7 @@ class SubprocessJobServer(JobServer):
   def __init__(self):
     self._local_temp_root = None
     self._server = None
+    self._log_filter = None
 
   def subprocess_cmd_and_endpoint(self):
     raise NotImplementedError(type(self))
@@ -111,8 +113,11 @@ class SubprocessJobServer(JobServer):
       self._local_temp_root = tempfile.mkdtemp(prefix='beam-temp')
       cmd, endpoint = self.subprocess_cmd_and_endpoint()
       port = int(endpoint.split(':')[-1])
+      logger = logging.getLogger(f"{self.__class__.__name__}")
+      if self._log_filter is not None:
+        logger.addFilter(self._log_filter)
       self._server = subprocess_server.SubprocessServer(
-          beam_job_api_pb2_grpc.JobServiceStub, cmd, port=port)
+          beam_job_api_pb2_grpc.JobServiceStub, cmd, port=port, logger=logger)
     return self._server.start()
 
   def stop(self):
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py b/sdks/python/apache_beam/runners/portability/prism_runner.py
index 50ecdb1252e..db9ca4110ac 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -44,7 +44,6 @@ from apache_beam.runners.portability import portable_runner
 from apache_beam.transforms import environments
 from apache_beam.utils import shared
 from apache_beam.utils import subprocess_server
-from apache_beam.utils.subprocess_server import _LOGGER as subprocess_server_logger
 from apache_beam.version import __version__ as beam_version
 
 # pytype: skip-file
@@ -120,9 +119,7 @@ class PrismRunnerLogFilter(logging.Filter):
   def filter(self, record):
     if record.funcName == 'log_stdout':
       try:
-        # TODO: Fix this error message from prism
-        message = record.getMessage().replace(
-            '"!ERROR:time.Time year outside of range [0,9999]"', '')
+        message = record.getMessage()
         json_record = json.loads(message)
         record.levelno = getattr(logging, json_record["level"])
         record.levelname = logging.getLevelName(record.levelno)
@@ -148,7 +145,11 @@ class PrismRunnerLogFilter(logging.Filter):
           record.msg = (
               f"{json_record['msg']} "
               f"({', '.join(f'{k}={v!r}' for k, v in extras.items())})")
-      except (json.JSONDecodeError, KeyError, ValueError):
+      except (json.JSONDecodeError,
+              KeyError,
+              ValueError,
+              TypeError,
+              AttributeError):
         # The log parsing/filtering is best-effort.
         pass
 
@@ -181,7 +182,7 @@ class PrismJobServer(job_server.SubprocessJobServer):
     # override console to json with log filter enabled
     if self._log_kind == "console":
       self._log_kind = "json"
-      subprocess_server_logger.addFilter(PrismRunnerLogFilter())
+      self._log_filter = PrismRunnerLogFilter()
 
   # the method is only kept for testing and backward compatibility
   @classmethod
diff --git a/sdks/python/apache_beam/transforms/external.py b/sdks/python/apache_beam/transforms/external.py
index f0b69a047b7..782fa3d030b 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -1097,7 +1097,8 @@ class JavaJarExpansionService(object):
           ExpansionAndArtifactRetrievalStub,
           self.path_to_jar,
           self._extra_args,
-          classpath=classpath_urls)
+          classpath=classpath_urls,
+          logger="ExpansionService")
       self._service = self._service_provider.__enter__()
     self._service_count += 1
     return self._service
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py b/sdks/python/apache_beam/utils/subprocess_server.py
index babe81d6bde..84848479430 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -132,7 +132,7 @@ class SubprocessServer(object):
       with SubprocessServer(GrpcStubClass, [executable, arg, ...]) as stub:
           stub.CallService(...)
   """
-  def __init__(self, stub_class, cmd, port=None):
+  def __init__(self, stub_class, cmd, port=None, logger=None):
     """Creates the server object.
 
     :param stub_class: the auto-generated GRPC client stub class used for
@@ -143,12 +143,21 @@ class SubprocessServer(object):
         service.  If not given, one will be randomly chosen and the special
         string "{{PORT}}" will be substituted in the command line arguments
         with the chosen port.
+    :param logger: (optional) The logger or logger name to use for the
+        subprocess's stderr and stdout. If not given, the current module logger
+        would be used.
     """
     self._owner_id = None
     self._stub_class = stub_class
     self._cmd = [str(arg) for arg in cmd]
     self._port = port
     self._grpc_channel = None
+    if isinstance(logger, str):
+      self._logger = logging.getLogger(logger)
+    elif isinstance(logger, logging.Logger):
+      self._logger = logger
+    else:
+      self._logger = _LOGGER
 
   @classmethod
   @contextlib.contextmanager
@@ -203,9 +212,9 @@ class SubprocessServer(object):
     if self._owner_id is not None:
       self._cache.purge(self._owner_id)
     self._owner_id = self._cache.register()
-    return self._cache.get(tuple(self._cmd), self._port)
+    return self._cache.get(tuple(self._cmd), self._port, self._logger)
 
-  def _really_start_process(cmd, port):
+  def _really_start_process(cmd, port, logger):
     if not port:
       port, = pick_port(None)
       cmd = [arg.replace('{{PORT}}', str(port)) for arg in cmd]  # pylint: disable=not-an-iterable
@@ -220,7 +229,7 @@ class SubprocessServer(object):
       while line:
         # The log obtained from stdout is bytes, decode it into string.
         # Remove newline via rstrip() to not print an empty line.
-        _LOGGER.info(line.decode(errors='backslashreplace').rstrip())
+        logger.info(line.decode(errors='backslashreplace').rstrip())
         line = process.stdout.readline()
 
     t = threading.Thread(target=log_stdout)
@@ -283,7 +292,8 @@ class JavaJarServer(SubprocessServer):
       path_to_jar,
       java_arguments,
       classpath=None,
-      cache_dir=None):
+      cache_dir=None,
+      logger=None):
     self._java_path = JavaHelper.get_java()
     if classpath:
       # java -jar ignores the classpath, so we make a new jar that embeds
@@ -291,7 +301,8 @@ class JavaJarServer(SubprocessServer):
       path_to_jar = self.make_classpath_jar(path_to_jar, classpath, cache_dir)
     super().__init__(
         stub_class,
-        [self._java_path, '-jar', path_to_jar] + list(java_arguments))
+        [self._java_path, '-jar', path_to_jar] + list(java_arguments),
+        logger=logger)
     self._existing_service = path_to_jar if is_service_endpoint(
         path_to_jar) else None
 

Reply via email to