This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 29285d049f0 [#31403] Relax prism constraints to allow python wordcount 
to execute. (#31644)
29285d049f0 is described below

commit 29285d049f06968a07968e69dd69668510f20546
Author: Robert Burke <lostl...@users.noreply.github.com>
AuthorDate: Thu Jun 20 14:43:35 2024 -0700

    [#31403] Relax prism constraints to allow python wordcount to execute. 
(#31644)
    
    Co-authored-by: lostluck <13907733+lostl...@users.noreply.github.com>
---
 .../prism/internal/jobservices/management.go       | 67 +++++++++++++++++++---
 1 file changed, 60 insertions(+), 7 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go 
b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
index 3526ee00cc1..5760ce7871b 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
+++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go
@@ -127,6 +127,9 @@ func (s *Server) Prepare(ctx context.Context, req 
*jobpb.PrepareJobRequest) (*jo
                        urns.TransformCombinePerKey,
                        urns.TransformCombineGlobally,      // Used by Java SDK
                        urns.TransformCombineGroupedValues, // Used by Java SDK
+                       urns.TransformMerge,                // Used directly by 
Python SDK if "pre-optimized"
+                       urns.TransformPreCombine,           // Used directly by 
Python SDK if "pre-optimized"
+                       urns.TransformExtract,              // Used directly by 
Python SDK if "pre-optimized"
                        urns.TransformAssignWindows:
                // Very few expected transforms types for submitted pipelines.
                // Most URNs are for the runner to communicate back to the SDK 
for execution.
@@ -165,12 +168,6 @@ func (s *Server) Prepare(ctx context.Context, req 
*jobpb.PrepareJobRequest) (*jo
 
                        check("OnWindowExpirationTimerFamily", 
pardo.GetOnWindowExpirationTimerFamilySpec(), "") // Unsupported for now.
 
-               case "":
-                       // Composites can often have no spec
-                       if len(t.GetSubtransforms()) > 0 {
-                               continue
-                       }
-                       fallthrough
                case urns.TransformTestStream:
                        var testStream pipepb.TestStreamPayload
                        if err := proto.Unmarshal(t.GetSpec().GetPayload(), 
&testStream); err != nil {
@@ -179,7 +176,15 @@ func (s *Server) Prepare(ctx context.Context, req 
*jobpb.PrepareJobRequest) (*jo
 
                        t.EnvironmentId = "" // Unset the environment, to 
ensure it's handled prism side.
                        testStreamIds = append(testStreamIds, tid)
+
                default:
+                       // Composites can often have some unknown urn, permit 
those.
+                       // Eg. The Python SDK has urns 
"beam:transform:generic_composite:v1", "beam:transform:pickled_python:v1", as 
well as the deprecated "beam:transform:read:v1",
+                       // but they are composites. Since we don't do anything 
special with the high level, we simply use their internal subgraph.
+                       if len(t.GetSubtransforms()) > 0 {
+                               continue
+                       }
+                       // But if not, fail.
                        check("PTransform.Spec.Urn", urn+" "+t.GetUniqueName(), 
"<doesn't exist>")
                }
        }
@@ -191,7 +196,8 @@ func (s *Server) Prepare(ctx context.Context, req 
*jobpb.PrepareJobRequest) (*jo
        // Inspect Windowing strategies for unsupported features.
        for wsID, ws := range 
job.Pipeline.GetComponents().GetWindowingStrategies() {
                check("WindowingStrategy.AllowedLateness", 
ws.GetAllowedLateness(), int64(0))
-               check("WindowingStrategy.ClosingBehaviour", 
ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY)
+               // Both Closing behaviors are identical without additional 
trigger firings.
+               check("WindowingStrategy.ClosingBehaviour", 
ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY, 
pipepb.ClosingBehavior_EMIT_ALWAYS)
                check("WindowingStrategy.AccumulationMode", 
ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING)
                if ws.GetWindowFn().GetUrn() != urns.WindowFnSession {
                        check("WindowingStrategy.MergeStatus", 
ws.GetMergeStatus(), pipepb.MergeStatus_NON_MERGING)
@@ -398,3 +404,50 @@ func (s *Server) GetState(_ context.Context, req 
*jobpb.GetJobStateRequest) (*jo
                Timestamp: timestamppb.New(j.stateTime),
        }, nil
 }
+
+// DescribePipelineOptions always answers with an empty list of option
+// descriptors. It is effectively a no-op because it is unclear how this call
+// is meant to function; apparently only the Python SDK implements it.
+func (s *Server) DescribePipelineOptions(context.Context, *jobpb.DescribePipelineOptionsRequest) (*jobpb.DescribePipelineOptionsResponse, error) {
+       // Non-nil but empty: there are no options to describe.
+       descriptors := []*jobpb.PipelineOptionDescriptor{}
+       resp := &jobpb.DescribePipelineOptionsResponse{Options: descriptors}
+       return resp, nil
+}
+
+// GetStateStream sends the job's current state to the client, then sends it
+// again each time the state changes, until the job reaches a terminal state.
+//
+// It returns an error if the requested job ID is unknown, if sending on the
+// stream fails, or (with the context's cause) if the stream's context is found
+// to be done while checking for a state change. It returns nil once a terminal
+// state has been sent.
+func (s *Server) GetStateStream(req *jobpb.GetJobStateRequest, stream jobpb.JobService_GetStateStreamServer) error {
+       s.mu.Lock()
+       job, ok := s.jobs[req.GetJobId()]
+       s.mu.Unlock()
+       if !ok {
+               return fmt.Errorf("job with id %v not found", req.GetJobId())
+       }
+
+       job.streamCond.L.Lock()
+       defer job.streamCond.L.Unlock()
+
+       state := job.state.Load().(jobpb.JobState_Enum)
+       for {
+               // The lock is not held for the duration of the send.
+               job.streamCond.L.Unlock()
+               err := stream.Send(&jobpb.JobStateEvent{
+                       State:     state,
+                       Timestamp: timestamppb.Now(),
+               })
+               // Re-acquire before any return: the deferred Unlock requires the lock to be held.
+               job.streamCond.L.Lock()
+               if err != nil {
+                       // The stream is unusable; stop rather than keep waiting for state changes.
+                       return err
+               }
+               switch state {
+               case jobpb.JobState_CANCELLED, jobpb.JobState_DONE, jobpb.JobState_DRAINED, jobpb.JobState_UPDATED, jobpb.JobState_FAILED:
+                       // Reached terminal state.
+                       return nil
+               }
+               newState := job.state.Load().(jobpb.JobState_Enum)
+               for state == newState {
+                       select { // Quit out if the external connection is done.
+                       case <-stream.Context().Done():
+                               return context.Cause(stream.Context())
+                       default:
+                       }
+                       // NOTE(review): Wait returns only when streamCond is signalled, so a
+                       // cancellation arriving while blocked here goes unnoticed until the
+                       // next signal. Confirm that state updates broadcast on streamCond.
+                       job.streamCond.Wait()
+                       newState = job.state.Load().(jobpb.JobState_Enum)
+               }
+               state = newState
+       }
+}

Reply via email to