[ 
https://issues.apache.org/jira/browse/BEAM-4141?focusedWorklogId=94413&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94413
 ]

ASF GitHub Bot logged work on BEAM-4141:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Apr/18 01:17
            Start Date: 24/Apr/18 01:17
    Worklog Time Spent: 10m 
      Work Description: herohde commented on a change in pull request #5184: 
BEAM-4141: Drain source when user function processing fails.
URL: https://github.com/apache/beam/pull/5184#discussion_r183581212
 
 

 ##########
 File path: sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
 ##########
 @@ -0,0 +1,89 @@
+package harness
+
+import (
+       "context"
+       "io"
+       "io/ioutil"
+       "log"
+       "testing"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+)
+
+type fakeClient struct {
+       t     *testing.T
+       done  chan bool
+       calls int
+}
+
+func (f *fakeClient) Recv() (*pb.Elements, error) {
+       f.calls++
+       data := []byte{1, 2, 3, 4}
+       elemData := pb.Elements_Data{
+               InstructionReference: "inst_ref",
+               Data:                 data,
+               Target: &pb.Target{
+                       PrimitiveTransformReference: "ptr",
+                       Name: "instruction_name",
+               },
+       }
+
+       msg := pb.Elements{}
+
+       for i := 0; i < bufElements+1; i++ {
+               msg.Data = append(msg.Data, &elemData)
+       }
+
+       // The first two calls fill up the buffer completely to stimulate the deadlock
+       // The third call ends the data stream normally.
+       // Subsequent calls return no data.
+       switch f.calls {
+       case 1:
+               return &msg, nil
+       case 2:
+               return &msg, nil
+       case 3:
+               elemData.Data = []byte{}
+               msg.Data = []*pb.Elements_Data{&elemData}
+               // Broadcasting done here means that this code providing messages
+               // has not been blocked by the bug blocking the dataReader
+               // from getting more messages.
+               return &msg, nil
+       default:
+               f.done <- true
+               return nil, io.EOF
+       }
+}
+
+func (f *fakeClient) Send(*pb.Elements) error {
+       return nil
+}
+
+func TestDataChannelTerminateOnClose(t *testing.T) {
+       // The logging of channels closed is quite noisy for this test
+       log.SetOutput(ioutil.Discard)
+       done := make(chan bool, 1)
+       client := &fakeClient{t: t, done: done}
+       c, err := makeDataChannel(context.Background(), nil, client, exec.Port{})
+       if err != nil {
+               t.Errorf("Unexpected error in makeDataChannel: %v", err)
+       }
+
+       r, err := c.OpenRead(context.Background(), exec.StreamID{Port: exec.Port{URL: ""}, Target: exec.Target{ID: "ptr", Name: "instruction_name"}, InstID: "inst_ref"})
+       var read = make([]byte, 4)
+
+       // We don't read up all the buffered data, but immediately close the reader.
+       // Previously, since nothing was consuming the incoming gRPC data, the whole
+       // data channel would get stuck, and the client.Recv() call was eventually
+       // no longer called.
+       _, err = r.Read(read)
+       if err != nil {
+               t.Errorf("Unexpected error from read: %v", err)
+       }
+       r.Close()
+
+       // If done is signaled, that means client.Recv() has been called to flush the
+       // channel, meaning consumer code isn't stuck.
+       <-done
 
 Review comment:
   Maybe a timeout to catch that we're stuck and fail the test with an explicit 
error?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 94413)
    Time Spent: 1h 10m  (was: 1h)

> Data channel deadlocks when user function fails
> -----------------------------------------------
>
>                 Key: BEAM-4141
>                 URL: https://issues.apache.org/jira/browse/BEAM-4141
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-go
>    Affects Versions: Not applicable
>            Reporter: Bill Neubauer
>            Assignee: Bill Neubauer
>            Priority: Major
>             Fix For: Not applicable
>
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> There is a deadlock condition in the data channel code that occurs when a 
> user function fails while processing an element. The producer for the data 
> channel is continuing to send information across a channel, but the intended 
> consumer has stopped listening. Unfortunately, this channel blocks the entire 
> data channel, blocking data for any other DoFn that might be running, causing 
> the whole worker to deadlock.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to