camphillips22 commented on issue #26245:
URL: https://github.com/apache/beam/issues/26245#issuecomment-1507013684

   Runner: Google Dataflow
   Go SDK version: 2.46.0
   
   Example code:
   
   ```golang
   
   func GenerateData(s beam.Scope, start, end time.Time, col beam.PCollection) 
beam.PCollection {
        s = s.Scope("GenerateData")
        return beam.ParDo(s, &pageIdGeneratorFn{
                Start: start.Format(time.RFC3339),
                End:   end.Format(time.RFC3339),
        }, col)
   }
   
   // dataGeneratorFn is a splittable DoFn that generates data for the time
   // window from Start to End. Both bounds hold RFC 3339 timestamp strings.
   // NOTE(review): the fields are presumably exported so Beam serializes them
   // with the DoFn — confirm before renaming or unexporting them.
   type dataGeneratorFn struct {
   	Start, End string
   }
   
   func (f *dataGeneratorFn) CreateInitialRestriction(_ []byte) 
offsetrange.Restriction {
        s, _ := time.Parse(time.RFC3339Nano, f.Start)
        e, _ := time.Parse(time.RFC3339Nano, f.End)
        return offsetrange.Restriction{
                Start: s.Unix(),
                End:   e.Unix(),
        }
   }
   
   func (f *dataGeneratorFn) CreateTracker(rest offsetrange.Restriction) 
*sdf.LockRTracker {
        return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
   }
   
   func (f *dataGeneratorFn) RestrictionSize(_ []byte, rest 
offsetrange.Restriction) float64 {
        return rest.Size()
   }
   
   func (f *dataGeneratorFn) SplitRestriction(_ []byte, rest 
offsetrange.Restriction) (splits []offsetrange.Restriction) {
           rest.SizedSplits(int64(time.Hour.Seconds()))
   }
   
   // ProcessElement emits v.Data() for every value produced by an iterator
   // over the time window of the current restriction. The restriction's
   // offsets are interpreted as Unix seconds (time.Unix(rest.Start, 0) and
   // time.Unix(rest.End, 0)).
   //
   // Claim protocol:
   //   - rest.Start is claimed before the iterator is built.
   //   - Afterwards, each value whose timestamp differs from the last claimed
   //     position is claimed once.
   //   - Values sharing the last claimed timestamp are emitted without another
   //     claim.
   //   - Processing stops as soon as a claim is refused.
   //
   // Return values:
   //   - StopProcessing with a nil error on io.EOF or on a refused claim.
   //   - ResumeProcessingIn(5s) together with the error for any other
   //     iterator error.
   //
   // NOTE(review): confirm whether the runner honors the continuation when a
   // non-nil error is returned alongside it; if not, the 5s resume is dead.
   func (f *dataGeneratorFn) ProcessElement(ctx context.Context, rt *sdf.LockRTracker, _ []byte, emit func([]byte)) (sdf.ProcessContinuation, error) {

   	rest := rt.GetRestriction().(offsetrange.Restriction)
   	start := time.Unix(rest.Start, 0)
   	end := time.Unix(rest.End, 0)

   	// The iterator may take a while to set up. Claim the start before
   	// setting up, since Dataflow aggressively splits.
   	if !rt.TryClaim(rest.Start) {
   		return sdf.StopProcessing(), nil
   	}
   	// lastClaim is the most recent position successfully claimed.
   	lastClaim := rest.Start

   	// Set up an iterator bounded by the restriction's time window.
   	// NOTE(review): rest was read before any runner-initiated split. The
   	// iterator can therefore yield values past the tracker's current end;
   	// the TryClaim check in the loop is what stops processing then.
   	iter := GetIterator(ctx, start, end)
   	for {
   		v, err := iter.Next()

   		if err != nil {
   			if err == io.EOF {
   				// Claim the originally observed end, presumably to mark the
   				// remaining range as processed. The return value is ignored.
   				rt.TryClaim(rest.End)
   				return sdf.StopProcessing(), nil
   			}
   			return sdf.ResumeProcessingIn(5 * time.Second), err
   		}
   		// Note: this is simplified to remove tracking of whether the value's
   		// timestamp has already been handled. The original sentence was
   		// truncated, so "handled" is a reconstruction. A claim is attempted
   		// only when the timestamp differs from lastClaim.
   		// NOTE(review): assumes v.Timestamp() returns Unix seconds (int64)
   		// and that timestamps arrive in non-decreasing order — confirm.
   		if v.Timestamp() != lastClaim {
   			if !rt.TryClaim(v.Timestamp()) {
   				return sdf.StopProcessing(), nil
   			}
   			lastClaim = v.Timestamp()
   		}
   		emit(v.Data())
   	}
   }
   
   ```
   
   As for passing tests: even with the error, I think I'm getting correct
   results, but the volume of errors is a bit concerning.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to