This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7d6de9aa927 [Prism] Fix two kinds of errors when using json log format. (#36020)
7d6de9aa927 is described below
commit 7d6de9aa927bca566f00f7cddb373b9e92484be9
Author: Shunping Huang <[email protected]>
AuthorDate: Tue Sep 2 16:30:51 2025 -0400
[Prism] Fix two kinds of errors when using json log format. (#36020)
* Fix two kinds of errors when using json log format.
* Refactor log filter logic. Override expansion service logger name.
* Fix lints
* Address the issues found by gemini.
---
.../prism/internal/engine/elementmanager.go | 23 +++++++++++++++++++++-
.../beam/runners/prism/internal/environments.go | 8 +++++++-
sdks/go/pkg/beam/runners/prism/internal/execute.go | 6 +++++-
sdks/go/pkg/beam/runners/prism/internal/stage.go | 13 ++++++++++++
.../apache_beam/runners/portability/job_server.py | 7 ++++++-
.../runners/portability/prism_runner.py | 13 ++++++------
sdks/python/apache_beam/transforms/external.py | 3 ++-
sdks/python/apache_beam/utils/subprocess_server.py | 23 ++++++++++++++++------
8 files changed, 79 insertions(+), 17 deletions(-)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
index 2ddd7bbc5c1..8c8b71ca414 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
@@ -89,6 +89,14 @@ type PColInfo struct {
KeyDec func(io.Reader) []byte
}
+func (info PColInfo) LogValue() slog.Value {
+ return slog.GroupValue(
+ slog.String("GlobalID", info.GlobalID),
+ slog.String("WindowCoder", info.WindowCoder.String()),
+ // Do not attempt to log functions, or it will result in JSON marshaling error.
+ )
+}
+
// WinCoderType indicates what kind of coder
// the window is using. There are only 3
// valid single window encodings.
@@ -110,6 +118,19 @@ const (
WinCustom
)
+func (wct WinCoderType) String() string {
+ switch wct {
+ case WinGlobal:
+ return "WinGlobal"
+ case WinInterval:
+ return "WinInterval"
+ case WinCustom:
+ return "WinCustom"
+ default:
+ return fmt.Sprintf("Unknown(%d)", wct)
+ }
+}
+
// ToData recodes the elements with their approprate windowed value header.
func (es elements) ToData(info PColInfo) [][]byte {
var ret [][]byte
@@ -338,7 +359,7 @@ func (rb RunBundle) LogValue() slog.Value {
return slog.GroupValue(
slog.String("ID", rb.BundleID),
slog.String("stage", rb.StageID),
- slog.Time("watermark", rb.Watermark.ToTime()))
+ slog.Any("watermark", rb.Watermark))
}
// Bundles is the core execution loop. It produces a sequences of bundles able to be executed.
diff --git a/sdks/go/pkg/beam/runners/prism/internal/environments.go
b/sdks/go/pkg/beam/runners/prism/internal/environments.go
index 4589d2a6ce6..1f852e0862f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/environments.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/environments.go
@@ -24,6 +24,7 @@ import (
"os"
"os/exec"
"slices"
+ "strconv"
"time"
fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
@@ -259,7 +260,12 @@ func dockerEnvironment(ctx context.Context, logger
*slog.Logger, dp *pipepb.Dock
defer rc.Close()
var buf bytes.Buffer
stdcopy.StdCopy(&buf, &buf, rc)
- logger.Info("container being killed",
slog.Any("cause", context.Cause(ctx)), slog.String("containerLog",
buf.String()))
+ logger.Info("container being killed",
slog.Any("cause", context.Cause(ctx)))
+ msgs, err := strconv.Unquote(buf.String())
+ if err != nil {
+ msgs = buf.String()
+ }
+ logger.Debug("container log", "log", msgs)
}
// Can't use command context, since it's already canceled here.
if err := cli.ContainerKill(bgctx, containerID, "");
err != nil {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/execute.go
b/sdks/go/pkg/beam/runners/prism/internal/execute.go
index 5b277923d29..e9edbe62c81 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/execute.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/execute.go
@@ -360,7 +360,11 @@ func executePipeline(ctx context.Context, wks
map[string]*worker.W, j *jobservic
case rb, ok := <-bundles:
if !ok {
err := eg.Wait()
- j.Logger.Debug("pipeline done!",
slog.String("job", j.String()), slog.Any("error", err), slog.Any("topo", topo))
+ var topoAttrs []any
+ for _, s := range topo {
+ topoAttrs = append(topoAttrs,
slog.Any(s.ID, s))
+ }
+ j.Logger.Debug("pipeline done!",
slog.String("job", j.String()), slog.Any("error", err), slog.Group("topo",
topoAttrs...))
return err
}
eg.Go(func() error {
diff --git a/sdks/go/pkg/beam/runners/prism/internal/stage.go
b/sdks/go/pkg/beam/runners/prism/internal/stage.go
index 97300cb1122..101d7a8dc0f 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/stage.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/stage.go
@@ -108,6 +108,19 @@ func clampTick(dur time.Duration) time.Duration {
}
}
+func (s *stage) LogValue() slog.Value {
+ var outAttrs []any
+ for k, v := range s.OutputsToCoders {
+ outAttrs = append(outAttrs, slog.Any(k, v))
+ }
+ return slog.GroupValue(
+ slog.String("ID", s.ID),
+ slog.Any("transforms", s.transforms),
+ slog.Any("inputInfo", s.inputInfo),
+ slog.Group("outputInfo", outAttrs...),
+ )
+}
+
func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W,
comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) (err
error) {
if s.baseProgTick.Load() == nil {
s.baseProgTick.Store(minimumProgTick)
diff --git a/sdks/python/apache_beam/runners/portability/job_server.py
b/sdks/python/apache_beam/runners/portability/job_server.py
index f4163cf864b..0d98de6bdf3 100644
--- a/sdks/python/apache_beam/runners/portability/job_server.py
+++ b/sdks/python/apache_beam/runners/portability/job_server.py
@@ -18,6 +18,7 @@
# pytype: skip-file
import atexit
+import logging
import shutil
import signal
import tempfile
@@ -102,6 +103,7 @@ class SubprocessJobServer(JobServer):
def __init__(self):
self._local_temp_root = None
self._server = None
+ self._log_filter = None
def subprocess_cmd_and_endpoint(self):
raise NotImplementedError(type(self))
@@ -111,8 +113,11 @@ class SubprocessJobServer(JobServer):
self._local_temp_root = tempfile.mkdtemp(prefix='beam-temp')
cmd, endpoint = self.subprocess_cmd_and_endpoint()
port = int(endpoint.split(':')[-1])
+ logger = logging.getLogger(f"{self.__class__.__name__}")
+ if self._log_filter is not None:
+ logger.addFilter(self._log_filter)
self._server = subprocess_server.SubprocessServer(
- beam_job_api_pb2_grpc.JobServiceStub, cmd, port=port)
+ beam_job_api_pb2_grpc.JobServiceStub, cmd, port=port, logger=logger)
return self._server.start()
def stop(self):
diff --git a/sdks/python/apache_beam/runners/portability/prism_runner.py
b/sdks/python/apache_beam/runners/portability/prism_runner.py
index 50ecdb1252e..db9ca4110ac 100644
--- a/sdks/python/apache_beam/runners/portability/prism_runner.py
+++ b/sdks/python/apache_beam/runners/portability/prism_runner.py
@@ -44,7 +44,6 @@ from apache_beam.runners.portability import portable_runner
from apache_beam.transforms import environments
from apache_beam.utils import shared
from apache_beam.utils import subprocess_server
-from apache_beam.utils.subprocess_server import _LOGGER as
subprocess_server_logger
from apache_beam.version import __version__ as beam_version
# pytype: skip-file
@@ -120,9 +119,7 @@ class PrismRunnerLogFilter(logging.Filter):
def filter(self, record):
if record.funcName == 'log_stdout':
try:
- # TODO: Fix this error message from prism
- message = record.getMessage().replace(
- '"!ERROR:time.Time year outside of range [0,9999]"', '')
+ message = record.getMessage()
json_record = json.loads(message)
record.levelno = getattr(logging, json_record["level"])
record.levelname = logging.getLevelName(record.levelno)
@@ -148,7 +145,11 @@ class PrismRunnerLogFilter(logging.Filter):
record.msg = (
f"{json_record['msg']} "
f"({', '.join(f'{k}={v!r}' for k, v in extras.items())})")
- except (json.JSONDecodeError, KeyError, ValueError):
+ except (json.JSONDecodeError,
+ KeyError,
+ ValueError,
+ TypeError,
+ AttributeError):
# The log parsing/filtering is best-effort.
pass
@@ -181,7 +182,7 @@ class PrismJobServer(job_server.SubprocessJobServer):
# override console to json with log filter enabled
if self._log_kind == "console":
self._log_kind = "json"
- subprocess_server_logger.addFilter(PrismRunnerLogFilter())
+ self._log_filter = PrismRunnerLogFilter()
# the method is only kept for testing and backward compatibility
@classmethod
diff --git a/sdks/python/apache_beam/transforms/external.py
b/sdks/python/apache_beam/transforms/external.py
index f0b69a047b7..782fa3d030b 100644
--- a/sdks/python/apache_beam/transforms/external.py
+++ b/sdks/python/apache_beam/transforms/external.py
@@ -1097,7 +1097,8 @@ class JavaJarExpansionService(object):
ExpansionAndArtifactRetrievalStub,
self.path_to_jar,
self._extra_args,
- classpath=classpath_urls)
+ classpath=classpath_urls,
+ logger="ExpansionService")
self._service = self._service_provider.__enter__()
self._service_count += 1
return self._service
diff --git a/sdks/python/apache_beam/utils/subprocess_server.py
b/sdks/python/apache_beam/utils/subprocess_server.py
index babe81d6bde..84848479430 100644
--- a/sdks/python/apache_beam/utils/subprocess_server.py
+++ b/sdks/python/apache_beam/utils/subprocess_server.py
@@ -132,7 +132,7 @@ class SubprocessServer(object):
with SubprocessServer(GrpcStubClass, [executable, arg, ...]) as stub:
stub.CallService(...)
"""
- def __init__(self, stub_class, cmd, port=None):
+ def __init__(self, stub_class, cmd, port=None, logger=None):
"""Creates the server object.
:param stub_class: the auto-generated GRPC client stub class used for
@@ -143,12 +143,21 @@ class SubprocessServer(object):
service. If not given, one will be randomly chosen and the special
string "{{PORT}}" will be substituted in the command line arguments
with the chosen port.
+ :param logger: (optional) The logger or logger name to use for the
+ subprocess's stderr and stdout. If not given, the current module logger
+ would be used.
"""
self._owner_id = None
self._stub_class = stub_class
self._cmd = [str(arg) for arg in cmd]
self._port = port
self._grpc_channel = None
+ if isinstance(logger, str):
+ self._logger = logging.getLogger(logger)
+ elif isinstance(logger, logging.Logger):
+ self._logger = logger
+ else:
+ self._logger = _LOGGER
@classmethod
@contextlib.contextmanager
@@ -203,9 +212,9 @@ class SubprocessServer(object):
if self._owner_id is not None:
self._cache.purge(self._owner_id)
self._owner_id = self._cache.register()
- return self._cache.get(tuple(self._cmd), self._port)
+ return self._cache.get(tuple(self._cmd), self._port, self._logger)
- def _really_start_process(cmd, port):
+ def _really_start_process(cmd, port, logger):
if not port:
port, = pick_port(None)
cmd = [arg.replace('{{PORT}}', str(port)) for arg in cmd] # pylint:
disable=not-an-iterable
@@ -220,7 +229,7 @@ class SubprocessServer(object):
while line:
# The log obtained from stdout is bytes, decode it into string.
# Remove newline via rstrip() to not print an empty line.
- _LOGGER.info(line.decode(errors='backslashreplace').rstrip())
+ logger.info(line.decode(errors='backslashreplace').rstrip())
line = process.stdout.readline()
t = threading.Thread(target=log_stdout)
@@ -283,7 +292,8 @@ class JavaJarServer(SubprocessServer):
path_to_jar,
java_arguments,
classpath=None,
- cache_dir=None):
+ cache_dir=None,
+ logger=None):
self._java_path = JavaHelper.get_java()
if classpath:
# java -jar ignores the classpath, so we make a new jar that embeds
@@ -291,7 +301,8 @@ class JavaJarServer(SubprocessServer):
path_to_jar = self.make_classpath_jar(path_to_jar, classpath, cache_dir)
super().__init__(
stub_class,
- [self._java_path, '-jar', path_to_jar] + list(java_arguments))
+ [self._java_path, '-jar', path_to_jar] + list(java_arguments),
+ logger=logger)
self._existing_service = path_to_jar if is_service_endpoint(
path_to_jar) else None