shunping commented on issue #33815:
URL: https://github.com/apache/beam/issues/33815#issuecomment-2844032012
OK, I have a simplified pipeline with a single PeriodicImpulse and no
Flatten. Note that it needs a Reshuffle to start a new stage so that the
pending watermark issue can be seen.
```go
package main

import (
	"context"
	"log"
	"math"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	beamLog "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func init() {
	register.Function3x0(count)
	register.Function3x0(initCount)
	register.Emitter1[int]()
}

// initCount emits 1 for every impulse it receives.
func initCount(ctx context.Context, _ []byte, emit func(int)) {
	// beamLog.Info(ctx, "Starting count")
	emit(1)
}

// count logs the incoming value and emits it incremented by one.
func count(ctx context.Context, currCount int, emit func(int)) {
	beamLog.Infof(ctx, "Current Count = %d", currCount)
	emit(currCount + 1)
}

func main() {
	beam.Init()

	ctx := context.Background()
	pipeline, scope := beam.NewPipelineWithRoot()

	// The start time is math.MaxInt32 ms after the Unix epoch, i.e. far in
	// the past; the end time is effectively unbounded.
	duration := 5 * time.Second
	unboundedSource := periodic.Impulse(scope,
		time.UnixMilli(math.MaxInt32), time.UnixMilli(math.MaxInt64), duration, false)
	trigger := unboundedSource

	s1 := scope.Scope("source")
	c1 := beam.ParDo(s1, initCount, trigger)

	// The Reshuffle starts a new stage after the source.
	sr := scope.Scope("reshuffle")
	r := beam.Reshuffle(sr, c1)

	s2 := scope.Scope("count")
	beam.ParDo(s2, count, r)

	if err := beamx.Run(ctx, pipeline); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
```
So the problem is no longer about using Flatten on unbounded sources (as the
title suggests). It is about the stage that follows PeriodicImpulse's SDF
when that SDF does not stop (that is, does not perform any checkpoint). The
current implementation of PeriodicImpulse's internal DoFn only stops once it
catches up with the current time. With a very early start time, it takes a
long time before a checkpoint happens, which leaves the pipeline stuck.