[ 
https://issues.apache.org/jira/browse/BEAM-3286?focusedWorklogId=136010&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-136010
 ]

ASF GitHub Bot logged work on BEAM-3286:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Aug/18 23:48
            Start Date: 18/Aug/18 23:48
    Worklog Time Spent: 10m 
      Work Description: herohde closed pull request #6197: [BEAM-3286] Add 
preliminary Go support for side input
URL: https://github.com/apache/beam/pull/6197
 
 
   

This pull request was merged from a forked repository. Because GitHub
hides the original diff of a fork-based pull request once it is merged,
the full diff is reproduced below for the sake of provenance:

diff --git a/sdks/go/pkg/beam/core/runtime/exec/coder.go 
b/sdks/go/pkg/beam/core/runtime/exec/coder.go
index f4e2da4ace5..df90996b940 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/coder.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/coder.go
@@ -16,11 +16,12 @@
 package exec
 
 import (
-       "context"
        "fmt"
        "io"
        "reflect"
 
+       "bytes"
+
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/mtime"
        "github.com/apache/beam/sdks/go/pkg/beam/core/graph/window"
@@ -29,45 +30,6 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/util/reflectx"
 )
 
-// Port represents the connection port of external operations.
-type Port struct {
-       URL string
-}
-
-// Target represents the target of external operations.
-type Target struct {
-       ID   string
-       Name string
-}
-
-// StreamID represents the information needed to identify a data stream.
-type StreamID struct {
-       Port   Port
-       Target Target
-       InstID string
-}
-
-func (id StreamID) String() string {
-       return fmt.Sprintf("S:%v:[%v:%v]:%v", id.Port.URL, id.Target.ID, 
id.Target.Name, id.InstID)
-}
-
-// DataReader is the interface for reading data elements from a particular 
stream.
-type DataReader interface {
-       OpenRead(ctx context.Context, id StreamID) (io.ReadCloser, error)
-}
-
-// DataWriter is the interface for writing data elements to a particular 
stream.
-type DataWriter interface {
-       OpenWrite(ctx context.Context, id StreamID) (io.WriteCloser, error)
-}
-
-// DataManager manages external data byte streams. Each data stream can be
-// opened by one consumer only.
-type DataManager interface {
-       DataReader
-       DataWriter
-}
-
 // NOTE(herohde) 4/30/2017: The main complication is CoGBK results, which have
 // nested streams. Hence, a simple read-one-element-at-a-time approach doesn't
 // work, because each "element" can be too large to fit into memory. Instead,
@@ -80,6 +42,16 @@ type ElementEncoder interface {
        Encode(FullValue, io.Writer) error
 }
 
+// EncodeElement is a convenience function for encoding a single element into a
+// byte slice.
+func EncodeElement(c ElementEncoder, val interface{}) ([]byte, error) {
+       var buf bytes.Buffer
+       if err := c.Encode(FullValue{Elm: val}, &buf); err != nil {
+               return nil, err
+       }
+       return buf.Bytes(), nil
+}
+
 // ElementDecoder handles FullValue deserialization from a byte stream. The 
decoder
 // can be reused, even if an error is encountered.  Concurrency-safe.
 type ElementDecoder interface {
@@ -284,6 +256,16 @@ type WindowEncoder interface {
        Encode([]typex.Window, io.Writer) error
 }
 
+// EncodeWindow is a convenience function for encoding a single window into a
+// byte slice.
+func EncodeWindow(c WindowEncoder, w typex.Window) ([]byte, error) {
+       var buf bytes.Buffer
+       if err := c.Encode([]typex.Window{w}, &buf); err != nil {
+               return nil, err
+       }
+       return buf.Bytes(), nil
+}
+
 // WindowDecoder handles Window deserialization from a byte stream. The decoder
 // can be reused, even if an error is encountered. Concurrency-safe.
 type WindowDecoder interface {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/cogbk.go 
b/sdks/go/pkg/beam/core/runtime/exec/cogbk.go
index 162068d387d..5da21f5b37a 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/cogbk.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/cogbk.go
@@ -47,7 +47,7 @@ func (n *Inject) Up(ctx context.Context) error {
        return nil
 }
 
-func (n *Inject) StartBundle(ctx context.Context, id string, data DataManager) 
error {
+func (n *Inject) StartBundle(ctx context.Context, id string, data DataContext) 
error {
        return n.Out.StartBundle(ctx, id, data)
 }
 
@@ -100,7 +100,7 @@ func (n *Expand) Up(ctx context.Context) error {
        return nil
 }
 
-func (n *Expand) StartBundle(ctx context.Context, id string, data DataManager) 
error {
+func (n *Expand) StartBundle(ctx context.Context, id string, data DataContext) 
error {
        return n.Out.StartBundle(ctx, id, data)
 }
 
@@ -131,8 +131,12 @@ type filterReStream struct {
        real ReStream
 }
 
-func (f *filterReStream) Open() Stream {
-       return &filterStream{n: f.n, dec: f.dec, real: f.real.Open()}
+func (f *filterReStream) Open() (Stream, error) {
+       real, err := f.real.Open()
+       if err != nil {
+               return nil, err
+       }
+       return &filterStream{n: f.n, dec: f.dec, real: real}, nil
 }
 
 type filterStream struct {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine.go 
b/sdks/go/pkg/beam/core/runtime/exec/combine.go
index 9e9698f3dd3..99d23343b7c 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine.go
@@ -65,7 +65,7 @@ func (n *Combine) Up(ctx context.Context) error {
 }
 
 // StartBundle initializes processing this bundle for combines.
-func (n *Combine) StartBundle(ctx context.Context, id string, data 
DataManager) error {
+func (n *Combine) StartBundle(ctx context.Context, id string, data 
DataContext) error {
        if n.status != Up {
                return fmt.Errorf("invalid status for combine %v: %v", n.UID, 
n.status)
        }
@@ -93,7 +93,10 @@ func (n *Combine) ProcessElement(ctx context.Context, value 
FullValue, values ..
        }
        first := true
 
-       stream := values[0].Open()
+       stream, err := values[0].Open()
+       if err != nil {
+               return n.fail(err)
+       }
        defer stream.Close()
        for {
                v, err := stream.Read()
@@ -243,7 +246,7 @@ func (n *LiftedCombine) String() string {
 }
 
 // StartBundle initializes the in memory cache of keys to accumulators.
-func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data 
DataManager) error {
+func (n *LiftedCombine) StartBundle(ctx context.Context, id string, data 
DataContext) error {
        if err := n.Combine.StartBundle(ctx, id, data); err != nil {
                return err
        }
@@ -334,7 +337,10 @@ func (n *MergeAccumulators) ProcessElement(ctx 
context.Context, value FullValue,
        }
        first := true
 
-       stream := values[0].Open()
+       stream, err := values[0].Open()
+       if err != nil {
+               return n.fail(err)
+       }
        defer stream.Close()
        for {
                v, err := stream.Read()
diff --git a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
index 43bb4f2dc92..02bd3fdcf6e 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/combine_test.go
@@ -119,7 +119,7 @@ func constructAndExecutePlan(t *testing.T, us []Unit) {
                t.Fatalf("failed to construct plan: %v", err)
        }
 
-       if err := p.Execute(context.Background(), "1", nil); err != nil {
+       if err := p.Execute(context.Background(), "1", DataContext{}); err != 
nil {
                t.Fatalf("execute failed: %v", err)
        }
        if err := p.Down(context.Background()); err != nil {
@@ -245,7 +245,7 @@ func (n *simpleGBK) Up(ctx context.Context) error {
        return nil
 }
 
-func (n *simpleGBK) StartBundle(ctx context.Context, id string, data 
DataManager) error {
+func (n *simpleGBK) StartBundle(ctx context.Context, id string, data 
DataContext) error {
        return n.Out.StartBundle(ctx, id, data)
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/data.go 
b/sdks/go/pkg/beam/core/runtime/exec/data.go
new file mode 100644
index 00000000000..c4d0850b79d
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/exec/data.go
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+       "context"
+       "fmt"
+       "io"
+)
+
+// Port represents the connection port of external operations.
+type Port struct {
+       URL string
+}
+
+// Target represents the static target of external operations.
+type Target struct {
+       // ID is the transform ID.
+       ID string
+       // Name is a local name in the context of the transform.
+       Name string
+}
+
+// StreamID represents the static information needed to identify
+// a data stream. Dynamic information, notably bundleID, is provided
+// implicitly by the managers.
+type StreamID struct {
+       Port   Port
+       Target Target
+}
+
+func (id StreamID) String() string {
+       return fmt.Sprintf("S[%v:%v@%v]", id.Target.ID, id.Target.Name, 
id.Port.URL)
+}
+
+// DataContext holds connectors to various data connections, incl. state and 
side input.
+type DataContext struct {
+       Data      DataManager
+       SideInput SideInputReader
+}
+
+// DataManager manages external data byte streams. Each data stream can be
+// opened by one consumer only.
+type DataManager interface {
+       // OpenRead opens a closable byte stream for reading.
+       OpenRead(ctx context.Context, id StreamID) (io.ReadCloser, error)
+       // OpenWrite opens a closable byte stream for writing.
+       OpenWrite(ctx context.Context, id StreamID) (io.WriteCloser, error)
+}
+
+// SideInputReader is the interface for reading side input data.
+type SideInputReader interface {
+       // Open opens a byte stream for reading iterable side input.
+       Open(ctx context.Context, id StreamID, key, w []byte) (io.ReadCloser, 
error)
+}
+
+// TODO(herohde) 7/20/2018: user state management
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasink.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasink.go
index 6fc905ab788..e5aad063a03 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasink.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasink.go
@@ -29,10 +29,9 @@ import (
 
 // DataSink is a Node.
 type DataSink struct {
-       UID    UnitID
-       Port   Port
-       Target Target
-       Coder  *coder.Coder
+       UID   UnitID
+       SID   StreamID
+       Coder *coder.Coder
 
        enc   ElementEncoder
        wEnc  WindowEncoder
@@ -51,10 +50,8 @@ func (n *DataSink) Up(ctx context.Context) error {
        return nil
 }
 
-func (n *DataSink) StartBundle(ctx context.Context, id string, data 
DataManager) error {
-       sid := StreamID{Port: n.Port, Target: n.Target, InstID: id}
-
-       w, err := data.OpenWrite(ctx, sid)
+func (n *DataSink) StartBundle(ctx context.Context, id string, data 
DataContext) error {
+       w, err := data.Data.OpenWrite(ctx, n.SID)
        if err != nil {
                return err
        }
@@ -92,6 +89,5 @@ func (n *DataSink) Down(ctx context.Context) error {
 }
 
 func (n *DataSink) String() string {
-       sid := StreamID{Port: n.Port, Target: n.Target}
-       return fmt.Sprintf("DataSink[%v] Coder:%v", sid, n.Coder)
+       return fmt.Sprintf("DataSink[%v] Coder:%v", n.SID, n.Coder)
 }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/datasource.go 
b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
index e98d2f6d4e0..608120270c2 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/datasource.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/datasource.go
@@ -28,14 +28,12 @@ import (
 
 // DataSource is a Root execution unit.
 type DataSource struct {
-       UID    UnitID
-       Port   Port
-       Target Target
-       Coder  *coder.Coder
-       Out    Node
-
-       sid    StreamID
-       source DataReader
+       UID   UnitID
+       SID   StreamID
+       Coder *coder.Coder
+       Out   Node
+
+       source DataManager
        count  int64
        start  time.Time
 }
@@ -48,16 +46,15 @@ func (n *DataSource) Up(ctx context.Context) error {
        return nil
 }
 
-func (n *DataSource) StartBundle(ctx context.Context, id string, data 
DataManager) error {
-       n.sid = StreamID{Port: n.Port, Target: n.Target, InstID: id}
-       n.source = data
+func (n *DataSource) StartBundle(ctx context.Context, id string, data 
DataContext) error {
+       n.source = data.Data
        n.start = time.Now()
        atomic.StoreInt64(&n.count, 0)
        return n.Out.StartBundle(ctx, id, data)
 }
 
 func (n *DataSource) Process(ctx context.Context) error {
-       r, err := n.source.OpenRead(ctx, n.sid)
+       r, err := n.source.OpenRead(ctx, n.SID)
        if err != nil {
                return err
        }
@@ -176,20 +173,17 @@ func (n *DataSource) Process(ctx context.Context) error {
 
 func (n *DataSource) FinishBundle(ctx context.Context) error {
        log.Infof(ctx, "DataSource: %d elements in %d ns", 
atomic.LoadInt64(&n.count), time.Now().Sub(n.start))
-       n.sid = StreamID{}
        n.source = nil
        return n.Out.FinishBundle(ctx)
 }
 
 func (n *DataSource) Down(ctx context.Context) error {
-       n.sid = StreamID{}
        n.source = nil
        return nil
 }
 
 func (n *DataSource) String() string {
-       sid := StreamID{Port: n.Port, Target: n.Target}
-       return fmt.Sprintf("DataSource[%v] Coder:%v Out:%v", sid, n.Coder, 
n.Out.ID())
+       return fmt.Sprintf("DataSource[%v] Coder:%v Out:%v", n.SID, n.Coder, 
n.Out.ID())
 }
 
 // ProgressReportSnapshot captures the progress reading an input source.
@@ -203,5 +197,5 @@ func (n *DataSource) Progress() ProgressReportSnapshot {
        if n == nil {
                return ProgressReportSnapshot{}
        }
-       return ProgressReportSnapshot{n.sid.Target.ID, n.sid.Target.Name, 
atomic.LoadInt64(&n.count)}
+       return ProgressReportSnapshot{n.SID.Target.ID, n.SID.Target.Name, 
atomic.LoadInt64(&n.count)}
 }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/discard.go 
b/sdks/go/pkg/beam/core/runtime/exec/discard.go
index df646c482ec..e094f3fbe62 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/discard.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/discard.go
@@ -32,7 +32,7 @@ func (d *Discard) Up(ctx context.Context) error {
        return nil
 }
 
-func (d *Discard) StartBundle(ctx context.Context, id string, data 
DataManager) error {
+func (d *Discard) StartBundle(ctx context.Context, id string, data 
DataContext) error {
        return nil
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/flatten.go 
b/sdks/go/pkg/beam/core/runtime/exec/flatten.go
index c585667c1e8..2e8b21e6cf4 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/flatten.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/flatten.go
@@ -42,7 +42,7 @@ func (m *Flatten) Up(ctx context.Context) error {
        return nil
 }
 
-func (m *Flatten) StartBundle(ctx context.Context, id string, data 
DataManager) error {
+func (m *Flatten) StartBundle(ctx context.Context, id string, data 
DataContext) error {
        if m.active {
                return nil // ok: ignore multiple start bundles. We just want 
the first one.
        }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/flatten_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/flatten_test.go
index 045952af431..137353d594e 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/flatten_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/flatten_test.go
@@ -35,7 +35,7 @@ func TestFlatten(t *testing.T) {
                t.Fatalf("failed to construct plan: %v", err)
        }
 
-       if err := p.Execute(context.Background(), "1", nil); err != nil {
+       if err := p.Execute(context.Background(), "1", DataContext{}); err != 
nil {
                t.Fatalf("execute failed: %v", err)
        }
        if err := p.Down(context.Background()); err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fn.go 
b/sdks/go/pkg/beam/core/runtime/exec/fn.go
index 23d7bbe583f..600c35d7303 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fn.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fn.go
@@ -141,8 +141,6 @@ func (n *invoker) Invoke(ctx context.Context, ws 
[]typex.Window, ts typex.EventT
        }
 
        // (3) Precomputed side input and emitters (or other output).
-       // TODO(lostluck): 2018/07/10 extras (emitters and side inputs), are 
constant so we could
-       // initialize them once at construction time, and not clear them in 
Reset.
        for _, arg := range extra {
                args[in[i]] = arg
                i++
@@ -228,7 +226,7 @@ func makeEmitters(fn *funcx.Fn, nodes []Node) 
([]ReusableEmitter, error) {
 func makeSideInput(kind graph.InputKind, t reflect.Type, values ReStream) 
(ReusableInput, error) {
        switch kind {
        case graph.Singleton:
-               elms, err := ReadAll(values.Open())
+               elms, err := ReadAll(values)
                if err != nil {
                        return nil, err
                }
@@ -238,7 +236,7 @@ func makeSideInput(kind graph.InputKind, t reflect.Type, 
values ReStream) (Reusa
                return &fixedValue{val: Convert(elms[0].Elm, t)}, nil
 
        case graph.Slice:
-               elms, err := ReadAll(values.Open())
+               elms, err := ReadAll(values)
                if err != nil {
                        return nil, err
                }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go 
b/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go
index ce17ba12d4a..771d8bc18fe 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go
@@ -52,9 +52,9 @@ type Stream interface {
        Read() (FullValue, error)
 }
 
-// ReStream is Stream factory.
+// ReStream is a re-iterable stream, i.e., a Stream factory.
 type ReStream interface {
-       Open() Stream
+       Open() (Stream, error)
 }
 
 // FixedReStream is a simple in-memory ReSteam.
@@ -62,8 +62,8 @@ type FixedReStream struct {
        Buf []FullValue
 }
 
-func (n *FixedReStream) Open() Stream {
-       return &FixedStream{Buf: n.Buf}
+func (n *FixedReStream) Open() (Stream, error) {
+       return &FixedStream{Buf: n.Buf}, nil
 }
 
 // FixedStream is a simple in-memory Stream from a fixed array.
@@ -125,8 +125,12 @@ func Convert(v interface{}, to reflect.Type) interface{} {
        }
 }
 
-// ReadAll read the full stream and returns the result. It always closes the 
stream.
-func ReadAll(s Stream) ([]FullValue, error) {
+// ReadAll reads a full ReStream and returns the result.
+func ReadAll(rs ReStream) ([]FullValue, error) {
+       s, err := rs.Open()
+       if err != nil {
+               return nil, err
+       }
        defer s.Close()
 
        var ret []FullValue
diff --git a/sdks/go/pkg/beam/core/runtime/exec/input.go 
b/sdks/go/pkg/beam/core/runtime/exec/input.go
index 60510a72f70..dff79fe1be9 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/input.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/input.go
@@ -126,7 +126,11 @@ func makeIter(t reflect.Type, s ReStream) ReusableInput {
 }
 
 func (v *iterValue) Init() error {
-       v.cur = v.s.Open()
+       cur, err := v.s.Open()
+       if err != nil {
+               return err
+       }
+       v.cur = cur
        return nil
 }
 
@@ -135,6 +139,9 @@ func (v *iterValue) Value() interface{} {
 }
 
 func (v *iterValue) Reset() error {
+       if v.cur == nil {
+               panic("Init() not called")
+       }
        if err := v.cur.Close(); err != nil {
                return err
        }
@@ -143,6 +150,9 @@ func (v *iterValue) Reset() error {
 }
 
 func (v *iterValue) invoke(args []reflect.Value) []reflect.Value {
+       if v.cur == nil {
+               panic("Init() not called")
+       }
        elm, err := v.cur.Read()
        if err != nil {
                if err == io.EOF {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/multiplex.go 
b/sdks/go/pkg/beam/core/runtime/exec/multiplex.go
index 3b5c5669163..df0fa3d5896 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/multiplex.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/multiplex.go
@@ -36,7 +36,7 @@ func (m *Multiplex) Up(ctx context.Context) error {
        return nil
 }
 
-func (m *Multiplex) StartBundle(ctx context.Context, id string, data 
DataManager) error {
+func (m *Multiplex) StartBundle(ctx context.Context, id string, data 
DataContext) error {
        return MultiStartBundle(ctx, id, data, m.Out...)
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/multiplex_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/multiplex_test.go
index f300b685d67..7f5e7c7d354 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/multiplex_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/multiplex_test.go
@@ -35,7 +35,7 @@ func TestMultiplex(t *testing.T) {
                t.Fatalf("failed to construct plan: %v", err)
        }
 
-       if err := p.Execute(context.Background(), "1", nil); err != nil {
+       if err := p.Execute(context.Background(), "1", DataContext{}); err != 
nil {
                t.Fatalf("execute failed: %v", err)
        }
        if err := p.Down(context.Background()); err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/optimized/inputs.go 
b/sdks/go/pkg/beam/core/runtime/exec/optimized/inputs.go
index e15807db985..93817a74b21 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/optimized/inputs.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/optimized/inputs.go
@@ -1050,7 +1050,11 @@ type iterNative struct {
 }
 
 func (v *iterNative) Init() error {
-       v.cur = v.s.Open()
+       cur, err := v.s.Open()
+       if err != nil {
+               return err
+       }
+       v.cur = cur
        return nil
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/optimized/inputs.tmpl 
b/sdks/go/pkg/beam/core/runtime/exec/optimized/inputs.tmpl
index 24530abf42c..84475adeeb2 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/optimized/inputs.tmpl
+++ b/sdks/go/pkg/beam/core/runtime/exec/optimized/inputs.tmpl
@@ -47,7 +47,11 @@ type iterNative struct {
 }
 
 func (v *iterNative) Init() error {
-       v.cur = v.s.Open()
+    cur, err := v.s.Open()
+    if err != nil {
+       return err
+    }
+       v.cur = cur
        return nil
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo.go 
b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
index f0e62cc3b81..8623e8810ef 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo.go
@@ -34,19 +34,26 @@ type ParDo struct {
        UID     UnitID
        Fn      *graph.DoFn
        Inbound []*graph.Inbound
-       Side    []ReStream
+       Side    []SideInputAdapter
        Out     []Node
 
-       PID       string
-       ready     bool
-       sideinput []ReusableInput
-       emitters  []ReusableEmitter
-       extra     []interface{}
+       PID      string
+       emitters []ReusableEmitter
+       ctx      context.Context
+       inv      *invoker
+
+       side  SideInputReader
+       cache *cacheElm
 
        status Status
        err    errorx.GuardedError
-       ctx    context.Context
-       inv    *invoker
+}
+
+// cacheElm holds per-window cached information about side input.
+type cacheElm struct {
+       key       typex.Window
+       sideinput []ReusableInput
+       extra     []interface{}
 }
 
 func (n *ParDo) ID() UnitID {
@@ -63,14 +70,21 @@ func (n *ParDo) Up(ctx context.Context) error {
        if _, err := InvokeWithoutEventTime(ctx, n.Fn.SetupFn(), nil); err != 
nil {
                return n.fail(err)
        }
+
+       emitters, err := makeEmitters(n.Fn.ProcessElementFn(), n.Out)
+       if err != nil {
+               return n.fail(err)
+       }
+       n.emitters = emitters
        return nil
 }
 
-func (n *ParDo) StartBundle(ctx context.Context, id string, data DataManager) 
error {
+func (n *ParDo) StartBundle(ctx context.Context, id string, data DataContext) 
error {
        if n.status != Up {
                return fmt.Errorf("invalid status for pardo %v: %v, want Up", 
n.UID, n.status)
        }
        n.status = Active
+       n.side = data.SideInput
        // Allocating contexts all the time is expensive, but we seldom 
re-write them,
        // and never accept modified contexts from users, so we will cache them 
per-bundle
        // per-unit, to avoid the constant allocation overhead.
@@ -80,16 +94,7 @@ func (n *ParDo) StartBundle(ctx context.Context, id string, 
data DataManager) er
                return n.fail(err)
        }
 
-       // TODO(BEAM-3303) 12/12/2017: recalculate side inputs per bundle. The 
results may
-       // not be valid in the presence of windowing.
-
-       // NOTE(herohde) 12/13/2017: we require that side input is available on 
StartBundle.
-
-       if err := n.initIfNeeded(); err != nil {
-               return n.fail(err)
-       }
-
-       // TODO(BEAM-3303): what to set for StartBundle/FinishBundle emitter 
timestamp?
+       // TODO(BEAM-3303): what to set for StartBundle/FinishBundle window and 
emitter timestamp?
 
        if _, err := n.invokeDataFn(n.ctx, window.SingleGlobalWindow, 
mtime.ZeroTimestamp, n.Fn.StartBundleFn(), nil); err != nil {
                return n.fail(err)
@@ -104,7 +109,7 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm 
FullValue, values ...ReS
        // If the function observes windows, we must invoke it for each window. 
The expected fast path
        // is that either there is a single window or the function doesn't 
observes windows.
 
-       if !mustExplodeWindows(n.inv.fn, elm) {
+       if !mustExplodeWindows(n.inv.fn, elm, len(n.Side) > 0) {
                val, err := n.invokeProcessFn(n.ctx, elm.Windows, 
elm.Timestamp, &MainInput{Key: elm, Values: values})
                if err != nil {
                        return n.fail(err)
@@ -133,13 +138,14 @@ func (n *ParDo) ProcessElement(ctx context.Context, elm 
FullValue, values ...ReS
 }
 
 // mustExplodeWindows returns true iif we need to call the function
-// for each window.
-func mustExplodeWindows(fn *funcx.Fn, elm FullValue) bool {
+// for each window. It is needed if the function observes the
+// window, either directly or indirectly via (windowed) side inputs.
+func mustExplodeWindows(fn *funcx.Fn, elm FullValue, usesSideInput bool) bool {
        if len(elm.Windows) < 2 {
                return false
        }
        _, explode := fn.Window()
-       return explode
+       return explode || usesSideInput
 }
 
 func (n *ParDo) FinishBundle(ctx context.Context) error {
@@ -152,6 +158,9 @@ func (n *ParDo) FinishBundle(ctx context.Context) error {
        if _, err := n.invokeDataFn(n.ctx, window.SingleGlobalWindow, 
mtime.ZeroTimestamp, n.Fn.FinishBundleFn(), nil); err != nil {
                return n.fail(err)
        }
+       n.side = nil
+       n.cache = nil
+
        if err := MultiFinishBundle(n.ctx, n.Out...); err != nil {
                return n.fail(err)
        }
@@ -163,6 +172,8 @@ func (n *ParDo) Down(ctx context.Context) error {
                return n.err.Error()
        }
        n.status = Down
+       n.side = nil
+       n.cache = nil
 
        if _, err := InvokeWithoutEventTime(ctx, n.Fn.TeardownFn(), nil); err 
!= nil {
                n.err.TrySetError(err)
@@ -170,31 +181,58 @@ func (n *ParDo) Down(ctx context.Context) error {
        return n.err.Error()
 }
 
-func (n *ParDo) initIfNeeded() error {
-       if n.ready {
+func (n *ParDo) initSideInput(ctx context.Context, w typex.Window) error {
+       if n.cache == nil {
+               // First time: init single-element cache. We know that side 
input
+               // must come before emitters in the signature.
+
+               sideCount := len(n.Side)
+               emitCount := len(n.emitters)
+
+               n.cache = &cacheElm{
+                       key:   w,
+                       extra: make([]interface{}, sideCount+emitCount, 
sideCount+emitCount),
+               }
+               for i, emit := range n.emitters {
+                       n.cache.extra[i+sideCount] = emit.Value()
+               }
+       } else if w.Equals(n.cache.key) {
+               // Fast path: same window. Just re-initialize the side inputs.
+
+               for _, s := range n.cache.sideinput {
+                       if err := s.Init(); err != nil {
+                               return err
+                       }
+               }
                return nil
        }
-       n.ready = true
 
-       // Setup reusable side input and emitters. It's currently a requirement 
that data
-       // processing methods consume the same side input/emitters.
+       // Slow path: init side input for the given window
 
-       var err error
-       n.sideinput, err = makeSideInputs(n.Fn.ProcessElementFn(), n.Inbound, 
n.Side)
-       if err != nil {
-               return n.fail(err)
+       streams := make([]ReStream, len(n.Side), len(n.Side))
+       for i, adapter := range n.Side {
+               s, err := adapter.NewIterable(ctx, n.side, w)
+               if err != nil {
+                       return err
+               }
+               streams[i] = s
        }
-       n.emitters, err = makeEmitters(n.Fn.ProcessElementFn(), n.Out)
+
+       sideinput, err := makeSideInputs(n.Fn.ProcessElementFn(), n.Inbound, 
streams)
        if err != nil {
-               return n.fail(err)
+               return err
        }
-       for _, s := range n.sideinput {
-               n.extra = append(n.extra, s.Value())
+       n.cache.sideinput = sideinput
+       for i := 0; i < len(n.Side); i++ {
+               n.cache.extra[i] = sideinput[i].Value()
        }
-       for _, e := range n.emitters {
-               n.extra = append(n.extra, e.Value())
+
+       for _, s := range n.cache.sideinput {
+               if err := s.Init(); err != nil {
+                       return err
+               }
        }
-       return err
+       return nil
 }
 
 // invokeDataFn handle non-per element invocations.
@@ -205,7 +243,7 @@ func (n *ParDo) invokeDataFn(ctx context.Context, ws 
[]typex.Window, ts typex.Ev
        if err := n.preInvoke(ctx, ws, ts); err != nil {
                return nil, err
        }
-       val, err := Invoke(ctx, ws, ts, fn, opt, n.extra...)
+       val, err := Invoke(ctx, ws, ts, fn, opt, n.cache.extra...)
        if err != nil {
                return nil, err
        }
@@ -220,7 +258,7 @@ func (n *ParDo) invokeProcessFn(ctx context.Context, ws 
[]typex.Window, ts typex
        if err := n.preInvoke(ctx, ws, ts); err != nil {
                return nil, err
        }
-       val, err := n.inv.Invoke(ctx, ws, ts, opt, n.extra...)
+       val, err := n.inv.Invoke(ctx, ws, ts, opt, n.cache.extra...)
        if err != nil {
                return nil, err
        }
@@ -236,16 +274,11 @@ func (n *ParDo) preInvoke(ctx context.Context, ws 
[]typex.Window, ts typex.Event
                        return err
                }
        }
-       for _, s := range n.sideinput {
-               if err := s.Init(); err != nil {
-                       return err
-               }
-       }
-       return nil
+       return n.initSideInput(ctx, ws[0])
 }
 
 func (n *ParDo) postInvoke() error {
-       for _, s := range n.sideinput {
+       for _, s := range n.cache.sideinput {
                if err := s.Reset(); err != nil {
                        return err
                }
diff --git a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
index 0df6961659b..c378bc69b73 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/pardo_test.go
@@ -67,11 +67,11 @@ func TestParDo(t *testing.T) {
 
        out := &CaptureNode{UID: 1}
        sum := &CaptureNode{UID: 2}
-       pardo := &ParDo{UID: 3, Fn: edge.DoFn, Inbound: edge.Input, Out: 
[]Node{out, sum}, Side: []ReStream{
-               &FixedReStream{Buf: makeValues(1)},       // a
-               &FixedReStream{Buf: makeValues(2, 3, 4)}, // b
-               &FixedReStream{Buf: makeValues(5, 6)},    // c
-               &FixedReStream{Buf: makeValues(7, 8, 9)}, // d
+       pardo := &ParDo{UID: 3, Fn: edge.DoFn, Inbound: edge.Input, Out: 
[]Node{out, sum}, Side: []SideInputAdapter{
+               &FixedSideInputAdapter{Val: &FixedReStream{Buf: 
makeValues(1)}},       // a
+               &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(2, 
3, 4)}}, // b
+               &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(5, 
6)}},    // c
+               &FixedSideInputAdapter{Val: &FixedReStream{Buf: makeValues(7, 
8, 9)}}, // d
        }}
        n := &FixedRoot{UID: 4, Elements: makeInput(10, 20, 30), Out: pardo}
 
@@ -80,7 +80,7 @@ func TestParDo(t *testing.T) {
                t.Fatalf("failed to construct plan: %v", err)
        }
 
-       if err := p.Execute(context.Background(), "1", nil); err != nil {
+       if err := p.Execute(context.Background(), "1", DataContext{}); err != 
nil {
                t.Fatalf("execute failed: %v", err)
        }
        if err := p.Down(context.Background()); err != nil {
@@ -131,7 +131,7 @@ func BenchmarkParDo_EmitSumFn(b *testing.B) {
                b.Fatalf("failed to construct plan: %v", err)
        }
        go func() {
-               if err := p.Execute(context.Background(), "1", nil); err != nil 
{
+               if err := p.Execute(context.Background(), "1", DataContext{}); 
err != nil {
                        b.Fatalf("execute failed: %v", err)
                }
                if err := p.Down(context.Background()); err != nil {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/plan.go 
b/sdks/go/pkg/beam/core/runtime/exec/plan.go
index 6fc2b363ae3..bae0c3cf9c2 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/plan.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/plan.go
@@ -83,7 +83,7 @@ func (p *Plan) ID() string {
 // Execute executes the plan with the given data context and bundle id. Units
 // are brought up on the first execution. If a bundle fails, the plan cannot
 // be reused for further bundles. Does not panic. Blocking.
-func (p *Plan) Execute(ctx context.Context, id string, manager DataManager) 
error {
+func (p *Plan) Execute(ctx context.Context, id string, manager DataContext) 
error {
        ctx = metrics.SetBundleID(ctx, p.id)
        if p.status == Initializing {
                for _, u := range p.units {
diff --git a/sdks/go/pkg/beam/core/runtime/exec/sideinput.go 
b/sdks/go/pkg/beam/core/runtime/exec/sideinput.go
new file mode 100644
index 00000000000..4d4a247ef5f
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/exec/sideinput.go
@@ -0,0 +1,150 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package exec
+
+import (
+       "context"
+       "fmt"
+       "io"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/graph/coder"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
+)
+
+// This file contains support for side input.
+
+// iterableSideInputKey is the fixed runtime key value for iterable side input.
+const iterableSideInputKey = ""
+
+// SideInputAdapter provides a concrete ReStream from a low-level side input 
reader. It
+// encapsulates StreamID and coding as needed.
+type SideInputAdapter interface {
+       NewIterable(ctx context.Context, reader SideInputReader, w 
typex.Window) (ReStream, error)
+}
+
+type sideInputAdapter struct {
+       sid StreamID
+       wc  WindowEncoder
+       kc  ElementEncoder
+       ec  ElementDecoder
+}
+
+// NewSideInputAdapter returns a side input adapter for the given StreamID and 
coder.
+// It expects a W<KV<K,V>> coder, because the protocol supports MultiMap 
access only.
+func NewSideInputAdapter(sid StreamID, c *coder.Coder) SideInputAdapter {
+       if !coder.IsW(c) || !coder.IsKV(coder.SkipW(c)) {
+               panic(fmt.Sprintf("expected WKV coder for side input: %v", c))
+       }
+
+       wc := MakeWindowEncoder(c.Window)
+       kc := MakeElementEncoder(coder.SkipW(c).Components[0])
+       ec := MakeElementDecoder(coder.SkipW(c).Components[1])
+       return &sideInputAdapter{sid: sid, wc: wc, kc: kc, ec: ec}
+}
+
+func (s *sideInputAdapter) NewIterable(ctx context.Context, reader 
SideInputReader, w typex.Window) (ReStream, error) {
+       key, err := EncodeElement(s.kc, []byte(iterableSideInputKey))
+       if err != nil {
+               return nil, err
+       }
+       win, err := EncodeWindow(s.wc, w)
+       if err != nil {
+               return nil, err
+       }
+       return &proxyReStream{
+               open: func() (Stream, error) {
+                       r, err := reader.Open(ctx, s.sid, key, win)
+                       if err != nil {
+                               return nil, err
+                       }
+                       return &elementStream{r: r, ec: s.ec}, nil
+               },
+       }, nil
+}
+
+func (s *sideInputAdapter) String() string {
+       return fmt.Sprintf("SideInputAdapter[%v]", s.sid)
+}
+
+// proxyReStream is a simple wrapper of an open function.
+type proxyReStream struct {
+       open func() (Stream, error)
+}
+
+func (p *proxyReStream) Open() (Stream, error) {
+       return p.open()
+}
+
+// elementStream exposes a Stream from decoding elements.
+type elementStream struct {
+       r  io.ReadCloser
+       ec ElementDecoder
+}
+
+func (s *elementStream) Close() error {
+       return s.r.Close()
+}
+
+func (s *elementStream) Read() (FullValue, error) {
+       // We should see a stream of unwindowed values -- no sizes, no key.
+       return s.ec.Decode(s.r)
+}
+
+// FixedKey transforms any value into KV<K, V> for a fixed K.
+type FixedKey struct {
+       // UID is the unit identifier.
+       UID UnitID
+       // Key is the fixed key paired with every incoming element.
+       Key interface{}
+       // Out is the successor node.
+       Out Node
+}
+
+func (n *FixedKey) ID() UnitID {
+       return n.UID
+}
+
+func (n *FixedKey) Up(ctx context.Context) error {
+       return nil
+}
+
+func (n *FixedKey) StartBundle(ctx context.Context, id string, data 
DataContext) error {
+       return n.Out.StartBundle(ctx, id, data)
+}
+
+func (n *FixedKey) ProcessElement(ctx context.Context, elm FullValue, values 
...ReStream) error {
+       // Transform: V to KV<K,V>
+
+       v := FullValue{
+               Elm:       n.Key,
+               Elm2:      elm,
+               Timestamp: elm.Timestamp,
+               Windows:   elm.Windows,
+       }
+       return n.Out.ProcessElement(ctx, v, values...)
+}
+
+func (n *FixedKey) FinishBundle(ctx context.Context) error {
+       return n.Out.FinishBundle(ctx)
+}
+
+func (n *FixedKey) Down(ctx context.Context) error {
+       return nil
+}
+
+func (n *FixedKey) String() string {
+       return fmt.Sprintf("FixedKey[%v]. Out:%v", n.Key, n.Out.ID())
+}
diff --git a/sdks/go/pkg/beam/core/runtime/exec/translate.go 
b/sdks/go/pkg/beam/core/runtime/exec/translate.go
index ab8f306b873..696297cea67 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/translate.go
@@ -62,10 +62,10 @@ func UnmarshalPlan(desc *fnpb.ProcessBundleDescriptor) 
(*Plan, error) {
                        return nil, err
                }
 
-               u := &DataSource{UID: b.idgen.New(), Port: port}
+               u := &DataSource{UID: b.idgen.New()}
 
                for key, pid := range transform.GetOutputs() {
-                       u.Target = Target{ID: id, Name: key}
+                       u.SID = StreamID{Target: Target{ID: id, Name: key}, 
Port: port}
 
                        u.Out, err = b.makePCollection(pid)
                        if err != nil {
@@ -381,12 +381,29 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                                }
                                // TODO(lostluck): 2018/03/22 Look into why 
transform.UniqueName isn't populated at this point, and switch n.PID to that 
instead.
                                n.PID = path.Base(n.Fn.Name())
-                               if len(in) == 1 {
-                                       u = n
-                                       break
+
+                               input := 
unmarshalKeyedValues(transform.GetInputs())
+                               for i := 1; i < len(input); i++ {
+                                       // TODO(herohde) 8/8/2018: handle 
different windows, view_fn and window_mapping_fn.
+                                       // For now, assume we don't need any 
information in the pardo payload.
+
+                                       ec, wc, err := 
b.makeCoderForPCollection(input[i])
+                                       if err != nil {
+                                               return nil, err
+                                       }
+
+                                       sid := StreamID{
+                                               Port: Port{URL: 
b.desc.GetStateApiServiceDescriptor().GetUrl()},
+                                               Target: Target{
+                                                       ID:   id.to,            
     // PTransformID
+                                                       Name: 
fmt.Sprintf("i%v", i), // SideInputID (= local id, "iN")
+                                               },
+                                       }
+                                       side := NewSideInputAdapter(sid, 
coder.NewW(ec, wc))
+                                       n.Side = append(n.Side, side)
                                }
+                               u = n
 
-                               panic("NYI: side input")
                        case graph.Combine:
                                cn := &Combine{UID: b.idgen.New(), Out: out[0]}
                                cn.Fn, err = graph.AsCombineFn(fn)
@@ -408,6 +425,9 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                                panic(fmt.Sprintf("Opcode should be one of 
ParDo or Combine, but it is: %v", op))
                        }
 
+               case graphx.URNIterableSideInputKey:
+                       u = &FixedKey{UID: b.idgen.New(), Key: 
[]byte(iterableSideInputKey), Out: out[0]}
+
                case graphx.URNInject:
                        c, _, err := b.makeCoderForPCollection(from)
                        if err != nil {
@@ -461,10 +481,10 @@ func (b *builder) makeLink(from string, id linkID) (Node, 
error) {
                        return nil, err
                }
 
-               sink := &DataSink{UID: b.idgen.New(), Port: port}
+               sink := &DataSink{UID: b.idgen.New()}
 
                for key, pid := range transform.GetInputs() {
-                       sink.Target = Target{ID: id.to, Name: key}
+                       sink.SID = StreamID{Target: Target{ID: id.to, Name: 
key}, Port: port}
 
                        if cid == "" {
                                c, wc, err := b.makeCoderForPCollection(pid)
diff --git a/sdks/go/pkg/beam/core/runtime/exec/unit.go 
b/sdks/go/pkg/beam/core/runtime/exec/unit.go
index 19a60c5b3fc..1ececc8b5ef 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/unit.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/unit.go
@@ -36,7 +36,7 @@ type Unit interface {
 
        // StartBundle signals that processing preconditions, such as 
availability
        // of side input, are met and starts the given bundle.
-       StartBundle(ctx context.Context, id string, data DataManager) error
+       StartBundle(ctx context.Context, id string, data DataContext) error
 
        // FinishBundle signals end of input and thus finishes the bundle. Any
        // data connections must be closed.
diff --git a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go 
b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go
index c05a94d907d..b8358e96038 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/unit_test.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/unit_test.go
@@ -18,6 +18,8 @@ package exec
 import (
        "context"
        "fmt"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
 )
 
 // CaptureNode is a test Node that captures all elements for verification. It 
also
@@ -41,7 +43,7 @@ func (n *CaptureNode) Up(ctx context.Context) error {
        return nil
 }
 
-func (n *CaptureNode) StartBundle(ctx context.Context, id string, data 
DataManager) error {
+func (n *CaptureNode) StartBundle(ctx context.Context, id string, data 
DataContext) error {
        if n.status != Up {
                return fmt.Errorf("invalid status for %v: %v, want Up", n.UID, 
n.status)
        }
@@ -89,7 +91,7 @@ func (n *FixedRoot) Up(ctx context.Context) error {
        return nil
 }
 
-func (n *FixedRoot) StartBundle(ctx context.Context, id string, data 
DataManager) error {
+func (n *FixedRoot) StartBundle(ctx context.Context, id string, data 
DataContext) error {
        return n.Out.StartBundle(ctx, id, data)
 }
 
@@ -110,6 +112,15 @@ func (n *FixedRoot) Down(ctx context.Context) error {
        return nil
 }
 
+// FixedSideInputAdapter is an adapter for a fixed ReStream.
+type FixedSideInputAdapter struct {
+       Val ReStream
+}
+
+func (a *FixedSideInputAdapter) NewIterable(ctx context.Context, reader 
SideInputReader, w typex.Window) (ReStream, error) {
+       return a.Val, nil
+}
+
 // BenchRoot is a test Root that emits elements through a channel for 
benchmarking purposes.
 type BenchRoot struct {
        UID      UnitID
@@ -125,7 +136,7 @@ func (n *BenchRoot) Up(ctx context.Context) error {
        return nil
 }
 
-func (n *BenchRoot) StartBundle(ctx context.Context, id string, data 
DataManager) error {
+func (n *BenchRoot) StartBundle(ctx context.Context, id string, data 
DataContext) error {
        return n.Out.StartBundle(ctx, id, data)
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/exec/util.go 
b/sdks/go/pkg/beam/core/runtime/exec/util.go
index 4b03f3e2708..1fdfd76508a 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/util.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/util.go
@@ -43,7 +43,7 @@ func callNoPanic(ctx context.Context, fn 
func(context.Context) error) (err error
 }
 
 // MultiStartBundle calls StartBundle on multiple nodes. Convenience function.
-func MultiStartBundle(ctx context.Context, id string, data DataManager, list 
...Node) error {
+func MultiStartBundle(ctx context.Context, id string, data DataContext, list 
...Node) error {
        for _, n := range list {
                if err := n.StartBundle(ctx, id, data); err != nil {
                        return err
diff --git a/sdks/go/pkg/beam/core/runtime/exec/window.go 
b/sdks/go/pkg/beam/core/runtime/exec/window.go
index 38ef08cd266..f77d3ac939d 100644
--- a/sdks/go/pkg/beam/core/runtime/exec/window.go
+++ b/sdks/go/pkg/beam/core/runtime/exec/window.go
@@ -40,7 +40,7 @@ func (w *WindowInto) Up(ctx context.Context) error {
        return nil
 }
 
-func (w *WindowInto) StartBundle(ctx context.Context, id string, data 
DataManager) error {
+func (w *WindowInto) StartBundle(ctx context.Context, id string, data 
DataContext) error {
        return w.Out.StartBundle(ctx, id, data)
 }
 
diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go 
b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
index d07e0b76d96..3bc67ae7e28 100644
--- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go
+++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go
@@ -39,7 +39,7 @@ const (
        URNCombinePerKey = "beam:transform:combine_per_key:v1"
        URNWindow        = "beam:transform:window:v1"
 
-       URNIterableSideInput = "beam:side_input:iterable:v1"
+       // URNIterableSideInput = "beam:side_input:iterable:v1"
        URNMultimapSideInput = "beam:side_input:multimap:v1"
 
        URNGlobalWindowsWindowFn  = "beam:windowfn:global_windows:v0.1"
@@ -54,6 +54,8 @@ const (
        // uses the model pipeline and no longer falls back to Java.
        URNJavaDoFn = "urn:beam:dofn:javasdk:0.1"
        URNDoFn     = "beam:go:transform:dofn:v1"
+
+       URNIterableSideInputKey = "beam:go:transform:iterablesideinputkey:v1"
 )
 
 // TODO(herohde) 11/6/2017: move some of the configuration into the graph 
during construction.
@@ -209,13 +211,131 @@ func (m *marshaller) addMultiEdge(edge NamedEdge) string 
{
                outputs[fmt.Sprintf("i%v", i)] = nodeID(out.To)
        }
 
+       var spec *pb.FunctionSpec
+       switch edge.Edge.Op {
+       case graph.Impulse:
+               // TODO(herohde) 7/18/2018: Encode data?
+               spec = &pb.FunctionSpec{Urn: URNImpulse}
+
+       case graph.ParDo:
+               si := make(map[string]*pb.SideInput)
+               for i, in := range edge.Edge.Input {
+                       switch in.Kind {
+                       case graph.Main:
+                               // ignore: not a side input
+
+                       case graph.Singleton, graph.Slice, graph.Iter, 
graph.ReIter:
+                               // The only supported form of side input is 
MultiMap, but we
+                               // want just iteration. So we must manually add 
a fixed key,
+                               // "", even if the input is already KV.
+
+                               out := fmt.Sprintf("%v_keyed%v_%v", 
nodeID(in.From), edgeID(edge.Edge), i)
+                               m.makeNode(out, 
m.coders.Add(makeBytesKeyedCoder(in.From.Coder)), in.From)
+
+                               payload := &pb.ParDoPayload{
+                                       DoFn: &pb.SdkFunctionSpec{
+                                               Spec: &pb.FunctionSpec{
+                                                       Urn: 
URNIterableSideInputKey,
+                                                       Payload: 
[]byte(protox.MustEncodeBase64(&v1.TransformPayload{
+                                                               Urn: 
URNIterableSideInputKey,
+                                                       })),
+                                               },
+                                               EnvironmentId: 
m.addDefaultEnv(),
+                                       },
+                               }
+
+                               keyedID := fmt.Sprintf("%v_keyed%v", 
edgeID(edge.Edge), i)
+                               keyed := &pb.PTransform{
+                                       UniqueName: keyedID,
+                                       Spec: &pb.FunctionSpec{
+                                               Urn:     URNParDo,
+                                               Payload: 
protox.MustEncode(payload),
+                                       },
+                                       Inputs:  map[string]string{"i0": 
nodeID(in.From)},
+                                       Outputs: map[string]string{"i0": out},
+                               }
+                               m.transforms[keyedID] = keyed
+
+                               // Fixup input map
+                               inputs[fmt.Sprintf("i%v", i)] = out
+
+                               si[fmt.Sprintf("i%v", i)] = &pb.SideInput{
+                                       AccessPattern: &pb.FunctionSpec{
+                                               Urn: URNMultimapSideInput,
+                                       },
+                                       ViewFn: &pb.SdkFunctionSpec{
+                                               Spec: &pb.FunctionSpec{
+                                                       Urn: "foo",
+                                               },
+                                               EnvironmentId: 
m.addDefaultEnv(),
+                                       },
+                                       WindowMappingFn: &pb.SdkFunctionSpec{
+                                               Spec: &pb.FunctionSpec{
+                                                       Urn: "bar",
+                                               },
+                                               EnvironmentId: 
m.addDefaultEnv(),
+                                       },
+                               }
+
+                       case graph.Map, graph.MultiMap:
+                               panic("NYI")
+
+                       default:
+                               panic(fmt.Sprintf("unexpected input kind: %v", 
edge))
+                       }
+               }
+
+               payload := &pb.ParDoPayload{
+                       DoFn: &pb.SdkFunctionSpec{
+                               Spec: &pb.FunctionSpec{
+                                       Urn:     URNJavaDoFn,
+                                       Payload: 
[]byte(mustEncodeMultiEdgeBase64(edge.Edge)),
+                               },
+                               EnvironmentId: m.addDefaultEnv(),
+                       },
+                       SideInputs: si,
+               }
+               spec = &pb.FunctionSpec{Urn: URNParDo, Payload: 
protox.MustEncode(payload)}
+
+       case graph.Combine:
+               payload := &pb.ParDoPayload{
+                       DoFn: &pb.SdkFunctionSpec{
+                               Spec: &pb.FunctionSpec{
+                                       Urn:     URNJavaDoFn,
+                                       Payload: 
[]byte(mustEncodeMultiEdgeBase64(edge.Edge)),
+                               },
+                               EnvironmentId: m.addDefaultEnv(),
+                       },
+               }
+               spec = &pb.FunctionSpec{Urn: URNParDo, Payload: 
protox.MustEncode(payload)}
+
+       case graph.Flatten:
+               spec = &pb.FunctionSpec{Urn: URNFlatten}
+
+       case graph.CoGBK:
+               spec = &pb.FunctionSpec{Urn: URNGBK}
+
+       case graph.WindowInto:
+               payload := &pb.WindowIntoPayload{
+                       WindowFn: &pb.SdkFunctionSpec{
+                               Spec: makeWindowFn(edge.Edge.WindowFn),
+                       },
+               }
+               spec = &pb.FunctionSpec{Urn: URNWindow, Payload: 
protox.MustEncode(payload)}
+
+       case graph.External:
+               spec = &pb.FunctionSpec{Urn: edge.Edge.Payload.URN, Payload: 
edge.Edge.Payload.Data}
+
+       default:
+               panic(fmt.Sprintf("Unexpected opcode: %v", edge.Edge.Op))
+       }
+
        transform := &pb.PTransform{
                UniqueName: edge.Name,
-               Spec:       m.makePayload(edge.Edge),
+               Spec:       spec,
                Inputs:     inputs,
                Outputs:    outputs,
        }
-
        m.transforms[id] = transform
        return id
 }
@@ -292,7 +412,7 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
        gbkID := fmt.Sprintf("%v_gbk", id)
        gbk := &pb.PTransform{
                UniqueName: gbkID,
-               Spec:       m.makePayload(edge.Edge),
+               Spec:       &pb.FunctionSpec{Urn: URNGBK},
                Inputs:     map[string]string{"i0": out},
                Outputs:    map[string]string{"i0": gbkOut},
        }
@@ -337,76 +457,6 @@ func (m *marshaller) expandCoGBK(edge NamedEdge) string {
        return id
 }
 
-func (m *marshaller) makePayload(edge *graph.MultiEdge) *pb.FunctionSpec {
-       switch edge.Op {
-       case graph.Impulse:
-               // TODO(herohde) 7/18/2018: Encode data?
-               return &pb.FunctionSpec{Urn: URNImpulse}
-
-       case graph.ParDo:
-               si := make(map[string]*pb.SideInput)
-               for i, in := range edge.Input {
-                       switch in.Kind {
-                       case graph.Main:
-                               // ignore: not a side input
-                       case graph.Singleton, graph.Slice, graph.Iter, 
graph.ReIter:
-                               si[fmt.Sprintf("i%v", i)] = &pb.SideInput{
-                                       AccessPattern: &pb.FunctionSpec{Urn: 
URNIterableSideInput},
-                                       // TODO(herohde) 7/16/2018: side input 
data
-                               }
-                       case graph.Map, graph.MultiMap:
-                               panic("NYI")
-                       default:
-                               panic(fmt.Sprintf("unexpected input kind: %v", 
edge))
-                       }
-               }
-
-               payload := &pb.ParDoPayload{
-                       DoFn: &pb.SdkFunctionSpec{
-                               Spec: &pb.FunctionSpec{
-                                       Urn:     URNJavaDoFn,
-                                       Payload: 
[]byte(mustEncodeMultiEdgeBase64(edge)),
-                               },
-                               EnvironmentId: m.addDefaultEnv(),
-                       },
-                       SideInputs: si,
-               }
-               return &pb.FunctionSpec{Urn: URNParDo, Payload: 
protox.MustEncode(payload)}
-
-       case graph.Combine:
-               payload := &pb.ParDoPayload{
-                       DoFn: &pb.SdkFunctionSpec{
-                               Spec: &pb.FunctionSpec{
-                                       Urn:     URNJavaDoFn,
-                                       Payload: 
[]byte(mustEncodeMultiEdgeBase64(edge)),
-                               },
-                               EnvironmentId: m.addDefaultEnv(),
-                       },
-               }
-               return &pb.FunctionSpec{Urn: URNParDo, Payload: 
protox.MustEncode(payload)}
-
-       case graph.Flatten:
-               return &pb.FunctionSpec{Urn: URNFlatten}
-
-       case graph.CoGBK:
-               return &pb.FunctionSpec{Urn: URNGBK}
-
-       case graph.WindowInto:
-               payload := &pb.WindowIntoPayload{
-                       WindowFn: &pb.SdkFunctionSpec{
-                               Spec: makeWindowFn(edge.WindowFn),
-                       },
-               }
-               return &pb.FunctionSpec{Urn: URNWindow, Payload: 
protox.MustEncode(payload)}
-
-       case graph.External:
-               return &pb.FunctionSpec{Urn: edge.Payload.URN, Payload: 
edge.Payload.Data}
-
-       default:
-               panic(fmt.Sprintf("Unexpected opcode: %v", edge.Op))
-       }
-}
-
 func (m *marshaller) addNode(n *graph.Node) string {
        id := nodeID(n)
        if _, exists := m.pcollections[id]; exists {
@@ -544,6 +594,12 @@ func mustEncodeMultiEdgeBase64(edge *graph.MultiEdge) 
string {
        })
 }
 
+// makeBytesKeyedCoder returns KV<[]byte,A> for any coder A,
+// even if the coder is already a KV coder.
+func makeBytesKeyedCoder(c *coder.Coder) *coder.Coder {
+       return coder.NewKV([]*coder.Coder{coder.NewBytes(), c})
+}
+
 func edgeID(edge *graph.MultiEdge) string {
        return fmt.Sprintf("e%v", edge.ID())
 }
diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go 
b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
index b986d178643..856c0defda2 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr.go
@@ -25,7 +25,6 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
        pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
-       "google.golang.org/grpc"
 )
 
 const (
@@ -33,38 +32,72 @@ const (
        bufElements = 20       // Number of chunks buffered per reader.
 )
 
-// This is a reduced version of the full gRPC interface to help with testing.
-// TODO(wcn): need a compile-time assertion to make sure this stays synced 
with what's
-// in pb.BeamFnData_DataClient
-type dataClient interface {
-       Send(*pb.Elements) error
-       Recv() (*pb.Elements, error)
+// ScopedDataManager scopes the global gRPC data manager to a single 
instruction.
+// The indirection makes it easier to control access.
+type ScopedDataManager struct {
+       mgr    *DataChannelManager
+       instID string
+
+       // TODO(herohde) 7/20/2018: capture and force close open reads/writes. 
However,
+       // we would need the underlying Close to be idempotent or a separate 
method.
+       closed bool
+       mu     sync.Mutex
 }
 
-// DataManager manages data channels to the FnHarness. A fixed number of 
channels
-// are generally used, each managing multiple logical byte streams.
-type DataManager struct {
-       ports map[string]*DataChannel
-       mu    sync.Mutex // guards the ports map
+// NewScopedDataManager returns a ScopedDataManager for the given instruction.
+func NewScopedDataManager(mgr *DataChannelManager, instID string) 
*ScopedDataManager {
+       return &ScopedDataManager{mgr: mgr, instID: instID}
 }
 
-func (m *DataManager) OpenRead(ctx context.Context, id exec.StreamID) 
(io.ReadCloser, error) {
-       ch, err := m.open(ctx, id.Port)
+func (s *ScopedDataManager) OpenRead(ctx context.Context, id exec.StreamID) 
(io.ReadCloser, error) {
+       ch, err := s.open(ctx, id.Port)
        if err != nil {
                return nil, err
        }
-       return ch.OpenRead(ctx, id)
+       return ch.OpenRead(ctx, id.Target, s.instID), nil
 }
 
-func (m *DataManager) OpenWrite(ctx context.Context, id exec.StreamID) 
(io.WriteCloser, error) {
-       ch, err := m.open(ctx, id.Port)
+func (s *ScopedDataManager) OpenWrite(ctx context.Context, id exec.StreamID) 
(io.WriteCloser, error) {
+       ch, err := s.open(ctx, id.Port)
        if err != nil {
                return nil, err
        }
-       return ch.OpenWrite(ctx, id)
+       return ch.OpenWrite(ctx, id.Target, s.instID), nil
 }
 
-func (m *DataManager) open(ctx context.Context, port exec.Port) (*DataChannel, 
error) {
+func (s *ScopedDataManager) open(ctx context.Context, port exec.Port) 
(*DataChannel, error) {
+       s.mu.Lock()
+       if s.closed {
+               s.mu.Unlock()
+               return nil, fmt.Errorf("instruction %v no longer processing", 
s.instID)
+       }
+       local := s.mgr
+       s.mu.Unlock()
+
+       return local.Open(ctx, port) // don't hold lock over potentially slow 
operation
+}
+
+func (s *ScopedDataManager) Close() error {
+       s.mu.Lock()
+       s.closed = true
+       s.mgr = nil
+       s.mu.Unlock()
+       return nil
+}
+
+// DataChannelManager manages data channels over the Data API. A fixed number 
of channels
+// are generally used, each managing multiple logical byte streams. 
Thread-safe.
+type DataChannelManager struct {
+       ports map[string]*DataChannel
+       mu    sync.Mutex // guards the ports map
+}
+
+// Open opens a R/W DataChannel over the given port.
+func (m *DataChannelManager) Open(ctx context.Context, port exec.Port) 
(*DataChannel, error) {
+       if port.URL == "" {
+               panic("empty port")
+       }
+
        m.mu.Lock()
        defer m.mu.Unlock()
 
@@ -75,7 +108,7 @@ func (m *DataManager) open(ctx context.Context, port 
exec.Port) (*DataChannel, e
                return con, nil
        }
 
-       ch, err := NewDataChannel(ctx, port)
+       ch, err := newDataChannel(ctx, port)
        if err != nil {
                return nil, err
        }
@@ -83,20 +116,35 @@ func (m *DataManager) open(ctx context.Context, port 
exec.Port) (*DataChannel, e
        return ch, nil
 }
 
-// DataChannel manages a single grpc connection to the FnHarness.
+// clientID identifies a client of a connected channel. It is used as the key
+// of the reader and writer maps of a DataChannel.
+type clientID struct {
+       // target is the transform/name pair the stream belongs to.
+       target exec.Target
+       // instID is the instruction reference the stream belongs to.
+       instID string
+}
+
+// dataClient is a reduced version of the full gRPC interface to help with
+// testing.
+// TODO(wcn): need a compile-time assertion to make sure this stays synced
+// with what's in pb.BeamFnData_DataClient
+type dataClient interface {
+       // Send sends a batch of elements over the stream.
+       Send(*pb.Elements) error
+       // Recv receives the next batch of elements from the stream.
+       Recv() (*pb.Elements, error)
+}
+
+// DataChannel manages a single multiplexed gRPC connection over the Data API. 
Data is
+// pushed over the channel, so data for a reader may arrive before the reader 
connects.
+// Thread-safe.
 type DataChannel struct {
-       cc     *grpc.ClientConn
+       id     string
        client dataClient
-       port   exec.Port
 
-       writers map[string]*dataWriter
-       readers map[string]*dataReader
+       writers map[clientID]*dataWriter
+       readers map[clientID]*dataReader
        // TODO: early/late closed, bad instructions, finer locks, reconnect?
 
        mu sync.Mutex // guards both the readers and writers maps.
 }
 
-func NewDataChannel(ctx context.Context, port exec.Port) (*DataChannel, error) 
{
+func newDataChannel(ctx context.Context, port exec.Port) (*DataChannel, error) 
{
        cc, err := dial(ctx, port.URL, 15*time.Second)
        if err != nil {
                return nil, fmt.Errorf("failed to connect: %v", err)
@@ -106,41 +154,40 @@ func NewDataChannel(ctx context.Context, port exec.Port) 
(*DataChannel, error) {
                cc.Close()
                return nil, fmt.Errorf("failed to connect to data service: %v", 
err)
        }
-       return makeDataChannel(ctx, cc, client, port)
+       return makeDataChannel(ctx, port.URL, client), nil
 }
 
-func makeDataChannel(ctx context.Context, cc *grpc.ClientConn, client 
dataClient, port exec.Port) (*DataChannel, error) {
+func makeDataChannel(ctx context.Context, id string, client dataClient) 
*DataChannel {
        ret := &DataChannel{
-               cc:      cc,
+               id:      id,
                client:  client,
-               port:    port,
-               writers: make(map[string]*dataWriter),
-               readers: make(map[string]*dataReader),
+               writers: make(map[clientID]*dataWriter),
+               readers: make(map[clientID]*dataReader),
        }
        go ret.read(ctx)
 
-       return ret, nil
+       return ret
 }
 
-func (c *DataChannel) OpenRead(ctx context.Context, id exec.StreamID) 
(io.ReadCloser, error) {
-       return c.makeReader(ctx, id), nil
+func (c *DataChannel) OpenRead(ctx context.Context, target exec.Target, instID 
string) io.ReadCloser {
+       return c.makeReader(ctx, clientID{target: target, instID: instID})
 }
 
-func (c *DataChannel) OpenWrite(ctx context.Context, id exec.StreamID) 
(io.WriteCloser, error) {
-       return c.makeWriter(ctx, id), nil
+func (c *DataChannel) OpenWrite(ctx context.Context, target exec.Target, 
instID string) io.WriteCloser {
+       return c.makeWriter(ctx, clientID{target: target, instID: instID})
 }
 
 func (c *DataChannel) read(ctx context.Context) {
-       cache := make(map[string]*dataReader)
+       cache := make(map[clientID]*dataReader)
        for {
                msg, err := c.client.Recv()
                if err != nil {
                        if err == io.EOF {
                                // TODO(herohde) 10/12/2017: can this happen 
before shutdown? Reconnect?
-                               log.Warnf(ctx, "DataChannel %v closed", c.port)
+                               log.Warnf(ctx, "DataChannel %v closed", c.id)
                                return
                        }
-                       panic(fmt.Errorf("channel %v bad: %v", c.port, err))
+                       panic(fmt.Errorf("channel %v bad: %v", c.id, err))
                }
 
                recordStreamReceive(msg)
@@ -150,17 +197,16 @@ func (c *DataChannel) read(ctx context.Context) {
                // to reduce lock contention.
 
                for _, elm := range msg.GetData() {
-                       id := exec.StreamID{Port: c.port, Target: 
exec.Target{ID: elm.GetTarget().PrimitiveTransformReference, Name: 
elm.GetTarget().GetName()}, InstID: elm.GetInstructionReference()}
-                       sid := id.String()
+                       id := clientID{target: exec.Target{ID: 
elm.GetTarget().PrimitiveTransformReference, Name: elm.GetTarget().GetName()}, 
instID: elm.GetInstructionReference()}
 
                        // log.Printf("Chan read (%v): %v\n", sid, 
elm.GetData())
 
                        var r *dataReader
-                       if local, ok := cache[sid]; ok {
+                       if local, ok := cache[id]; ok {
                                r = local
                        } else {
                                r = c.makeReader(ctx, id)
-                               cache[sid] = r
+                               cache[id] = r
                        }
 
                        if r.completed {
@@ -178,14 +224,14 @@ func (c *DataChannel) read(ctx context.Context) {
                                // for it again. We have to be careful not to 
remove the real
                                // one, because readers may be initialized 
after we've seen
                                // the full stream.
-                               delete(cache, sid)
+                               delete(cache, id)
                                continue
                        }
 
                        // This send is deliberately blocking, if we exceed the 
buffering for
                        // a reader. We can't buffer the entire main input, if 
some user code
                        // is slow (or gets stuck). If the local side closes, 
the reader
-                       // will be marked as completed and further remote data 
will be ingored.
+                       // will be marked as completed and further remote data 
will be ignored.
                        select {
                        case r.buf <- elm.GetData():
                        case <-r.done:
@@ -195,42 +241,40 @@ func (c *DataChannel) read(ctx context.Context) {
        }
 }
 
-func (c *DataChannel) makeReader(ctx context.Context, id exec.StreamID) 
*dataReader {
+func (c *DataChannel) makeReader(ctx context.Context, id clientID) *dataReader 
{
        c.mu.Lock()
        defer c.mu.Unlock()
 
-       sid := id.String()
-       if r, ok := c.readers[sid]; ok {
+       if r, ok := c.readers[id]; ok {
                return r
        }
 
-       r := &dataReader{id: sid, buf: make(chan []byte, bufElements), done: 
make(chan bool, 1), channel: c}
-       c.readers[sid] = r
+       r := &dataReader{id: id, buf: make(chan []byte, bufElements), done: 
make(chan bool, 1), channel: c}
+       c.readers[id] = r
        return r
 }
 
-func (c *DataChannel) removeReader(id string) {
+func (c *DataChannel) removeReader(id clientID) {
        c.mu.Lock()
        delete(c.readers, id)
        c.mu.Unlock()
 }
 
-func (c *DataChannel) makeWriter(ctx context.Context, id exec.StreamID) 
*dataWriter {
+func (c *DataChannel) makeWriter(ctx context.Context, id clientID) *dataWriter 
{
        c.mu.Lock()
        defer c.mu.Unlock()
 
-       sid := id.String()
-       if w, ok := c.writers[sid]; ok {
+       if w, ok := c.writers[id]; ok {
                return w
        }
 
        w := &dataWriter{ch: c, id: id}
-       c.writers[sid] = w
+       c.writers[id] = w
        return w
 }
 
 type dataReader struct {
-       id        string
+       id        clientID
        buf       chan []byte
        done      chan bool
        cur       []byte
@@ -264,10 +308,13 @@ func (r *dataReader) Read(buf []byte) (int, error) {
        return n, nil
 }
 
+// TODO(herohde) 7/20/2018: we should probably either not be tracking writers 
or
+// make dataWriter threadsafe. Either case is likely a corruption generator.
+
 type dataWriter struct {
        buf []byte
 
-       id exec.StreamID
+       id clientID
        ch *DataChannel
 }
 
@@ -281,12 +328,12 @@ func (w *dataWriter) Close() error {
        // Now acquire the locks since we're sending.
        w.ch.mu.Lock()
        defer w.ch.mu.Unlock()
-       delete(w.ch.writers, w.id.String())
-       target := &pb.Target{PrimitiveTransformReference: w.id.Target.ID, Name: 
w.id.Target.Name}
+       delete(w.ch.writers, w.id)
+       target := &pb.Target{PrimitiveTransformReference: w.id.target.ID, Name: 
w.id.target.Name}
        msg := &pb.Elements{
                Data: []*pb.Elements_Data{
                        {
-                               InstructionReference: w.id.InstID,
+                               InstructionReference: w.id.instID,
                                Target:               target,
                                // Empty data == sentinel
                        },
@@ -307,11 +354,11 @@ func (w *dataWriter) Flush() error {
                return nil
        }
 
-       target := &pb.Target{PrimitiveTransformReference: w.id.Target.ID, Name: 
w.id.Target.Name}
+       target := &pb.Target{PrimitiveTransformReference: w.id.target.ID, Name: 
w.id.target.Name}
        msg := &pb.Elements{
                Data: []*pb.Elements_Data{
                        {
-                               InstructionReference: w.id.InstID,
+                               InstructionReference: w.id.instID,
                                Target:               target,
                                Data:                 w.buf,
                        },
diff --git a/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go 
b/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
index 1305efd1f48..2a4b2d7fdd2 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/datamgr_test.go
@@ -80,19 +80,16 @@ func TestDataChannelTerminateOnClose(t *testing.T) {
        log.SetOutput(ioutil.Discard)
        done := make(chan bool, 1)
        client := &fakeClient{t: t, done: done}
-       c, err := makeDataChannel(context.Background(), nil, client, 
exec.Port{})
-       if err != nil {
-               t.Errorf("Unexpected error in makeDataChannel: %v", err)
-       }
+       c := makeDataChannel(context.Background(), "id", client)
 
-       r, err := c.OpenRead(context.Background(), exec.StreamID{Port: 
exec.Port{URL: ""}, Target: exec.Target{ID: "ptr", Name: "instruction_name"}, 
InstID: "inst_ref"})
+       r := c.OpenRead(context.Background(), exec.Target{ID: "ptr", Name: 
"instruction_name"}, "inst_ref")
        var read = make([]byte, 4)
 
        // We don't read up all the buffered data, but immediately close the 
reader.
        // Previously, since nothing was consuming the incoming gRPC data, the 
whole
        // data channel would get stuck, and the client.Recv() call was 
eventually
        // no longer called.
-       _, err = r.Read(read)
+       _, err := r.Read(read)
        if err != nil {
                t.Errorf("Unexpected error from read: %v", err)
        }
diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go 
b/sdks/go/pkg/beam/core/runtime/harness/harness.go
index cbda27c4275..08ee7dc3d72 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/harness.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go
@@ -83,7 +83,8 @@ func Main(ctx context.Context, loggingEndpoint, 
controlEndpoint string) error {
        ctrl := &control{
                plans:  make(map[string]*exec.Plan),
                active: make(map[string]*exec.Plan),
-               data:   &DataManager{},
+               data:   &DataChannelManager{},
+               state:  &StateChannelManager{},
        }
 
        // gRPC requires all readers of a stream be the same goroutine, so this 
goroutine
@@ -138,7 +139,8 @@ type control struct {
        active map[string]*exec.Plan // protected by mu
        mu     sync.Mutex
 
-       data *DataManager
+       data  *DataChannelManager
+       state *StateChannelManager
 }
 
 func (c *control) handleInstruction(ctx context.Context, req 
*fnpb.InstructionRequest) *fnpb.InstructionResponse {
@@ -190,7 +192,12 @@ func (c *control) handleInstruction(ctx context.Context, 
req *fnpb.InstructionRe
                        return fail(id, "execution plan for %v not found", ref)
                }
 
-               err := plan.Execute(ctx, id, c.data)
+               data := NewScopedDataManager(c.data, id)
+               side := NewScopedSideInputReader(c.state, id)
+               err := plan.Execute(ctx, id, exec.DataContext{Data: data, 
SideInput: side})
+               data.Close()
+               side.Close()
+
                m := plan.Metrics()
                // Move the plan back to the candidate state
                c.mu.Lock()
diff --git a/sdks/go/pkg/beam/core/runtime/harness/session.go 
b/sdks/go/pkg/beam/core/runtime/harness/session.go
index 8854a94a63c..88807ba353d 100644
--- a/sdks/go/pkg/beam/core/runtime/harness/session.go
+++ b/sdks/go/pkg/beam/core/runtime/harness/session.go
@@ -170,7 +170,7 @@ func recordHeader() error {
                        Msg: &session.Entry_Header{
                                Header: &session.Header{
                                        Version:   "0.0.1",
-                                       MaxMsgLen: 4000000, // TODO(wcn): get 
from DataManager.
+                                       MaxMsgLen: 4000000, // TODO(wcn): get 
from DataChannelManager.
                                },
                        },
                })
diff --git a/sdks/go/pkg/beam/core/runtime/harness/statemgr.go 
b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
new file mode 100644
index 00000000000..6445618b66b
--- /dev/null
+++ b/sdks/go/pkg/beam/core/runtime/harness/statemgr.go
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package harness
+
+import (
+       "context"
+       "fmt"
+       "io"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+       "github.com/apache/beam/sdks/go/pkg/beam/log"
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/fnexecution_v1"
+       "github.com/golang/protobuf/proto"
+       "github.com/pkg/errors"
+)
+
+// ScopedSideInputReader scopes the global gRPC state manager to a single
+// instruction for side input use. The indirection makes it easier to control
+// access.
+type ScopedSideInputReader struct {
+       // mgr is the shared channel manager; set to nil by Close.
+       mgr    *StateChannelManager
+       // instID is the instruction this reader is scoped to.
+       instID string
+
+       opened []io.Closer // track open readers to force close all
+       // closed is set by Close; Open fails once it is true.
+       closed bool
+       // mu guards mgr, opened and closed.
+       mu     sync.Mutex
+}
+
+// NewScopedSideInputReader creates a ScopedSideInputReader bound to the given
+// state channel manager and instruction id.
+func NewScopedSideInputReader(mgr *StateChannelManager, instID string) *ScopedSideInputReader {
+       ret := &ScopedSideInputReader{}
+       ret.mgr = mgr
+       ret.instID = instID
+       return ret
+}
+
+// Open returns a reader for the side input stream identified by id, for the
+// given encoded key and window w. The reader is tracked so that Close can
+// force-close it. Open fails if the scope has already been closed.
+func (s *ScopedSideInputReader) Open(ctx context.Context, id exec.StreamID, key, w []byte) (io.ReadCloser, error) {
+       ch, err := s.open(ctx, id.Port)
+       if err != nil {
+               return nil, err
+       }
+
+       s.mu.Lock()
+       defer s.mu.Unlock()
+
+       // The scope may have been closed while the channel was being opened.
+       if s.closed {
+               return nil, fmt.Errorf("instruction %v no longer processing", s.instID)
+       }
+       r := newSideInputReader(ch, id.Target, s.instID, key, w)
+       s.opened = append(s.opened, r)
+       return r, nil
+}
+
+// open returns the state channel for the given port from the underlying
+// manager. It fails if this scope has already been closed.
+func (s *ScopedSideInputReader) open(ctx context.Context, port exec.Port) (*StateChannel, error) {
+       // Snapshot the state under the lock, but release it before the
+       // potentially slow Open call below.
+       s.mu.Lock()
+       mgr, closed := s.mgr, s.closed
+       s.mu.Unlock()
+
+       if closed {
+               return nil, fmt.Errorf("instruction %v no longer processing", s.instID)
+       }
+       return mgr.Open(ctx, port)
+}
+
+// Close marks the scope as closed, drops the manager reference and
+// force-closes every reader handed out by Open. It always returns nil.
+func (s *ScopedSideInputReader) Close() error {
+       s.mu.Lock()
+       defer s.mu.Unlock()
+
+       s.closed = true
+       s.mgr = nil
+       for i := range s.opened {
+               s.opened[i].Close() // force close all opened readers
+       }
+       s.opened = nil
+       return nil
+}
+
+// sideInputReader is an io.ReadCloser that fetches a side input segment by
+// segment through state Get requests on a StateChannel.
+type sideInputReader struct {
+       // instID is the instruction reference placed on every request.
+       instID string
+       // key is the state key (multimap side input) placed on every request.
+       key    *pb.StateKey
+
+       // token is the continuation token returned with the last segment.
+       token []byte
+       // buf is the unread remainder of the current segment; nil when empty.
+       buf   []byte
+       // eof is set once the last segment has been fetched.
+       eof   bool
+
+       // ch is the channel used for requests; set to nil by Close.
+       ch     *StateChannel
+       closed bool
+       // mu guards ch and closed.
+       mu     sync.Mutex
+}
+
+// newSideInputReader returns a reader over the multimap side input addressed
+// by the target (transform id and side input name), the encoded key k and the
+// encoded window w. Requests are sent over ch on behalf of instID.
+func newSideInputReader(ch *StateChannel, target exec.Target, instID string, k, w []byte) *sideInputReader {
+       multimap := &pb.StateKey_MultimapSideInput{
+               PtransformId: target.ID,
+               SideInputId:  target.Name,
+               Window:       w,
+               Key:          k,
+       }
+       stateKey := &pb.StateKey{
+               Type: &pb.StateKey_MultimapSideInput_{MultimapSideInput: multimap},
+       }
+
+       ret := &sideInputReader{}
+       ret.instID = instID
+       ret.key = stateKey
+       ret.ch = ch
+       return ret
+}
+
+// Read implements io.Reader. It copies buffered segment data into buf and
+// fetches the next segment with a state Get request when the buffer is empty.
+// It returns io.EOF once the last segment has been consumed, and an error if
+// the reader was closed, the request failed, or the response is not a Get
+// response.
+//
+// Read is not safe for concurrent use: only ch and closed are guarded by mu.
+func (r *sideInputReader) Read(buf []byte) (int, error) {
+       // Buffer empty. Get next segment. Keep fetching while segments come back
+       // empty, so that we do not return (0, nil) to the caller.
+       for len(r.buf) == 0 {
+               if r.eof {
+                       return 0, io.EOF
+               }
+
+               r.mu.Lock()
+               if r.closed {
+                       r.mu.Unlock()
+                       return 0, fmt.Errorf("side input closed")
+               }
+               local := r.ch
+               r.mu.Unlock()
+
+               req := &pb.StateRequest{
+                       // Id: set by channel
+                       InstructionReference: r.instID,
+                       StateKey:             r.key,
+                       Request: &pb.StateRequest_Get{
+                               Get: &pb.StateGetRequest{
+                                       ContinuationToken: r.token,
+                               },
+                       },
+               }
+               resp, err := local.Send(req)
+               if err != nil {
+                       return 0, err
+               }
+               get := resp.GetGet()
+               if get == nil {
+                       // Not a Get response: fail instead of dereferencing nil.
+                       return 0, fmt.Errorf("unexpected state response for side input read in instruction %v", r.instID)
+               }
+               r.token = get.ContinuationToken
+               r.buf = get.Data
+
+               if len(r.token) == 0 {
+                       r.eof = true // no token == this is the last segment.
+               }
+       }
+
+       n := copy(buf, r.buf)
+
+       if len(r.buf) == n {
+               r.buf = nil // fully consumed: release the segment
+       } else {
+               r.buf = r.buf[n:]
+       }
+       return n, nil
+}
+
+// Close marks the reader as closed and drops its channel reference, so that
+// Read fails the next time it needs to fetch a segment. It always returns nil.
+func (r *sideInputReader) Close() error {
+       r.mu.Lock()
+       defer r.mu.Unlock()
+
+       r.ch = nil
+       r.closed = true
+       return nil
+}
+
+// StateChannelManager manages state channels over the State API. A fixed
+// number of channels are generally used, each managing multiple logical byte
+// streams. Thread-safe. The zero value is ready to use: Open creates the
+// ports map on first use.
+type StateChannelManager struct {
+       // ports holds the channels opened so far, keyed by port URL.
+       ports map[string]*StateChannel
+       // mu guards the ports map.
+       mu    sync.Mutex
+}
+
+// Open returns the R/W StateChannel for the given port, creating and caching
+// it on first use. The lock is held while the channel is created.
+func (m *StateChannelManager) Open(ctx context.Context, port exec.Port) (*StateChannel, error) {
+       m.mu.Lock()
+       defer m.mu.Unlock()
+
+       // A lookup in a nil map is safe and simply misses.
+       if existing, found := m.ports[port.URL]; found {
+               return existing, nil
+       }
+
+       created, err := newStateChannel(ctx, port)
+       if err != nil {
+               return nil, err
+       }
+       if m.ports == nil {
+               m.ports = make(map[string]*StateChannel)
+       }
+       m.ports[port.URL] = created
+       return created, nil
+}
+
+// StateChannel manages state transactions over a single gRPC connection.
+// It does not need to track readers and writers as carefully as the
+// DataChannel, because the state protocol is request-based.
+type StateChannel struct {
+       // id identifies the channel in log and error messages (the port URL).
+       id     string
+       client pb.BeamFnState_StateClient
+
+       // requests queues outgoing requests for the write goroutine.
+       requests      chan *pb.StateRequest
+       // nextRequestNo is incremented atomically by Send to form request ids.
+       nextRequestNo int32
+
+       // responses maps the id of each in-flight request to the channel on
+       // which its response is delivered.
+       responses map[string]chan<- *pb.StateResponse
+       // mu guards the responses map.
+       mu        sync.Mutex
+}
+
+// newStateChannel dials the given port, opens a State API stream on the
+// connection and starts the read and write goroutines that serve it. The
+// connection is closed again if the stream cannot be opened.
+func newStateChannel(ctx context.Context, port exec.Port) (*StateChannel, error) {
+       cc, err := dial(ctx, port.URL, 15*time.Second)
+       if err != nil {
+               return nil, fmt.Errorf("failed to connect: %v", err)
+       }
+       client, err := pb.NewBeamFnStateClient(cc).State(ctx)
+       if err != nil {
+               cc.Close()
+               // This is the State API, not the Data API.
+               return nil, fmt.Errorf("failed to connect to state service: %v", err)
+       }
+
+       ret := &StateChannel{
+               id:        port.URL,
+               client:    client,
+               requests:  make(chan *pb.StateRequest, 10),
+               responses: make(map[string]chan<- *pb.StateResponse),
+       }
+       go ret.read(ctx)
+       go ret.write(ctx)
+
+       return ret, nil
+}
+
+func (c *StateChannel) read(ctx context.Context) {
+       for {
+               msg, err := c.client.Recv()
+               if err != nil {
+                       if err == io.EOF {
+                               // TODO(herohde) 10/12/2017: can this happen 
before shutdown? Reconnect?
+                               log.Warnf(ctx, "StateChannel %v closed", c.id)
+                               return
+                       }
+                       panic(fmt.Errorf("state channel %v bad: %v", c.id, err))
+               }
+
+               c.mu.Lock()
+               ch, ok := c.responses[msg.Id]
+               delete(c.responses, msg.Id)
+               c.mu.Unlock()
+               if !ok {
+                       // This can happen if Send returns an error that write 
handles, but
+                       // the message was actually sent.
+                       log.Errorf(ctx, "no consumer for state response: %v", 
proto.MarshalTextString(msg))
+                       continue
+               }
+
+               select {
+               case ch <- msg:
+                       // ok
+               default:
+                       panic(fmt.Sprintf("failed to consume state response: 
%v", proto.MarshalTextString(msg)))
+               }
+       }
+}
+
+// write drains the requests queue and sends each request over the stream.
+// If a send fails, the waiting Send call (if still registered) receives an
+// error response for that request.
+func (c *StateChannel) write(ctx context.Context) {
+       for req := range c.requests {
+               sendErr := c.client.Send(req)
+               if sendErr != nil {
+                       // Failed to send. Return error.
+                       c.mu.Lock()
+                       waiter, registered := c.responses[req.Id]
+                       delete(c.responses, req.Id)
+                       c.mu.Unlock()
+
+                       if !registered {
+                               continue // ignore: already received response due to race
+                       }
+                       waiter <- &pb.StateResponse{Id: req.Id, Error: fmt.Sprintf("failed to send: %v", sendErr)}
+               }
+       }
+}
+
+// Send assigns a fresh id to the request, queues it for the write goroutine
+// and blocks until the matching response arrives. A response carrying a
+// non-empty Error is returned as an error.
+func (c *StateChannel) Send(req *pb.StateRequest) (*pb.StateResponse, error) {
+       seq := atomic.AddInt32(&c.nextRequestNo, 1)
+       req.Id = fmt.Sprintf("r%v", seq)
+
+       // Register the reply channel before the request can be sent, so the
+       // response always finds its consumer.
+       reply := make(chan *pb.StateResponse, 1)
+       c.mu.Lock()
+       c.responses[req.Id] = reply
+       c.mu.Unlock()
+
+       c.requests <- req
+
+       // TODO(herohde) 7/21/2018: time out?
+       resp := <-reply
+       if msg := resp.Error; msg != "" {
+               return nil, errors.New(msg)
+       }
+       return resp, nil
+}
diff --git a/sdks/go/pkg/beam/core/runtime/pipelinex/clone.go 
b/sdks/go/pkg/beam/core/runtime/pipelinex/clone.go
index 56dc20eae0f..444af56005b 100644
--- a/sdks/go/pkg/beam/core/runtime/pipelinex/clone.go
+++ b/sdks/go/pkg/beam/core/runtime/pipelinex/clone.go
@@ -54,3 +54,41 @@ func ShallowClonePTransform(t *pb.PTransform) *pb.PTransform 
{
        ret.Outputs, _ = reflectx.ShallowClone(t.Outputs).(map[string]string)
        return ret
 }
+
+// ShallowCloneParDoPayload returns a shallow copy of the given ParDoPayload,
+// or nil if p is nil. The Parameters slice and the SideInputs, StateSpecs and
+// TimerSpecs maps are cloned one level deep via reflectx.ShallowClone.
+func ShallowCloneParDoPayload(p *pb.ParDoPayload) *pb.ParDoPayload {
+       if p == nil {
+               return nil
+       }
+
+       params, _ := reflectx.ShallowClone(p.Parameters).([]*pb.Parameter)
+       sideInputs, _ := reflectx.ShallowClone(p.SideInputs).(map[string]*pb.SideInput)
+       stateSpecs, _ := reflectx.ShallowClone(p.StateSpecs).(map[string]*pb.StateSpec)
+       timerSpecs, _ := reflectx.ShallowClone(p.TimerSpecs).(map[string]*pb.TimerSpec)
+
+       return &pb.ParDoPayload{
+               DoFn:               p.DoFn,
+               Splittable:         p.Splittable,
+               RestrictionCoderId: p.RestrictionCoderId,
+               Parameters:         params,
+               SideInputs:         sideInputs,
+               StateSpecs:         stateSpecs,
+               TimerSpecs:         timerSpecs,
+       }
+}
+
+// ShallowCloneSideInput makes a shallow copy of the given SideInput.
+func ShallowCloneSideInput(p *pb.SideInput) *pb.SideInput {
+       if p == nil {
+               return nil
+       }
+       var ret pb.SideInput
+       ret = *p
+       return &ret
+}
+
+// ShallowCloneFunctionSpec makes a shallow copy of the given FunctionSpec.
+func ShallowCloneFunctionSpec(p *pb.FunctionSpec) *pb.FunctionSpec {
+       if p == nil {
+               return nil
+       }
+       var ret pb.FunctionSpec
+       ret = *p
+       return &ret
+}
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
index 010e7fc1aa1..15e36d01b1f 100644
--- a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/execute.go
@@ -31,7 +31,7 @@ import (
 )
 
 // Execute submits a pipeline as a Dataflow job.
-func Execute(ctx context.Context, p *pb.Pipeline, opts *JobOptions, workerURL, 
modelURL, endpoint string, async bool) (string, error) {
+func Execute(ctx context.Context, raw *pb.Pipeline, opts *JobOptions, 
workerURL, modelURL, endpoint string, async bool) (string, error) {
        // (1) Upload Go binary to GCS.
 
        bin := opts.Worker
@@ -63,8 +63,10 @@ func Execute(ctx context.Context, p *pb.Pipeline, opts 
*JobOptions, workerURL, m
 
        // (2) Fixup and upload model to GCS
 
-       // TODO(herohde): fixup
-
+       p, err := Fixup(raw)
+       if err != nil {
+               return "", err
+       }
        log.Info(ctx, proto.MarshalTextString(p))
 
        if err := StageModel(ctx, opts.Project, modelURL, 
protox.MustEncode(p)); err != nil {
diff --git a/sdks/go/pkg/beam/runners/dataflow/dataflowlib/fixup.go 
b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/fixup.go
new file mode 100644
index 00000000000..4c2a101232c
--- /dev/null
+++ b/sdks/go/pkg/beam/runners/dataflow/dataflowlib/fixup.go
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package dataflowlib
+
+import (
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/pipelinex"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/util/protox"
+       pb "github.com/apache/beam/sdks/go/pkg/beam/model/pipeline_v1"
+       "github.com/golang/protobuf/proto"
+)
+
+// Fixup returns an updated copy of the proto pipeline with Dataflow quirks
+// applied: every ParDo transform with side inputs gets the access pattern URN
+// of each side input rewritten to the multimap materialization URN. Transforms
+// whose payload cannot be decoded are left untouched. The input pipeline and
+// its transforms are not modified; changes are made on shallow clones and
+// merged via pipelinex.Update.
+func Fixup(p *pb.Pipeline) (*pb.Pipeline, error) {
+       upd := make(map[string]*pb.PTransform)
+
+       for id, t := range p.GetComponents().GetTransforms() {
+               if t.GetSpec().GetUrn() != graphx.URNParDo {
+                       continue
+               }
+               var payload pb.ParDoPayload
+               if err := proto.Unmarshal(t.GetSpec().GetPayload(), &payload); err != nil {
+                       continue // ignore: unexpected payload
+               }
+               if len(payload.SideInputs) == 0 {
+                       continue
+               }
+
+               // ParDo w/ side input. Fixup URN.
+
+               fixedPayload := pipelinex.ShallowCloneParDoPayload(&payload)
+               if fixedPayload.SideInputs == nil {
+                       // Guard against a failed clone: writing to a nil map panics.
+                       fixedPayload.SideInputs = make(map[string]*pb.SideInput, len(payload.SideInputs))
+               }
+               for k, v := range payload.SideInputs {
+                       fixedV := pipelinex.ShallowCloneSideInput(v)
+                       if fixedV == nil {
+                               fixedV = &pb.SideInput{}
+                       }
+                       fixedV.AccessPattern = pipelinex.ShallowCloneFunctionSpec(v.GetAccessPattern())
+                       if fixedV.AccessPattern == nil {
+                               // No access pattern set: start from an empty spec rather
+                               // than dereferencing nil below.
+                               fixedV.AccessPattern = &pb.FunctionSpec{}
+                       }
+                       fixedV.AccessPattern.Urn = "urn:beam:sideinput:materialization:multimap:0.1"
+
+                       fixedPayload.SideInputs[k] = fixedV
+               }
+               fixed := pipelinex.ShallowClonePTransform(t)
+               fixed.Spec = pipelinex.ShallowCloneFunctionSpec(t.Spec)
+               fixed.Spec.Payload = protox.MustEncode(fixedPayload)
+
+               upd[id] = fixed
+       }
+       return pipelinex.Update(p, &pb.Components{Transforms: upd})
+}
diff --git a/sdks/go/pkg/beam/runners/direct/buffer.go 
b/sdks/go/pkg/beam/runners/direct/buffer.go
index 81e7fa2969d..e8dcd85033d 100644
--- a/sdks/go/pkg/beam/runners/direct/buffer.go
+++ b/sdks/go/pkg/beam/runners/direct/buffer.go
@@ -20,10 +20,11 @@ import (
        "fmt"
 
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/exec"
+       "github.com/apache/beam/sdks/go/pkg/beam/core/typex"
        "github.com/apache/beam/sdks/go/pkg/beam/log"
 )
 
-// buffer buffers all input and notifies on FinishBundle. It is also a 
ReStream.
+// buffer buffers all input and notifies on FinishBundle. It is also a 
SideInputAdapter.
 // It is used as a guard for the wait node to buffer data used as side input.
 type buffer struct {
        uid    exec.UnitID
@@ -43,7 +44,7 @@ func (n *buffer) Up(ctx context.Context) error {
        return nil
 }
 
-func (n *buffer) StartBundle(ctx context.Context, id string, data 
exec.DataManager) error {
+func (n *buffer) StartBundle(ctx context.Context, id string, data 
exec.DataContext) error {
        n.buf = nil
        n.done = false
        return nil
@@ -63,11 +64,11 @@ func (n *buffer) Down(ctx context.Context) error {
        return nil
 }
 
-func (n *buffer) Open() exec.Stream {
+func (n *buffer) NewIterable(ctx context.Context, reader exec.SideInputReader, 
w typex.Window) (exec.ReStream, error) {
        if !n.done {
                panic(fmt.Sprintf("buffer[%v] incomplete: %v", n.uid, 
len(n.buf)))
        }
-       return &exec.FixedStream{Buf: n.buf}
+       return &exec.FixedReStream{Buf: n.buf}, nil
 }
 
 func (n *buffer) String() string {
@@ -83,7 +84,7 @@ type wait struct {
        next exec.Node
 
        instID string
-       mgr    exec.DataManager
+       mgr    exec.DataContext
 
        buf   []exec.FullValue
        ready int  // guards ready
@@ -131,7 +132,7 @@ func (w *wait) Up(ctx context.Context) error {
        return nil
 }
 
-func (w *wait) StartBundle(ctx context.Context, id string, data exec.DataManager) error {
+func (w *wait) StartBundle(ctx context.Context, id string, data exec.DataContext) error {
        return nil // done in notify
 }
 
diff --git a/sdks/go/pkg/beam/runners/direct/direct.go b/sdks/go/pkg/beam/runners/direct/direct.go
index 4d8b1d845d2..1d50aadcd40 100644
--- a/sdks/go/pkg/beam/runners/direct/direct.go
+++ b/sdks/go/pkg/beam/runners/direct/direct.go
@@ -49,7 +49,7 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
        }
        log.Info(ctx, plan)
 
-       if err = plan.Execute(ctx, "", nil); err != nil {
+       if err = plan.Execute(ctx, "", exec.DataContext{}); err != nil {
                plan.Down(ctx) // ignore any teardown errors
                return err
        }
diff --git a/sdks/go/pkg/beam/runners/direct/gbk.go b/sdks/go/pkg/beam/runners/direct/gbk.go
index 4090f032677..caa23b0c067 100644
--- a/sdks/go/pkg/beam/runners/direct/gbk.go
+++ b/sdks/go/pkg/beam/runners/direct/gbk.go
@@ -52,7 +52,7 @@ func (n *CoGBK) Up(ctx context.Context) error {
        return nil
 }
 
-func (n *CoGBK) StartBundle(ctx context.Context, id string, data exec.DataManager) error {
+func (n *CoGBK) StartBundle(ctx context.Context, id string, data exec.DataContext) error {
        return n.Out.StartBundle(ctx, id, data)
 }
 
@@ -123,7 +123,7 @@ func (n *Inject) Up(ctx context.Context) error {
        return nil
 }
 
-func (n *Inject) StartBundle(ctx context.Context, id string, data exec.DataManager) error {
+func (n *Inject) StartBundle(ctx context.Context, id string, data exec.DataContext) error {
        return n.Out.StartBundle(ctx, id, data)
 }
 
diff --git a/sdks/go/pkg/beam/runners/direct/impulse.go b/sdks/go/pkg/beam/runners/direct/impulse.go
index 02feb2a6f1e..b2904752a80 100644
--- a/sdks/go/pkg/beam/runners/direct/impulse.go
+++ b/sdks/go/pkg/beam/runners/direct/impulse.go
@@ -39,7 +39,7 @@ func (n *Impulse) Up(ctx context.Context) error {
        return nil
 }
 
-func (n *Impulse) StartBundle(ctx context.Context, id string, data exec.DataManager) error {
+func (n *Impulse) StartBundle(ctx context.Context, id string, data exec.DataContext) error {
        return n.Out.StartBundle(ctx, id, data)
 }
 
diff --git a/sdks/go/pkg/beam/runners/universal/universal.go b/sdks/go/pkg/beam/runners/universal/universal.go
index e3344692fa3..fe6866ae599 100644
--- a/sdks/go/pkg/beam/runners/universal/universal.go
+++ b/sdks/go/pkg/beam/runners/universal/universal.go
@@ -25,8 +25,10 @@ import (
        "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/graphx"
        // Importing to get the side effect of the remote execution hook. See init().
        _ "github.com/apache/beam/sdks/go/pkg/beam/core/runtime/harness/init"
+       "github.com/apache/beam/sdks/go/pkg/beam/log"
        "github.com/apache/beam/sdks/go/pkg/beam/options/jobopts"
        "github.com/apache/beam/sdks/go/pkg/beam/runners/universal/runnerlib"
+       "github.com/golang/protobuf/proto"
 )
 
 func init() {
@@ -50,6 +52,8 @@ func Execute(ctx context.Context, p *beam.Pipeline) error {
                return fmt.Errorf("failed to generate model pipeline: %v", err)
        }
 
+       log.Info(ctx, proto.MarshalTextString(pipeline))
+
        opt := &runnerlib.JobOptions{
                Name:        jobopts.GetJobName(),
                Experiments: jobopts.GetExperiments(),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 136010)
    Time Spent: 2h 20m  (was: 2h 10m)

> Go SDK support for portable side input
> --------------------------------------
>
>                 Key: BEAM-3286
>                 URL: https://issues.apache.org/jira/browse/BEAM-3286
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-go
>            Reporter: Henning Rohde
>            Assignee: Henning Rohde
>            Priority: Major
>              Labels: portability
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to