lostluck commented on a change in pull request #12350:
URL: https://github.com/apache/beam/pull/12350#discussion_r462445798



##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -302,31 +308,76 @@ func (n *DataSource) Split(splits []int64, frac float64, 
bufSize int64) (int64,
        }
 
        n.mu.Lock()
+       defer n.mu.Unlock()
+
        var currProg float64 // Current element progress.
-       if n.index < 0 {     // Progress is at the end of the non-existant -1st 
element.
+       var su SplittableUnit
+       if n.index < 0 { // Progress is at the end of the non-existant -1st 
element.
                currProg = 1.0
-       } else if n.rt == nil { // If this isn't sub-element splittable, 
estimate some progress.
+       } else if n.su == nil { // If this isn't sub-element splittable, 
estimate some progress.
                currProg = 0.5
        } else { // If this is sub-element splittable, get progress of the 
current element.
-               rt := <-n.rt
-               d, r := rt.GetProgress()
-               currProg = d / (d + r)
-               n.rt <- rt
+               // If splittable, hold this tracker for the rest of the 
function so the element
+               // doesn't finish processing during a split.
+               su = <-n.su
+               if su == nil {
+                       return SplitResult{}, fmt.Errorf("failed to split: 
splittable unit was nil")
+               }
+               defer func() {
+                       n.su <- su
+               }()
+               currProg = su.GetProgress()
        }
        // Size to split within is the minimum of bufSize or splitIdx so we 
avoid
        // including elements we already know won't be processed.
        if bufSize <= 0 || n.splitIdx < bufSize {
                bufSize = n.splitIdx
        }
-       s, _, err := splitHelper(n.index, bufSize, currProg, splits, frac, 
false)
+       s, f, err := splitHelper(n.index, bufSize, currProg, splits, frac, su 
!= nil)
        if err != nil {
-               n.mu.Unlock()
-               return 0, err
+               return SplitResult{}, err
+       }
+
+       // No fraction returned, perform channel split.
+       if f < 0 {
+               n.splitIdx = s
+               return SplitResult{PI: s - 1, RI: s}, nil
+       }
+       // Otherwise, perform a sub-element split.
+       fr := f / (1.0 - currProg)
+       p, r, err := su.Split(fr)
+       if err != nil {
+               return SplitResult{}, err
+       }
+
+       if p != nil && r != nil { // Successful split.

Review comment:
       Consider that reversal technique I mentioned, so the short case is 
what's indented and returns early, and the long case is unindented.
   ```
   if p == nil || r == nil  {
               // Fallback to channel split, so split at next elm, not current.
                n.splitIdx = s + 1
                return SplitResult{PI: s, RI: s + 1}, nil
   }  // no need for an else.
   // .. the original contents of the if block ...
   ```

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
##########
@@ -412,17 +423,120 @@ func TestDataSource_Split(t *testing.T) {
 
                // SDK never splits on 0, so check that every test.
                sp := SplitPoints{Splits: test.splitPts, Frac: test.frac, 
BufSize: test.bufSize}
-               if splitIdx, err := p.Split(sp); err != nil {
+               if splitRes, err := p.Split(sp); err != nil {
                        t.Fatalf("error in Split: %v", err)
-               } else if got, want := splitIdx, test.splitIdx; got != want {
-                       t.Fatalf("error in Split: got splitIdx = %v, want %v ", 
got, want)
+               } else {

Review comment:
       And here.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/sdf.go
##########
@@ -215,14 +215,25 @@ func (n *SplitAndSizeRestrictions) String() string {
 // changes to support the SDF's method signatures and the expected structure
 // of the FullValue being received.
 type ProcessSizedElementsAndRestrictions struct {
-       PDo *ParDo
-
-       inv *ctInvoker
-
-       // Rt allows this unit to send out restriction trackers being processed.
-       // Receivers of the tracker do not own it, and must send it back 
through the
-       // same channel once finished with it.
-       Rt chan sdf.RTracker
+       PDo     *ParDo
+       TfId    string // Transform ID. Needed for splitting.
+       ctInv   *ctInvoker
+       sizeInv *rsInvoker
+
+       // SU is a buffered channel for indicating when this unit is splittable.
+       // When this unit is processing an element, it sends a SplittableUnit
+       // interface through the channel. That interface can be received on 
other
+       // threads and used to perform splitting or other related operation.
+       //
+       // Receiving the SplittableUnit prevents the current element from 
finishing
+       // processing, so he element does not unexpectedly change during a 
split.

Review comment:
       so the element

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
##########
@@ -278,10 +279,15 @@ func TestDataSource_Split(t *testing.T) {
                        runOnRoots(ctx, t, p, "StartBundle", func(root Root, 
ctx context.Context) error { return root.StartBundle(ctx, "1", dc) })
 
                        // SDK never splits on 0, so check that every test.
-                       if splitIdx, err := p.Split(SplitPoints{Splits: 
[]int64{0, test.splitIdx}}); err != nil {
+                       if splitRes, err := p.Split(SplitPoints{Splits: 
[]int64{0, test.splitIdx}}); err != nil {
                                t.Fatalf("error in Split: %v", err)
-                       } else if got, want := splitIdx, test.splitIdx; got != 
want {
-                               t.Fatalf("error in Split: got splitIdx = %v, 
want %v ", got, want)
+                       } else {

Review comment:
       Nit: Since the previous if ends with a t.Fatalf, and thus terminates the 
test it's not unreasonable to lift the splitRes assignment and avoid the else 
block and it's indentation.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -47,20 +47,21 @@ type DataSource struct {
        splitIdx  int64
        start     time.Time
 
-       // rt is non-nil if this DataSource feeds directly to a splittable unit,
-       // and receives the current restriction tracker being processed.
-       rt chan sdf.RTracker
+       // su is non-nil if this DataSource feeds directly to a splittable unit,
+       // and receives that splittable unit when it is available for splitting.
+       su chan SplittableUnit

Review comment:
       I see it is documented at ProcessSizedElements! Consider mentioning that 
here.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource.go
##########
@@ -47,20 +47,21 @@ type DataSource struct {
        splitIdx  int64
        start     time.Time
 
-       // rt is non-nil if this DataSource feeds directly to a splittable unit,
-       // and receives the current restriction tracker being processed.
-       rt chan sdf.RTracker
+       // su is non-nil if this DataSource feeds directly to a splittable unit,
+       // and receives that splittable unit when it is available for splitting.
+       su chan SplittableUnit

Review comment:
       As discussed, it may be good to document how/why we thing this channel 
will work as a safe concurrency primitive WRT the process bundle goroutine and 
the split call goroutine, in particular at the start and end of a bundle.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
##########
@@ -342,10 +348,15 @@ func TestDataSource_Split(t *testing.T) {
                                        <-blockedCh
                                        // Validate that we do not split on the 
element we're blocking on index.
                                        // The first valid split is at 
test.splitIdx.
-                                       if splitIdx, err := 
source.Split([]int64{0, 1, 2, 3, 4, 5}, -1, 0); err != nil {
+                                       if splitRes, err := 
source.Split([]int64{0, 1, 2, 3, 4, 5}, -1, 0); err != nil {
                                                t.Errorf("error in Split: %v", 
err)
-                                       } else if got, want := splitIdx, 
test.splitIdx; got != want {
-                                               t.Errorf("error in Split: got 
splitIdx = %v, want %v ", got, want)
+                                       } else {

Review comment:
       Same comment here.

##########
File path: sdks/go/pkg/beam/core/runtime/exec/dynsplit_test.go
##########
@@ -0,0 +1,464 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+       "bytes"
+       "context"
+       "github.com/google/go-cmp/cmp/cmpopts"
+       "io"
+       "reflect"
+       "sync"
+       "testing"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
+       "github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
+       "github.com/apache/beam/sdks/go/pkg/beam/io/rtrackers/offsetrange"
+       "github.com/google/go-cmp/cmp"
+)
+
+// TestDynamicSplit tests that a dynamic split of an in-progress SDF succeeds
+// with valid input. It coordinates the two threads (processing and splitting)
+// to test what happens if operations happen in various orders. The test then
+// validates that the output of the SDF is correct according to the split.
+func TestDynamicSplit(t *testing.T) {
+       tests := []struct {
+               name string
+               // driver is a function determining how the processing and 
splitting
+               // threads are created and coordinated.
+               driver func(*Plan, DataContext, *splitTestSdf) (error, 
splitResult)
+       }{
+               {
+                       // Complete a split before beginning processing.
+                       name:   "Simple",
+                       driver: nonBlockingDriver,
+               },
+               {
+                       // Try claiming while blocked on a split.
+                       name:   "BlockOnSplit",
+                       driver: splitBlockingDriver,
+               },
+               {
+                       // Try splitting while blocked on a claim.
+                       name:   "BlockOnClaim",
+                       driver: claimBlockingDriver,
+               },
+       }
+       for _, test := range tests {
+               test := test
+               t.Run(test.name, func(t *testing.T) {
+                       // Create pipeline.
+                       sdf := newSplitTestSdf()
+                       dfn, err := graph.NewDoFn(sdf, 
graph.NumMainInputs(graph.MainSingle))
+                       if err != nil {
+                               t.Fatalf("invalid function: %v", err)
+                       }
+                       cdr := createSplitTestInCoder()
+                       plan, out := createSdfPlan(t, t.Name(), dfn, cdr)
+
+                       // Create thread to send element to pipeline.
+                       pr, pw := io.Pipe()
+                       elm := createElm()
+                       go writeElm(elm, cdr, pw)
+                       dc := DataContext{Data: &TestDataManager{R: pr}}
+
+                       // Call driver to coordinate processing & splitting 
threads.
+                       procRes, splitRes := test.driver(plan, dc, sdf)
+
+                       // Validate we get a valid split result, aside from 
split elements.
+                       if splitRes.err != nil {
+                               t.Fatalf("Plan.Split failed: %v", splitRes.err)
+                       }
+                       wantSplit := SplitResult{
+                               PI:   -1,
+                               RI:   1,
+                               PS:   nil,
+                               RS:   nil,
+                               TId:  testTransformId,
+                               InId: indexToInputId(0),
+                       }
+                       if diff := cmp.Diff(splitRes.split, wantSplit, 
cmpopts.IgnoreFields(SplitResult{}, "PS", "RS")); diff != "" {
+                               t.Errorf("Incorrect split result (ignoring 
split elements): %v", diff)
+                       }
+
+                       // Validate split elements are encoded correctly by 
decoding them
+                       // with the input coder to the path.
+                       // TODO(BEAM-10579) Switch to using splittable unit's 
input coder
+                       // once that is implemented.
+                       p, err := decodeDynSplitElm(splitRes.split.PS, cdr)
+                       if err != nil {
+                               t.Errorf("Failed decoding primary element 
split: %v", err)
+                       }
+                       _, err = decodeDynSplitElm(splitRes.split.RS, cdr)
+                       if err != nil {
+                               t.Errorf("Failed decoding residual element 
split: %v", err)
+                       }
+
+                       // Validate SDF output. Make sure each restriction 
matches the split result.
+                       if err := procRes; err != nil {
+                               t.Fatal(err)
+                       }
+                       pRest := 
p.Elm.(*FullValue).Elm2.(offsetrange.Restriction)
+                       if got, want := len(out.Elements), 
int(pRest.End-pRest.Start); got != want {
+                               t.Errorf("Unexpected number of elements: got: 
%v, want: %v", got, want)
+                       }
+                       for i, fv := range out.Elements {
+                               rest := fv.Elm.(offsetrange.Restriction)
+                               if got, want := rest, pRest; !cmp.Equal(got, 
want) {
+                                       t.Errorf("Output element %v had 
incorrect restriction: got: %v, want: %v", i, got, want)
+                               }
+                       }
+               })
+       }
+}
+
+// nonBlockingDriver performs a split before starting processing, so no thread
+// is forced to wait on a mutex.
+func nonBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) (procRes 
error, splitRes splitResult) {
+       // Begin processing pipeline.
+       procResCh := make(chan error)
+       go processPlan(plan, dc, procResCh)
+       rt := <-sdf.rt // Tracker is created first, retrieve that.
+
+       // Complete a split before unblocking processing.
+       splitResCh := make(chan splitResult)
+       go splitPlan(plan, splitResCh)
+       <-rt.split
+       <-rt.blockSplit
+       splitRes = <-splitResCh
+
+       // Unblock and finishing processing.
+       <-sdf.proc
+       <-rt.claim
+       <-rt.blockClaim
+       <-rt.endClaim
+       procRes = <-procResCh
+
+       return procRes, splitRes
+}
+
+// splitBlockingDriver blocks on a split request so that the SDF attempts to
+// claim while the split is occurring.
+func splitBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) 
(procRes error, splitRes splitResult) {
+       // Begin processing pipeline.
+       procResCh := make(chan error)
+       go processPlan(plan, dc, procResCh)
+       rt := <-sdf.rt // Tracker is created first, retrieve that.
+
+       // Start a split, but block on it so it holds the mutex.
+       splitResCh := make(chan splitResult)
+       go splitPlan(plan, splitResCh)
+       <-rt.split
+
+       // Start processing and start a claim, that'll be waiting for the mutex.
+       <-sdf.proc
+       <-rt.claim
+
+       // Unblock and finish splitting and free the mutex.
+       <-rt.blockSplit
+       splitRes = <-splitResCh
+
+       // Unblock and finish claiming and processing.
+       <-rt.blockClaim
+       <-rt.endClaim
+       procRes = <-procResCh
+
+       return procRes, splitRes
+}
+
+// claimBlockingDriver blocks on a claim request so that the SDF attempts to
+// split while the claim is occurring.
+func claimBlockingDriver(plan *Plan, dc DataContext, sdf *splitTestSdf) 
(procRes error, splitRes splitResult) {
+       // Begin processing pipeline.
+       procResCh := make(chan error)
+       go processPlan(plan, dc, procResCh)
+       rt := <-sdf.rt // Tracker is created first, retrieve that.
+
+       // Start a claim, but block on it so it holds the mutex.
+       <-sdf.proc
+       <-rt.claim
+
+       // Start a split that'll be waiting for the mutex.
+       splitResCh := make(chan splitResult)
+       go splitPlan(plan, splitResCh)
+       <-rt.split
+
+       // Unblock the claim, freeing the mutex (but not finishing processing 
yet).
+       <-rt.blockClaim
+
+       // Finish splitting, allowing processing to finish.
+       <-rt.blockSplit
+       splitRes = <-splitResCh
+       <-rt.endClaim // Delay the claim end so we don't process too much 
before splitting.
+       procRes = <-procResCh
+
+       return procRes, splitRes
+}
+
+// createElm creates the element for our test pipeline.
+func createElm() *FullValue {
+       return &FullValue{
+               Elm: &FullValue{
+                       Elm:  20,
+                       Elm2: offsetrange.Restriction{Start: 0, End: 20},
+               },
+               Elm2: float64(20),
+       }
+}
+
+// createSplitTestInCoder outputs the coder for inputs to our test pipeline,
+// (in particular, the DataSource transform of the pipeline). For the specific
+// element this is a coder for, see createElm.
+func createSplitTestInCoder() *coder.Coder {
+       restT := reflect.TypeOf((*offsetrange.Restriction)(nil)).Elem()
+       restCdr := coder.LookupCustomCoder(restT)
+
+       cdr := coder.NewW(
+               coder.NewKV([]*coder.Coder{
+                       coder.NewKV([]*coder.Coder{
+                               intCoder(reflectx.Int),
+                               {Kind: coder.Custom, T: typex.New(restT), 
Custom: restCdr},
+                       }),
+                       coder.NewDouble(),
+               }),
+               coder.NewGlobalWindow())
+       return cdr
+}
+
+// createSdfPlan creates a plan containing the test pipeline, which is
+// DataSource -> SDF.ProcessSizedElementsAndRestrictions -> CaptureNode.
+func createSdfPlan(t *testing.T, name string, fn *graph.DoFn, cdr 
*coder.Coder) (*Plan, *CaptureNode) {
+       out := &CaptureNode{UID: 0}
+       n := &ParDo{UID: 1, Fn: fn, Out: []Node{out}}
+       sdf := &ProcessSizedElementsAndRestrictions{PDo: n, TfId: 
testTransformId}
+       ds := &DataSource{
+               UID:   2,
+               SID:   StreamID{PtransformID: "DataSource"},
+               Name:  "name",
+               Coder: cdr,
+               Out:   sdf,
+       }
+       units := []Unit{ds, sdf, out}
+
+       p, err := NewPlan(name+"_plan", units)
+       if err != nil {
+               t.Fatalf("NewPlan failed: %v", err)
+       }
+       return p, out
+}
+
+// writeElm is meant to be the goroutine for feeding an element to the
+// DataSourc of the test pipeline.
+func writeElm(elm *FullValue, cdr *coder.Coder, pw *io.PipeWriter) {
+       wc := MakeWindowEncoder(cdr.Window)
+       ec := MakeElementEncoder(coder.SkipW(cdr))
+       if err := EncodeWindowedValueHeader(wc, window.SingleGlobalWindow, 
mtime.ZeroTimestamp, pw); err != nil {
+               panic("err")
+       }
+       if err := ec.Encode(elm, pw); err != nil {
+               panic("err")
+       }
+       if err := pw.Close(); err != nil {
+               panic("err")
+       }
+}
+
+func decodeDynSplitElm(elm []byte, cdr *coder.Coder) (*FullValue, error) {
+       wd := MakeWindowDecoder(cdr.Window)
+       ed := MakeElementDecoder(coder.SkipW(cdr))
+       b := bytes.NewBuffer(elm)
+       w, t, err := DecodeWindowedValueHeader(wd, b)
+       if err != nil {
+               return nil, err
+       }
+       e, err := ed.Decode(b)
+       if err != nil {
+               return nil, err
+       }
+       e.Windows = w
+       e.Timestamp = t
+       return e, nil
+}
+
+// processPlan is meant to be the goroutine representing the thread processing
+// the SDF.
+func processPlan(plan *Plan, dc DataContext, result chan error) {
+       if err := plan.Execute(context.Background(), plan.ID()+"_execute", dc); 
err != nil {
+               result <- errors.Wrap(err, "Plan.Execute failed")
+       }
+       if err := plan.Down(context.Background()); err != nil {
+               result <- errors.Wrap(err, "Plan.Down failed")
+       }
+       result <- nil
+}
+
+type splitResult struct {
+       split SplitResult
+       err   error
+}
+
+// splitPlan is meant to be the goroutine representing the thread handling a
+// split request for the SDF.
+func splitPlan(plan *Plan, result chan splitResult) {
+       // Fake splitting code.
+       //p, r, err := rt.TrySplit(0.5)

Review comment:
       Do we need to keep the commented out parts around now?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to