[ https://issues.apache.org/jira/browse/BEAM-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Burke reassigned BEAM-4832:
----------------------------------

    Assignee: Robert Burke  (was: Henning Rohde)

> Concurrent Writes in Data channels
> ----------------------------------
>
>                 Key: BEAM-4832
>                 URL: https://issues.apache.org/jira/browse/BEAM-4832
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-go
>            Reporter: Robert Burke
>            Assignee: Robert Burke
>            Priority: Major
>
> A user was having issues with streaming Go pipelines on Dataflow.
> Investigation yielded the panic below, which was triggered by concurrent
> modifications of the data channel maps.
>
> The fix is to properly guard all writes to the Data channel maps, in
> particular:
>
> func (c *DataChannel) removeReader(id string) {
>     delete(c.readers, id)
> }
>
> should be
>
> func (c *DataChannel) removeReader(id string) {
>     c.mu.Lock()
>     delete(c.readers, id)
>     c.mu.Unlock()
> }
>
>
> I fatal error: concurrent map writes
> I
> I goroutine 3277 [running]:
> I runtime.throw(0xf880d0, 0x15)
> I GOROOT/src/runtime/panic.go:616 +0x81 fp=0xc4212eb6d8 sp=0xc4212eb6b8 pc=0x42be31
> I runtime.mapdelete_faststr(0xe18160, 0xc4202ba7b0, 0xc4213dc0e0, 0x20)
> I GOROOT/src/runtime/hashmap_fast.go:892 +0x28d fp=0xc4212eb738 sp=0xc4212eb6d8 pc=0x40e45d
> I google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*DataChannel).removeReader(...)
> I vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/datamgr.go:213
> I google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*dataReader).Close(0xc420ba8c80, 0xc42031e0c0, 0xf)
> I vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/datamgr.go:241 +0x6e fp=0xc4212eb768 sp=0xc4212eb738 pc=0xc5059e
> I google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*DataSource).Process(0xc4203ae540, 0x10708c0, 0xc420464e70, 0x0, 0x0)
> I vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/datasource.go:156 +0x1302 fp=0xc4212ebbd8 sp=0xc4212eb768 pc=0x894612
> I google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(Root).Process-fm(0x10708c0, 0xc420464e70, 0xc420b7bc58, 0x0)
> I vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:112 +0x43 fp=0xc4212ebc10 sp=0xc4212ebbd8 pc=0x8a4c23
> I google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.callNoPanic(0x10708c0, 0xc420464e70, 0xc420b7bcc8, 0x0, 0x0)
> I vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:42 +0x6c fp=0xc4212ebc40 sp=0xc4212ebc10 pc=0x8a37ac
> I google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc420030770, 0x10708c0, 0xc420464e70, 0xc4201a85e8, 0x5, 0x1064720, 0xc4202097e0, 0xfb5320, 0xc4206ff940)
> I vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:112 +0x3bd fp=0xc4212ebd58 sp=0xc4212ebc40 pc=0x89d4ed
> I google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc4201a7820, 0x10708c0, 0xc420464e10, 0xc420d5e340, 0xc420038018)
> I vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:193 +0x729 fp=0xc4212ebf40 sp=0xc4212ebd58 pc=0xc51ef9
> I google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main.func2(0x1070840, 0xc420038018, 0xc420d5e340)
> I vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:113 +0x167 fp=0xc4212ebfc8 sp=0xc4212ebf40 pc=0xc54b17
> I runtime.goexit()
> I bazel-out/k8-fastbuild/bin/external/io_bazel_rules_go/linux_amd64_stripped/stdlib~/src/runtime/asm_amd64.s:2361 +0x1 fp=0xc4212ebfd0 sp=0xc4212ebfc8 pc=0x45a591
> I created by google/vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness.Main
> I vendor/github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:126 +0x5cc



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
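For illustration only, here is a minimal, self-contained sketch of the locking pattern the issue proposes for removeReader. It is not the Beam harness code: demoChannel, addReader, count, and churn are hypothetical names made up for this example, and only the mutex-guarded map is modeled. The sketch uses defer for the unlock, which is a stylistic choice and not something the issue prescribes.

```go
package main

import (
	"fmt"
	"sync"
)

// demoChannel is a hypothetical stand-in for the harness DataChannel;
// only the mutex and the readers map are modeled here.
type demoChannel struct {
	mu      sync.Mutex
	readers map[string]struct{}
}

func newDemoChannel() *demoChannel {
	return &demoChannel{readers: make(map[string]struct{})}
}

// addReader registers id while holding the lock.
func (c *demoChannel) addReader(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.readers[id] = struct{}{}
}

// removeReader deletes id while holding the lock, mirroring the proposed fix.
func (c *demoChannel) removeReader(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.readers, id)
}

// count reports how many readers are currently registered.
func (c *demoChannel) count() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.readers)
}

// churn adds and then removes n distinct readers from n goroutines at once.
func churn(c *demoChannel, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			id := fmt.Sprintf("reader-%d", i)
			c.addReader(id)
			c.removeReader(id)
		}(i)
	}
	wg.Wait()
}

func main() {
	c := newDemoChannel()
	churn(c, 100)
	fmt.Println("readers left:", c.count())
}
```

Because every write to the map happens under the same mutex, the goroutines in churn cannot modify it at the same time. If the Lock/Unlock pair is dropped from removeReader, running the program with "go run -race" should report the data race, and the runtime may abort with the same "concurrent map writes" error shown in the trace.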