youngoli commented on a change in pull request #12350: URL: https://github.com/apache/beam/pull/12350#discussion_r459848001
########## File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go ##########
@@ -302,31 +302,67 @@ func (n *DataSource) Split(splits []int64, frac float64, bufSize int64) (int64,
 	}
 	n.mu.Lock()
+	defer n.mu.Unlock()
+
 	var currProg float64 // Current element progress.
-	if n.index < 0 { // Progress is at the end of the non-existant -1st element.
+	var su SplittableUnit
+	if n.index < 0 { // Progress is at the end of the non-existant -1st element.
 		currProg = 1.0
-	} else if n.rt == nil { // If this isn't sub-element splittable, estimate some progress.
+	} else if n.su == nil { // If this isn't sub-element splittable, estimate some progress.
 		currProg = 0.5
 	} else { // If this is sub-element splittable, get progress of the current element.
-		rt := <-n.rt
-		d, r := rt.GetProgress()
-		currProg = d / (d + r)
-		n.rt <- rt
+		// If splittable, hold this tracker for the rest of the function so the element
+		// doesn't finish processing during a split.
+		su = <-n.su
+		if su == nil {
+			return SplitResult{}, fmt.Errorf("failed to split: splittable unit was nil")
+		}
+		defer func() {
+			n.su <- su
+		}()
+		currProg = su.GetProgress()
 	}
 	// Size to split within is the minimum of bufSize or splitIdx so we avoid
 	// including elements we already know won't be processed.
 	if bufSize <= 0 || n.splitIdx < bufSize {
 		bufSize = n.splitIdx
 	}
-	s, _, err := splitHelper(n.index, bufSize, currProg, splits, frac, false)
+	s, f, err := splitHelper(n.index, bufSize, currProg, splits, frac, su != nil)
 	if err != nil {
-		n.mu.Unlock()
-		return 0, err
+		return SplitResult{}, err
+	}
+	if f > 0.0 {

Review comment:
Sounds good to me. Done.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org