shunping opened a new issue, #36145:
URL: https://github.com/apache/beam/issues/36145

   ### What happened?
   
   Trying to enable TriggerAlways primitive integration test 
(https://github.com/apache/beam/blob/6077034b7337283fc2f2467c6455ef7b308379c8/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go#L52),
 but it is not successful.
   
   I am able to reproduce it with a standalone code
   ```
   package main
   
   import (
        "context"
        "log"
        "time"
   
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
   )
   
   func init() {}
   
   func main() {
        beam.Init()
   
        ctx := context.Background()
        pipeline, s := beam.NewPipelineWithRoot()
   
        con := teststream.NewConfig()
        con.AddElements(1000, 1.0, 2.0, 3.0)
        con.AdvanceWatermark(11000)
        col := teststream.Create(s, con)
        windowSize := 10 * time.Second
        wfn := window.NewFixedWindows(windowSize)
        windowed := beam.WindowInto(s.Scope("Fixed"), wfn, col, 
beam.Trigger(trigger.Always()))
   
        sums := stats.Sum(s, windowed)
        debug.Print(s, sums)
   
        if err := beamx.Run(ctx, pipeline); err != nil {
                log.Fatalf("Failed to execute job: %v", err)
        }
   }
   
   ```
   
   The stats.Sum should sum on three window panes separately, since 
trigger.Always() is used. However, the result returns 6.
   
   ```
   2025/09/14 15:28:17 INFO log from SDK worker 
worker.ID=job-001[go-job-1-1757878097559282000]_go 
worker.endpoint=localhost:53202 sdk.transformID=e7 
sdk.location=.../sdks/go/pkg/beam/x/debug/print.go:54 
sdk.time=2025-09-14T19:28:17.572Z sdk.msg="Elm: 6"
   ```
   
   I changed `stats.Sum` to GroupByKey, and I can three three panes.
   
   ```
   package main
   
   import (
        "context"
        "log"
        "time"
   
        "github.com/apache/beam/sdks/v2/go/pkg/beam"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
        "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
   )
   
   func init() {}
   
   func main() {
        beam.Init()
   
        ctx := context.Background()
        pipeline, s := beam.NewPipelineWithRoot()
   
        con := teststream.NewConfig()
        con.AddElements(1000, 1.0, 2.0, 3.0)
        con.AdvanceWatermark(11000)
        col := teststream.Create(s, con)
        windowSize := 10 * time.Second
        wfn := window.NewFixedWindows(windowSize)
        windowed := beam.WindowInto(s.Scope("Fixed"), wfn, col, 
beam.Trigger(trigger.Always()))
   
        kv := beam.ParDo(s, func(element float64) (string, float64) {
                return "0", element
        }, windowed)
   
        gbk := beam.GroupByKey(s, kv)
        debug.Print(s, gbk)
   
        if err := beamx.Run(ctx, pipeline); err != nil {
                log.Fatalf("Failed to execute job: %v", err)
        }
   }
   ```
   
   ```
   2025/09/14 15:32:38 INFO log from SDK worker 
worker.ID=job-001[go-job-1-1757878358434257000]_go 
worker.endpoint=localhost:53506 sdk.transformID=e5 
sdk.location=.../sdks/go/pkg/beam/x/debug/print.go:77 
sdk.time=2025-09-14T19:32:38.500Z sdk.msg="Elm: (0,[2])"
   2025/09/14 15:32:38 INFO log from SDK worker 
worker.ID=job-001[go-job-1-1757878358434257000]_go 
worker.endpoint=localhost:53506 sdk.transformID=e5 
sdk.location=.../sdks/go/pkg/beam/x/debug/print.go:77 
sdk.time=2025-09-14T19:32:38.500Z sdk.msg="Elm: (0,[1])"
   2025/09/14 15:32:38 INFO log from SDK worker 
worker.ID=job-001[go-job-1-1757878358434257000]_go 
worker.endpoint=localhost:53506 sdk.transformID=e5 
sdk.location=.../sdks/go/pkg/beam/x/debug/print.go:77 
sdk.time=2025-09-14T19:32:38.500Z sdk.msg="Elm: (0,[3])"
   ```
   
   ### Issue Failure
   
   Failure: Test is flaky
   
   ### Issue Priority
   
   Priority: 1 (unhealthy code / failing or flaky postcommit so we cannot be 
sure the product is healthy)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Infrastructure
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to