[ https://issues.apache.org/jira/browse/BEAM-4141?focusedWorklogId=94861&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-94861 ]

ASF GitHub Bot logged work on BEAM-4141:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 24/Apr/18 23:25
            Start Date: 24/Apr/18 23:25
    Worklog Time Spent: 10m 
      Work Description: tgroh closed pull request #5184: BEAM-4141: Drain source when user function processing fails.
URL: https://github.com/apache/beam/pull/5184
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff on merge, it is reproduced below for the sake of provenance:

diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
index ffd82779570..89f6d7b65c8 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
@@ -28,7 +28,18 @@ import (
        "google.golang.org/grpc"
 )
 
-const chunkSize = int(4e6) // Bytes to put in a single gRPC message. Max is slightly higher.
+const (
+       chunkSize   = int(4e6) // Bytes to put in a single gRPC message. Max is slightly higher.
+       bufElements = 20       // Number of chunks buffered per reader.
+)
+
+// This is a reduced version of the full gRPC interface to help with testing.
+// TODO(wcn): need a compile-time assertion to make sure this stays synced with what's
+// in pb.BeamFnData_DataClient
+type dataClient interface {
+       Send(*pb.Elements) error
+       Recv() (*pb.Elements, error)
+}
 
 // DataManager manages data channels to the FnHarness. A fixed number of channels
 // are generally used, each managing multiple logical byte streams.
@@ -75,7 +86,7 @@ func (m *DataManager) open(ctx context.Context, port exec.Port) (*DataChannel, e
 // DataChannel manages a single grpc connection to the FnHarness.
 type DataChannel struct {
        cc     *grpc.ClientConn
-       client pb.BeamFnData_DataClient
+       client dataClient
        port   exec.Port
 
        writers map[string]*dataWriter
@@ -95,7 +106,10 @@ func NewDataChannel(ctx context.Context, port exec.Port) (*DataChannel, error) {
                cc.Close()
                return nil, fmt.Errorf("failed to connect to data service: %v", err)
        }
+       return makeDataChannel(ctx, cc, client, port)
+}
 
+func makeDataChannel(ctx context.Context, cc *grpc.ClientConn, client dataClient, port exec.Port) (*DataChannel, error) {
        ret := &DataChannel{
                cc:      cc,
                client:  client,
@@ -108,25 +122,25 @@ func NewDataChannel(ctx context.Context, port exec.Port) (*DataChannel, error) {
        return ret, nil
 }
 
-func (m *DataChannel) OpenRead(ctx context.Context, id exec.StreamID) (io.ReadCloser, error) {
-       return m.makeReader(ctx, id), nil
+func (c *DataChannel) OpenRead(ctx context.Context, id exec.StreamID) (io.ReadCloser, error) {
+       return c.makeReader(ctx, id), nil
 }
 
-func (m *DataChannel) OpenWrite(ctx context.Context, id exec.StreamID) (io.WriteCloser, error) {
-       return m.makeWriter(ctx, id), nil
+func (c *DataChannel) OpenWrite(ctx context.Context, id exec.StreamID) (io.WriteCloser, error) {
+       return c.makeWriter(ctx, id), nil
 }
 
-func (m *DataChannel) read(ctx context.Context) {
+func (c *DataChannel) read(ctx context.Context) {
        cache := make(map[string]*dataReader)
        for {
-               msg, err := m.client.Recv()
+               msg, err := c.client.Recv()
                if err != nil {
                        if err == io.EOF {
                                // TODO(herohde) 10/12/2017: can this happen before shutdown? Reconnect?
-                               log.Warnf(ctx, "DataChannel %v closed", m.port)
+                               log.Warnf(ctx, "DataChannel %v closed", c.port)
                                return
                        }
-                       panic(fmt.Errorf("channel %v bad: %v", m.port, err))
+                       panic(fmt.Errorf("channel %v bad: %v", c.port, err))
                }
 
                recordStreamReceive(msg)
@@ -136,19 +150,26 @@ func (m *DataChannel) read(ctx context.Context) {
                // to reduce lock contention.
 
                for _, elm := range msg.GetData() {
-                       id := exec.StreamID{Port: m.port, Target: exec.Target{ID: elm.GetTarget().PrimitiveTransformReference, Name: elm.GetTarget().GetName()}, InstID: elm.GetInstructionReference()}
+                       id := exec.StreamID{Port: c.port, Target: exec.Target{ID: elm.GetTarget().PrimitiveTransformReference, Name: elm.GetTarget().GetName()}, InstID: elm.GetInstructionReference()}
                        sid := id.String()
 
-                       // log.Printf("Chan read (%v): %v", sid, elm.GetData())
+                       // log.Printf("Chan read (%v): %v\n", sid, elm.GetData())
 
                        var r *dataReader
                        if local, ok := cache[sid]; ok {
                                r = local
                        } else {
-                               r = m.makeReader(ctx, id)
+                               r = c.makeReader(ctx, id)
                                cache[sid] = r
                        }
 
+                       if r.completed {
+                               // The local reader has closed but the remote is still sending data.
+                               // Just ignore it. We keep the reader config in the cache so we don't
+                               // treat it as a new reader. Eventually the stream will finish and go
+                               // through normal teardown.
+                               continue
+                       }
                        if len(elm.GetData()) == 0 {
                                // Sentinel EOF segment for stream. Close buffer to signal EOF.
                                close(r.buf)
@@ -163,48 +184,61 @@ func (m *DataChannel) read(ctx context.Context) {
 
                        // This send is deliberately blocking, if we exceed the buffering for
                        // a reader. We can't buffer the entire main input, if some user code
-                       // is slow (or gets stuck).
-                       r.buf <- elm.GetData()
+                       // is slow (or gets stuck). If the local side closes, the reader
+                       // will be marked as completed and further remote data will be ignored.
+                       select {
+                       case r.buf <- elm.GetData():
+                       case <-r.done:
+                               r.completed = true
+                       }
                }
        }
 }
 
-func (m *DataChannel) makeReader(ctx context.Context, id exec.StreamID) *dataReader {
-       m.mu.Lock()
-       defer m.mu.Unlock()
+func (c *DataChannel) makeReader(ctx context.Context, id exec.StreamID) *dataReader {
+       c.mu.Lock()
+       defer c.mu.Unlock()
 
        sid := id.String()
-       if r, ok := m.readers[sid]; ok {
+       if r, ok := c.readers[sid]; ok {
                return r
        }
 
-       r := &dataReader{buf: make(chan []byte, 20)}
-       m.readers[sid] = r
+       r := &dataReader{id: sid, buf: make(chan []byte, bufElements), done: make(chan bool, 1), channel: c}
+       c.readers[sid] = r
        return r
 }
 
-func (m *DataChannel) makeWriter(ctx context.Context, id exec.StreamID) *dataWriter {
-       m.mu.Lock()
-       defer m.mu.Unlock()
+func (c *DataChannel) removeReader(id string) {
+       delete(c.readers, id)
+}
+
+func (c *DataChannel) makeWriter(ctx context.Context, id exec.StreamID) *dataWriter {
+       c.mu.Lock()
+       defer c.mu.Unlock()
 
        sid := id.String()
-       if w, ok := m.writers[sid]; ok {
+       if w, ok := c.writers[sid]; ok {
                return w
        }
 
-       w := &dataWriter{ch: m, id: id}
-       m.writers[sid] = w
+       w := &dataWriter{ch: c, id: id}
+       c.writers[sid] = w
        return w
 }
 
 type dataReader struct {
-       buf chan []byte
-       cur []byte
+       id        string
+       buf       chan []byte
+       done      chan bool
+       cur       []byte
+       channel   *DataChannel
+       completed bool
 }
 
 func (r *dataReader) Close() error {
-       // TODO(herohde) 6/27/2017: allow early close to throw away data async. We also need
-       // to garbage collect readers.
+       r.done <- true
+       r.channel.removeReader(r.id)
        return nil
 }
 
@@ -302,7 +336,3 @@ func (w *dataWriter) Write(p []byte) (n int, err error) {
        w.buf = append(w.buf, p...)
        return len(p), nil
 }
-
-type DataConnectionContext struct {
-       InstID string `beam:"opt"`
-}
diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go b/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
new file mode 100644
index 00000000000..1305efd1f48
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package harness
+
+import (
+       "context"
+       "io"
+       "io/ioutil"
+       "log"
+       "testing"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+)
+
+type fakeClient struct {
+       t     *testing.T
+       done  chan bool
+       calls int
+}
+
+func (f *fakeClient) Recv() (*pb.Elements, error) {
+       f.calls++
+       data := []byte{1, 2, 3, 4}
+       elemData := pb.Elements_Data{
+               InstructionReference: "inst_ref",
+               Data:                 data,
+               Target: &pb.Target{
+                       PrimitiveTransformReference: "ptr",
+                       Name: "instruction_name",
+               },
+       }
+
+       msg := pb.Elements{}
+
+       for i := 0; i < bufElements+1; i++ {
+               msg.Data = append(msg.Data, &elemData)
+       }
+
+       // The first two calls fill up the buffer completely to trigger the deadlock.
+       // The third call ends the data stream normally.
+       // Subsequent calls return no data.
+       switch f.calls {
+       case 1:
+               return &msg, nil
+       case 2:
+               return &msg, nil
+       case 3:
+               elemData.Data = []byte{}
+               msg.Data = []*pb.Elements_Data{&elemData}
+               // Broadcasting done here means that this code providing messages
+               // has not been blocked by the bug blocking the dataReader
+               // from getting more messages.
+               return &msg, nil
+       default:
+               f.done <- true
+               return nil, io.EOF
+       }
+}
+
+func (f *fakeClient) Send(*pb.Elements) error {
+       return nil
+}
+
+func TestDataChannelTerminateOnClose(t *testing.T) {
+       // The logging of channels closed is quite noisy for this test
+       log.SetOutput(ioutil.Discard)
+       done := make(chan bool, 1)
+       client := &fakeClient{t: t, done: done}
+       c, err := makeDataChannel(context.Background(), nil, client, exec.Port{})
+       if err != nil {
+               t.Errorf("Unexpected error in makeDataChannel: %v", err)
+       }
+
+       r, err := c.OpenRead(context.Background(), exec.StreamID{Port: exec.Port{URL: ""}, Target: exec.Target{ID: "ptr", Name: "instruction_name"}, InstID: "inst_ref"})
+       var read = make([]byte, 4)
+
+       // We don't read up all the buffered data, but immediately close the reader.
+       // Previously, since nothing was consuming the incoming gRPC data, the whole
+       // data channel would get stuck, and the client.Recv() call was eventually
+       // no longer called.
+       _, err = r.Read(read)
+       if err != nil {
+               t.Errorf("Unexpected error from read: %v", err)
+       }
+       r.Close()
+
+       // If done is signaled, that means client.Recv() has been called to flush the
+       // channel, meaning consumer code isn't stuck.
+       <-done
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 94861)
    Time Spent: 2h 50m  (was: 2h 40m)

> Data channel deadlocks when user function fails
> -----------------------------------------------
>
>                 Key: BEAM-4141
>                 URL: https://issues.apache.org/jira/browse/BEAM-4141
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-go
>    Affects Versions: Not applicable
>            Reporter: Bill Neubauer
>            Assignee: Bill Neubauer
>            Priority: Major
>             Fix For: Not applicable
>
>          Time Spent: 2h 50m
>  Remaining Estimate: 0h
>
> There is a deadlock condition in the data channel code that occurs when a
> user function fails while processing an element. The producer for the data
> channel keeps sending data across a per-reader channel, but the intended
> consumer has stopped listening. Unfortunately, the blocked send stalls the
> shared read loop for the entire data channel, blocking data for any other
> DoFn that might be running and causing the whole worker to deadlock.
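
The shape of the fix is easier to see in isolation. The sketch below is
illustrative only, not Beam code: the reader type and its buf/done/completed
fields merely mirror the dataReader fields in the patch above, and the
one-element buffer is an arbitrary assumption. Before the fix, the shared
read loop did a bare blocking send into the reader's buffer; once the
consumer stopped reading, that send could never complete. Selecting on a
done channel signaled by Close() lets the loop drain and discard instead:

    package main

    import "fmt"

    // reader mirrors the dataReader fields from the patch (illustrative only).
    type reader struct {
            buf       chan []byte // buffered element data for the consumer
            done      chan bool   // signaled by Close() when the consumer gives up
            completed bool        // set by the read loop; later data is dropped
    }

    // deliver is what the shared read loop does per element. Before the fix
    // this was a bare `r.buf <- data`, which blocks forever once the consumer
    // stops reading and the buffer fills, stalling every stream on the channel.
    func (r *reader) deliver(data []byte) {
            if r.completed {
                    return // consumer already closed; ignore further remote data
            }
            select {
            case r.buf <- data: // consumer is still draining the buffer
            case <-r.done: // consumer closed early; mark it and drop from now on
                    r.completed = true
            }
    }

    func main() {
            r := &reader{buf: make(chan []byte, 1), done: make(chan bool, 1)}
            r.deliver([]byte{1}) // fills the one-element buffer
            r.done <- true       // consumer closes without reading
            r.deliver([]byte{2}) // a bare send would deadlock here; select returns
            fmt.Println("read loop not blocked, completed =", r.completed)
    }

Keeping the completed flag, rather than relying on the done channel alone,
lets the loop skip all later data for that stream via the fast path without
re-entering the select, which is the same design the patch uses.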



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
