This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3ea5e1f768d Set keepalive policy for prism grpc server. (#37021)
3ea5e1f768d is described below
commit 3ea5e1f768dbb70e6380e34ce5e40830cd3ff102
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Dec 8 13:57:26 2025 -0500
Set keepalive policy for prism grpc server. (#37021)
---
sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
index f7d6ba5ad36..2399fd726da 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/server.go
@@ -30,6 +30,7 @@ import (
jobpb
"github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
"google.golang.org/grpc"
+ "google.golang.org/grpc/keepalive"
)
type Server struct {
@@ -80,6 +81,10 @@ func NewServer(port int, execute func(*Job)) *Server {
s.logger.Info("Serving JobManagement", slog.String("endpoint",
s.Endpoint()))
opts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(math.MaxInt32),
+ grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
+ MinTime: 20 * time.Second, // Minimum
duration a client should wait before sending a keepalive ping
+ PermitWithoutStream: true, // Allow pings
even if there are no active streams
+ }),
}
s.server = grpc.NewServer(opts...)
jobpb.RegisterJobServiceServer(s.server, s)