johannaojeling commented on code in PR #25160:
URL: https://github.com/apache/beam/pull/25160#discussion_r1086291794


##########
sdks/go/pkg/beam/io/mongodbio/read.go:
##########
@@ -442,53 +266,53 @@ func (fn *readFn) ProcessElement(
        }()
 
        for cursor.Next(ctx) {
-               value, err := decodeDocument(cursor, fn.Type.T)
+               id, value, err := decodeDocument(cursor, fn.Type.T)
                if err != nil {
                        return err
                }
 
-               emit(value)
-       }
-
-       return cursor.Err()
-}
+               result := cursorResult{nextID: id}
+               if !rt.TryClaim(result) {
+                       return cursor.Err()
+               }
 
-func mergeFilters(idFilter bson.M, customFilter bson.M) bson.M {
-       if len(idFilter) == 0 {
-               return customFilter
+               emit(value)
        }
 
-       if len(customFilter) == 0 {
-               return idFilter
-       }
+       result := cursorResult{isExhausted: true}
+       rt.TryClaim(result)

Review Comment:
   I added this extra `TryClaim` invocation so that the tracker can be marked as
done if the cursor is exhausted before the expected number of documents has
been claimed. This may happen if documents within the restriction changed, e.g.
were deleted or modified so that they no longer match the filter, because the
total number of documents within the restriction was counted when the
restriction was created. I preferred to still have the DoFn succeed in this
case.
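The claim flow described above can be illustrated with a minimal, self-contained sketch. It deliberately uses neither the Beam SDK nor the MongoDB driver. `countTracker` and `process` are hypothetical stand-ins, a slice of IDs replaces the MongoDB cursor, and string IDs are an assumption. Only `cursorResult` with its `nextID`/`isExhausted` fields comes from the diff. This is not the tracker implemented in the PR.

```go
package main

import "fmt"

// cursorResult mirrors the struct in the diff: either the ID of the next
// document read from the cursor, or a marker that the cursor is exhausted.
// Using a string ID is an assumption made for this sketch.
type cursorResult struct {
	nextID      string
	isExhausted bool
}

// countTracker is a hypothetical tracker that expects to claim the number of
// documents counted when the restriction was created.
type countTracker struct {
	expected int   // documents counted at restriction creation
	claimed  int   // documents claimed so far
	done     bool  // set once no further claims are allowed
	err      error // set if an invalid position is claimed
}

// TryClaim claims the next document, or marks the tracker as done when it
// receives an exhausted result.
func (t *countTracker) TryClaim(pos any) bool {
	if t.done {
		return false
	}
	result, ok := pos.(cursorResult)
	if !ok {
		t.err = fmt.Errorf("invalid position type %T", pos)
		return false
	}
	if result.isExhausted {
		// Fewer documents may remain than were counted (e.g. some were
		// deleted); treat the rest as processed so the DoFn still succeeds.
		t.done = true
		return true
	}
	if t.claimed >= t.expected {
		// More documents than counted: stop claiming.
		t.done = true
		return false
	}
	t.claimed++
	return true
}

// IsDone reports whether the whole restriction has been processed.
func (t *countTracker) IsDone() bool {
	return t.done || t.claimed == t.expected
}

// process mimics the loop in the diff, using a slice of IDs in place of the
// MongoDB cursor.
func process(rt *countTracker, ids []string, emit func(string)) error {
	for _, id := range ids {
		if !rt.TryClaim(cursorResult{nextID: id}) {
			return rt.err
		}
		emit(id)
	}
	// Final claim so the tracker is marked done even if fewer documents
	// than expected were claimed.
	rt.TryClaim(cursorResult{isExhausted: true})
	return rt.err
}

func main() {
	// Three documents were counted, but one was deleted before processing,
	// so the "cursor" only yields two.
	rt := &countTracker{expected: 3}
	var emitted []string
	err := process(rt, []string{"a", "b"}, func(id string) {
		emitted = append(emitted, id)
	})
	fmt.Println(emitted, err, rt.IsDone()) // prints "[a b] <nil> true"
}
```

Without the final `isExhausted` claim, `rt.IsDone()` would report `false` in this scenario, because only two of the three counted documents were claimed.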
   
   My first idea was to pass only the ID as the `pos` argument to `TryClaim` and
to return `sdf.StopProcessing()` once the cursor had been exhausted, inspired by
the [example](https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint)
in the programming guide. However, I noticed that user-initiated checkpointing
is less widely supported across runners, and my pipeline was also interpreted
as a streaming job in Dataflow. I was therefore unsure whether that was the
right approach and decided to go with this less elegant one instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
