youngoli commented on a change in pull request #12350:
URL: https://github.com/apache/beam/pull/12350#discussion_r459848001



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -302,31 +302,67 @@ func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (int64,
        }
 
        n.mu.Lock()
+       defer n.mu.Unlock()
+
        var currProg float64 // Current element progress.
-       if n.index < 0 {     // Progress is at the end of the non-existent -1st element.
+       var su SplittableUnit
+       if n.index < 0 { // Progress is at the end of the non-existent -1st element.
                currProg = 1.0
-       } else if n.rt == nil { // If this isn't sub-element splittable, estimate some progress.
+       } else if n.su == nil { // If this isn't sub-element splittable, estimate some progress.
                currProg = 0.5
        } else { // If this is sub-element splittable, get progress of the current element.
-               rt := <-n.rt
-               d, r := rt.GetProgress()
-               currProg = d / (d + r)
-               n.rt <- rt
+               // If splittable, hold this tracker for the rest of the function so the element
+               // doesn't finish processing during a split.
+               su = <-n.su
+               if su == nil {
+                       return SplitResult{}, fmt.Errorf("failed to split: splittable unit was nil")
+               }
+               defer func() {
+                       n.su <- su
+               }()
+               currProg = su.GetProgress()
        }
        // Size to split within is the minimum of bufSize or splitIdx so we avoid
        // including elements we already know won't be processed.
        if bufSize <= 0 || n.splitIdx < bufSize {
                bufSize = n.splitIdx
        }
-       s, _, err := splitHelper(n.index, bufSize, currProg, splits, frac, false)
+       s, f, err := splitHelper(n.index, bufSize, currProg, splits, frac, su != nil)
        if err != nil {
-               n.mu.Unlock()
-               return 0, err
+               return SplitResult{}, err
+       }
+       if f > 0.0 {

Review comment:
       Sounds good to me. Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to