camphillips22 commented on issue #26245:
URL: https://github.com/apache/beam/issues/26245#issuecomment-1507013684
Runner: Google Dataflow
Go SDK version: 2.46.0
Example code:
```golang
func GenerateData(s beam.Scope, start, end time.Time, col beam.PCollection)
beam.PCollection {
s = s.Scope("GenerateData")
return beam.ParDo(s, &pageIdGeneratorFn{
Start: start.Format(time.RFC3339),
End: end.Format(time.RFC3339),
}, col)
}
// dataGeneratorFn is the DoFn whose methods below define its restriction,
// tracker, sizing, splitting and element processing.
type dataGeneratorFn struct {
	// Start and End are the time bounds of the data to generate, encoded as
	// strings that CreateInitialRestriction parses with time.RFC3339Nano.
	Start, End string
}
func (f *dataGeneratorFn) CreateInitialRestriction(_ []byte)
offsetrange.Restriction {
s, _ := time.Parse(time.RFC3339Nano, f.Start)
e, _ := time.Parse(time.RFC3339Nano, f.End)
return offsetrange.Restriction{
Start: s.Unix(),
End: e.Unix(),
}
}
func (f *dataGeneratorFn) CreateTracker(rest offsetrange.Restriction)
*sdf.LockRTracker {
return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
}
func (f *dataGeneratorFn) RestrictionSize(_ []byte, rest
offsetrange.Restriction) float64 {
return rest.Size()
}
func (f *dataGeneratorFn) SplitRestriction(_ []byte, rest
offsetrange.Restriction) (splits []offsetrange.Restriction) {
rest.SizedSplits(int64(time.Hour.Seconds()))
}
func (f *dataGeneratorFn) ProcessElement(ctx context.Context, rt
*sdf.LockRTracker, _ []byte, emit func([]byte)) (sdf.ProcessContinuation,
error) {
rest := rt.GetRestriction().(offsetrange.Restriction)
start := time.Unix(rest.Start, 0)
end := time.Unix(rest.End, 0)
// iterator may take a while to set up. Claim the start before
setting up since dataflow aggressively splits.
if !rt.TryClaim(rest.Start) {
return sdf.StopProcessing(), nil
}
lastClaim := rest.Start
// setup iterator that takes in time bounds
iter := GetIterator(ctx, start, end)
for {
v, err := iter.Next()
if err != nil {
if err == io.EOF {
rt.TryClaim(rest.End)
return sdf.StopProcessing(), nil
}
return sdf.ResumeProcessingIn(5 * time.Second), err
}
// Note: this is simplified to remove tracking if the
value's timestamp has already been
if v.Timestamp() != lastClaim {
if !rt.TryClaim(v.Timestamp()) {
return sdf.StopProcessing(), nil
}
lastClaim = v.Timestamp()
}
emit(v.Data())
}
}
```
As far as passing tests, I think that even with the error, I'm getting
correct results, but the volume of errors is a bit concerning.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]