diff --git a/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go 
new file mode 100644
index 0000000..9a31a57
--- /dev/null
+++ b/sdks/go/pkg/beam/model/fnexecution_v1/beam_fn_api.pb.go
@@ -0,0 +1,2729 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// source: beam_fn_api.proto
+Package fnexecution_v1 is a generated protocol buffer package.
+It is generated from these files:
+       beam_fn_api.proto
+       beam_provision_api.proto
+It has these top-level messages:
+       Target
+       RemoteGrpcPort
+       InstructionRequest
+       InstructionResponse
+       RegisterRequest
+       RegisterResponse
+       ProcessBundleDescriptor
+       ProcessBundleRequest
+       ProcessBundleResponse
+       ProcessBundleProgressRequest
+       Metrics
+       ProcessBundleProgressResponse
+       ProcessBundleSplitRequest
+       ElementCountRestriction
+       ElementCountSkipRestriction
+       PrimitiveTransformSplit
+       ProcessBundleSplitResponse
+       Elements
+       StateRequest
+       StateResponse
+       StateKey
+       StateGetRequest
+       StateGetResponse
+       StateAppendRequest
+       StateAppendResponse
+       StateClearRequest
+       StateClearResponse
+       LogEntry
+       LogControl
+       DockerContainer
+       GetProvisionInfoRequest
+       GetProvisionInfoResponse
+       ProvisionInfo
+       Resources
+package fnexecution_v1
+import proto ""
+import fmt "fmt"
+import math "math"
+import org_apache_beam_model_pipeline_v1 
+import org_apache_beam_model_pipeline_v11 
+import google_protobuf1 ""
+import (
+       context ""
+       grpc ""
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+type LogEntry_Severity_Enum int32
+const (
+       LogEntry_Severity_UNSPECIFIED LogEntry_Severity_Enum = 0
+       // Trace level information, also the default log level unless
+       // another severity is specified.
+       LogEntry_Severity_TRACE LogEntry_Severity_Enum = 1
+       // Debugging information.
+       LogEntry_Severity_DEBUG LogEntry_Severity_Enum = 2
+       // Normal events.
+       LogEntry_Severity_INFO LogEntry_Severity_Enum = 3
+       // Normal but significant events, such as start up, shut down, or
+       // configuration.
+       LogEntry_Severity_NOTICE LogEntry_Severity_Enum = 4
+       // Warning events might cause problems.
+       LogEntry_Severity_WARN LogEntry_Severity_Enum = 5
+       // Error events are likely to cause problems.
+       LogEntry_Severity_ERROR LogEntry_Severity_Enum = 6
+       // Critical events cause severe problems or brief outages and may
+       // indicate that a person must take action.
+       LogEntry_Severity_CRITICAL LogEntry_Severity_Enum = 7
+var LogEntry_Severity_Enum_name = map[int32]string{
+       0: "UNSPECIFIED",
+       1: "TRACE",
+       2: "DEBUG",
+       3: "INFO",
+       4: "NOTICE",
+       5: "WARN",
+       6: "ERROR",
+       7: "CRITICAL",
+var LogEntry_Severity_Enum_value = map[string]int32{
+       "UNSPECIFIED": 0,
+       "TRACE":       1,
+       "DEBUG":       2,
+       "INFO":        3,
+       "NOTICE":      4,
+       "WARN":        5,
+       "ERROR":       6,
+       "CRITICAL":    7,
+func (x LogEntry_Severity_Enum) String() string {
+       return proto.EnumName(LogEntry_Severity_Enum_name, int32(x))
+func (LogEntry_Severity_Enum) EnumDescriptor() ([]byte, []int) {
+       return fileDescriptor0, []int{27, 1, 0}
+// A representation of an input or output definition on a primitive transform.
+// Stable
+type Target struct {
+       // (Required) The id of the PrimitiveTransform which is the target.
+       PrimitiveTransformReference string 
+       // (Required) The local name of an input or output defined on the 
+       // transform.
+       Name string `protobuf:"bytes,2,opt,name=name" json:"name,omitempty"`
+func (m *Target) Reset()                    { *m = Target{} }
+func (m *Target) String() string            { return 
proto.CompactTextString(m) }
+func (*Target) ProtoMessage()               {}
+func (*Target) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} 
+func (m *Target) GetPrimitiveTransformReference() string {
+       if m != nil {
+               return m.PrimitiveTransformReference
+       }
+       return ""
+func (m *Target) GetName() string {
+       if m != nil {
+               return m.Name
+       }
+       return ""
+// A repeated list of target definitions.
+type Target_List struct {
+       Target []*Target `protobuf:"bytes,1,rep,name=target" 
+func (m *Target_List) Reset()                    { *m = Target_List{} }
+func (m *Target_List) String() string            { return 
proto.CompactTextString(m) }
+func (*Target_List) ProtoMessage()               {}
+func (*Target_List) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{0, 0} }
+func (m *Target_List) GetTarget() []*Target {
+       if m != nil {
+               return m.Target
+       }
+       return nil
+// A descriptor for connecting to a remote port using the Beam Fn Data API.
+// Allows for communication between two environments (for example between the
+// runner and the SDK).
+// Stable
+type RemoteGrpcPort struct {
+       // (Required) An API descriptor which describes where to
+       // connect to including any authentication that is required.
+       ApiServiceDescriptor 
+func (m *RemoteGrpcPort) Reset()                    { *m = RemoteGrpcPort{} }
+func (m *RemoteGrpcPort) String() string            { return 
proto.CompactTextString(m) }
+func (*RemoteGrpcPort) ProtoMessage()               {}
+func (*RemoteGrpcPort) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{1} }
+func (m *RemoteGrpcPort) GetApiServiceDescriptor() 
*org_apache_beam_model_pipeline_v11.ApiServiceDescriptor {
+       if m != nil {
+               return m.ApiServiceDescriptor
+       }
+       return nil
+// A request sent by a runner which the SDK is asked to fulfill.
+// For any unsupported request type, an error should be returned with a
+// matching instruction id.
+// Stable
+type InstructionRequest struct {
+       // (Required) An unique identifier provided by the runner which 
+       // this requests execution. The InstructionResponse MUST have the 
matching id.
+       InstructionId string 
+       // (Required) A request that the SDK Harness needs to interpret.
+       //
+       // Types that are valid to be assigned to Request:
+       //      *InstructionRequest_Register
+       //      *InstructionRequest_ProcessBundle
+       //      *InstructionRequest_ProcessBundleProgress
+       //      *InstructionRequest_ProcessBundleSplit
+       Request isInstructionRequest_Request `protobuf_oneof:"request"`
+func (m *InstructionRequest) Reset()                    { *m = 
InstructionRequest{} }
+func (m *InstructionRequest) String() string            { return 
proto.CompactTextString(m) }
+func (*InstructionRequest) ProtoMessage()               {}
+func (*InstructionRequest) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{2} }
+type isInstructionRequest_Request interface {
+       isInstructionRequest_Request()
+type InstructionRequest_Register struct {
+       Register *RegisterRequest 
+type InstructionRequest_ProcessBundle struct {
+       ProcessBundle *ProcessBundleRequest 
+type InstructionRequest_ProcessBundleProgress struct {
+       ProcessBundleProgress *ProcessBundleProgressRequest 
+type InstructionRequest_ProcessBundleSplit struct {
+       ProcessBundleSplit *ProcessBundleSplitRequest 
+func (*InstructionRequest_Register) isInstructionRequest_Request()             
+func (*InstructionRequest_ProcessBundle) isInstructionRequest_Request()        
+func (*InstructionRequest_ProcessBundleProgress) 
isInstructionRequest_Request() {}
+func (*InstructionRequest_ProcessBundleSplit) isInstructionRequest_Request()   
+func (m *InstructionRequest) GetRequest() isInstructionRequest_Request {
+       if m != nil {
+               return m.Request
+       }
+       return nil
+func (m *InstructionRequest) GetInstructionId() string {
+       if m != nil {
+               return m.InstructionId
+       }
+       return ""
+func (m *InstructionRequest) GetRegister() *RegisterRequest {
+       if x, ok := m.GetRequest().(*InstructionRequest_Register); ok {
+               return x.Register
+       }
+       return nil
+func (m *InstructionRequest) GetProcessBundle() *ProcessBundleRequest {
+       if x, ok := m.GetRequest().(*InstructionRequest_ProcessBundle); ok {
+               return x.ProcessBundle
+       }
+       return nil
+func (m *InstructionRequest) GetProcessBundleProgress() 
*ProcessBundleProgressRequest {
+       if x, ok := m.GetRequest().(*InstructionRequest_ProcessBundleProgress); 
ok {
+               return x.ProcessBundleProgress
+       }
+       return nil
+func (m *InstructionRequest) GetProcessBundleSplit() 
*ProcessBundleSplitRequest {
+       if x, ok := m.GetRequest().(*InstructionRequest_ProcessBundleSplit); ok 
+               return x.ProcessBundleSplit
+       }
+       return nil
+// XXX_OneofFuncs is for the internal use of the proto package.
+func (*InstructionRequest) XXX_OneofFuncs() (func(msg proto.Message, b 
*proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) 
(bool, error), func(msg proto.Message) (n int), []interface{}) {
+       return _InstructionRequest_OneofMarshaler, 
_InstructionRequest_OneofUnmarshaler, _InstructionRequest_OneofSizer, 
+               (*InstructionRequest_Register)(nil),
+               (*InstructionRequest_ProcessBundle)(nil),
+               (*InstructionRequest_ProcessBundleProgress)(nil),
+               (*InstructionRequest_ProcessBundleSplit)(nil),
+       }
+func _InstructionRequest_OneofMarshaler(msg proto.Message, b *proto.Buffer) 
error {
+       m := msg.(*InstructionRequest)
+       // request
+       switch x := m.Request.(type) {
+       case *InstructionRequest_Register:
+               b.EncodeVarint(1000<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.Register); err != nil {
+                       return err
+               }
+       case *InstructionRequest_ProcessBundle:
+               b.EncodeVarint(1001<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.ProcessBundle); err != nil {
+                       return err
+               }
+       case *InstructionRequest_ProcessBundleProgress:
+               b.EncodeVarint(1002<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.ProcessBundleProgress); err != nil {
+                       return err
+               }
+       case *InstructionRequest_ProcessBundleSplit:
+               b.EncodeVarint(1003<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.ProcessBundleSplit); err != nil {
+                       return err
+               }
+       case nil:
+       default:
+               return fmt.Errorf("InstructionRequest.Request has unexpected 
type %T", x)
+       }
+       return nil
+func _InstructionRequest_OneofUnmarshaler(msg proto.Message, tag, wire int, b 
*proto.Buffer) (bool, error) {
+       m := msg.(*InstructionRequest)
+       switch tag {
+       case 1000: // request.register
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(RegisterRequest)
+               err := b.DecodeMessage(msg)
+               m.Request = &InstructionRequest_Register{msg}
+               return true, err
+       case 1001: // request.process_bundle
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(ProcessBundleRequest)
+               err := b.DecodeMessage(msg)
+               m.Request = &InstructionRequest_ProcessBundle{msg}
+               return true, err
+       case 1002: // request.process_bundle_progress
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(ProcessBundleProgressRequest)
+               err := b.DecodeMessage(msg)
+               m.Request = &InstructionRequest_ProcessBundleProgress{msg}
+               return true, err
+       case 1003: // request.process_bundle_split
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(ProcessBundleSplitRequest)
+               err := b.DecodeMessage(msg)
+               m.Request = &InstructionRequest_ProcessBundleSplit{msg}
+               return true, err
+       default:
+               return false, nil
+       }
+func _InstructionRequest_OneofSizer(msg proto.Message) (n int) {
+       m := msg.(*InstructionRequest)
+       // request
+       switch x := m.Request.(type) {
+       case *InstructionRequest_Register:
+               s := proto.Size(x.Register)
+               n += proto.SizeVarint(1000<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case *InstructionRequest_ProcessBundle:
+               s := proto.Size(x.ProcessBundle)
+               n += proto.SizeVarint(1001<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case *InstructionRequest_ProcessBundleProgress:
+               s := proto.Size(x.ProcessBundleProgress)
+               n += proto.SizeVarint(1002<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case *InstructionRequest_ProcessBundleSplit:
+               s := proto.Size(x.ProcessBundleSplit)
+               n += proto.SizeVarint(1003<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case nil:
+       default:
+               panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
+       }
+       return n
+// The response for an associated request the SDK had been asked to fulfill.
+// Stable
+type InstructionResponse struct {
+       // (Required) A reference provided by the runner which represents a 
+       // execution. The InstructionResponse MUST have the matching id when
+       // responding to the runner.
+       InstructionId string 
+       // If this is specified, then this instruction has failed.
+       // A human readable string representing the reason as to why processing 
+       // failed.
+       Error string `protobuf:"bytes,2,opt,name=error" json:"error,omitempty"`
+       // If the instruction did not fail, it is required to return an 
+       // response type depending on the request this matches.
+       //
+       // Types that are valid to be assigned to Response:
+       //      *InstructionResponse_Register
+       //      *InstructionResponse_ProcessBundle
+       //      *InstructionResponse_ProcessBundleProgress
+       //      *InstructionResponse_ProcessBundleSplit
+       Response isInstructionResponse_Response `protobuf_oneof:"response"`
+func (m *InstructionResponse) Reset()                    { *m = 
InstructionResponse{} }
+func (m *InstructionResponse) String() string            { return 
proto.CompactTextString(m) }
+func (*InstructionResponse) ProtoMessage()               {}
+func (*InstructionResponse) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{3} }
+type isInstructionResponse_Response interface {
+       isInstructionResponse_Response()
+type InstructionResponse_Register struct {
+       Register *RegisterResponse 
+type InstructionResponse_ProcessBundle struct {
+       ProcessBundle *ProcessBundleResponse 
+type InstructionResponse_ProcessBundleProgress struct {
+       ProcessBundleProgress *ProcessBundleProgressResponse 
+type InstructionResponse_ProcessBundleSplit struct {
+       ProcessBundleSplit *ProcessBundleSplitResponse 
+func (*InstructionResponse_Register) isInstructionResponse_Response()          
+func (*InstructionResponse_ProcessBundle) isInstructionResponse_Response()     
+func (*InstructionResponse_ProcessBundleProgress) 
isInstructionResponse_Response() {}
+func (*InstructionResponse_ProcessBundleSplit) 
isInstructionResponse_Response()    {}
+func (m *InstructionResponse) GetResponse() isInstructionResponse_Response {
+       if m != nil {
+               return m.Response
+       }
+       return nil
+func (m *InstructionResponse) GetInstructionId() string {
+       if m != nil {
+               return m.InstructionId
+       }
+       return ""
+func (m *InstructionResponse) GetError() string {
+       if m != nil {
+               return m.Error
+       }
+       return ""
+func (m *InstructionResponse) GetRegister() *RegisterResponse {
+       if x, ok := m.GetResponse().(*InstructionResponse_Register); ok {
+               return x.Register
+       }
+       return nil
+func (m *InstructionResponse) GetProcessBundle() *ProcessBundleResponse {
+       if x, ok := m.GetResponse().(*InstructionResponse_ProcessBundle); ok {
+               return x.ProcessBundle
+       }
+       return nil
+func (m *InstructionResponse) GetProcessBundleProgress() 
*ProcessBundleProgressResponse {
+       if x, ok := 
m.GetResponse().(*InstructionResponse_ProcessBundleProgress); ok {
+               return x.ProcessBundleProgress
+       }
+       return nil
+func (m *InstructionResponse) GetProcessBundleSplit() 
*ProcessBundleSplitResponse {
+       if x, ok := m.GetResponse().(*InstructionResponse_ProcessBundleSplit); 
ok {
+               return x.ProcessBundleSplit
+       }
+       return nil
+// XXX_OneofFuncs is for the internal use of the proto package.
+func (*InstructionResponse) XXX_OneofFuncs() (func(msg proto.Message, b 
*proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) 
(bool, error), func(msg proto.Message) (n int), []interface{}) {
+       return _InstructionResponse_OneofMarshaler, 
_InstructionResponse_OneofUnmarshaler, _InstructionResponse_OneofSizer, 
+               (*InstructionResponse_Register)(nil),
+               (*InstructionResponse_ProcessBundle)(nil),
+               (*InstructionResponse_ProcessBundleProgress)(nil),
+               (*InstructionResponse_ProcessBundleSplit)(nil),
+       }
+func _InstructionResponse_OneofMarshaler(msg proto.Message, b *proto.Buffer) 
error {
+       m := msg.(*InstructionResponse)
+       // response
+       switch x := m.Response.(type) {
+       case *InstructionResponse_Register:
+               b.EncodeVarint(1000<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.Register); err != nil {
+                       return err
+               }
+       case *InstructionResponse_ProcessBundle:
+               b.EncodeVarint(1001<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.ProcessBundle); err != nil {
+                       return err
+               }
+       case *InstructionResponse_ProcessBundleProgress:
+               b.EncodeVarint(1002<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.ProcessBundleProgress); err != nil {
+                       return err
+               }
+       case *InstructionResponse_ProcessBundleSplit:
+               b.EncodeVarint(1003<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.ProcessBundleSplit); err != nil {
+                       return err
+               }
+       case nil:
+       default:
+               return fmt.Errorf("InstructionResponse.Response has unexpected 
type %T", x)
+       }
+       return nil
+func _InstructionResponse_OneofUnmarshaler(msg proto.Message, tag, wire int, b 
*proto.Buffer) (bool, error) {
+       m := msg.(*InstructionResponse)
+       switch tag {
+       case 1000: // response.register
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(RegisterResponse)
+               err := b.DecodeMessage(msg)
+               m.Response = &InstructionResponse_Register{msg}
+               return true, err
+       case 1001: // response.process_bundle
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(ProcessBundleResponse)
+               err := b.DecodeMessage(msg)
+               m.Response = &InstructionResponse_ProcessBundle{msg}
+               return true, err
+       case 1002: // response.process_bundle_progress
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(ProcessBundleProgressResponse)
+               err := b.DecodeMessage(msg)
+               m.Response = &InstructionResponse_ProcessBundleProgress{msg}
+               return true, err
+       case 1003: // response.process_bundle_split
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(ProcessBundleSplitResponse)
+               err := b.DecodeMessage(msg)
+               m.Response = &InstructionResponse_ProcessBundleSplit{msg}
+               return true, err
+       default:
+               return false, nil
+       }
+func _InstructionResponse_OneofSizer(msg proto.Message) (n int) {
+       m := msg.(*InstructionResponse)
+       // response
+       switch x := m.Response.(type) {
+       case *InstructionResponse_Register:
+               s := proto.Size(x.Register)
+               n += proto.SizeVarint(1000<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case *InstructionResponse_ProcessBundle:
+               s := proto.Size(x.ProcessBundle)
+               n += proto.SizeVarint(1001<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case *InstructionResponse_ProcessBundleProgress:
+               s := proto.Size(x.ProcessBundleProgress)
+               n += proto.SizeVarint(1002<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case *InstructionResponse_ProcessBundleSplit:
+               s := proto.Size(x.ProcessBundleSplit)
+               n += proto.SizeVarint(1003<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case nil:
+       default:
+               panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
+       }
+       return n
+// A list of objects which can be referred to by the runner in
+// future requests.
+// Stable
+type RegisterRequest struct {
+       // (Optional) The set of descriptors used to process bundles.
+       ProcessBundleDescriptor []*ProcessBundleDescriptor 
+func (m *RegisterRequest) Reset()                    { *m = RegisterRequest{} }
+func (m *RegisterRequest) String() string            { return 
proto.CompactTextString(m) }
+func (*RegisterRequest) ProtoMessage()               {}
+func (*RegisterRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{4} }
+func (m *RegisterRequest) GetProcessBundleDescriptor() 
[]*ProcessBundleDescriptor {
+       if m != nil {
+               return m.ProcessBundleDescriptor
+       }
+       return nil
+// Stable
+type RegisterResponse struct {
+func (m *RegisterResponse) Reset()                    { *m = 
RegisterResponse{} }
+func (m *RegisterResponse) String() string            { return 
proto.CompactTextString(m) }
+func (*RegisterResponse) ProtoMessage()               {}
+func (*RegisterResponse) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{5} }
+// Definitions that should be used to construct the bundle processing graph.
+type ProcessBundleDescriptor struct {
+       // (Required) A pipeline level unique id which can be used as a 
reference to
+       // refer to this.
+       Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
+       // (Required) A map from pipeline-scoped id to PTransform.
+       Transforms map[string]*org_apache_beam_model_pipeline_v1.PTransform 
`protobuf:"bytes,2,rep,name=transforms" json:"transforms,omitempty" 
protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+       // (Required) A map from pipeline-scoped id to PCollection.
+       Pcollections map[string]*org_apache_beam_model_pipeline_v1.PCollection 
`protobuf:"bytes,3,rep,name=pcollections" json:"pcollections,omitempty" 
protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+       // (Required) A map from pipeline-scoped id to WindowingStrategy.
+       WindowingStrategies 
json:"windowing_strategies,omitempty" protobuf_key:"bytes,1,opt,name=key" 
+       // (Required) A map from pipeline-scoped id to Coder.
+       Coders map[string]*org_apache_beam_model_pipeline_v1.Coder 
`protobuf:"bytes,5,rep,name=coders" json:"coders,omitempty" 
protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+       // (Required) A map from pipeline-scoped id to Environment.
+       Environments map[string]*org_apache_beam_model_pipeline_v1.Environment 
`protobuf:"bytes,6,rep,name=environments" json:"environments,omitempty" 
protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+       // A descriptor describing the end point to use for State API
+       // calls. Required if the Runner intends to send remote references over 
+       // data plane or if any of the transforms rely on user state or side 
+       StateApiServiceDescriptor 
+func (m *ProcessBundleDescriptor) Reset()                    { *m = 
ProcessBundleDescriptor{} }
+func (m *ProcessBundleDescriptor) String() string            { return 
proto.CompactTextString(m) }
+func (*ProcessBundleDescriptor) ProtoMessage()               {}
+func (*ProcessBundleDescriptor) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{6} }
+func (m *ProcessBundleDescriptor) GetId() string {
+       if m != nil {
+               return m.Id
+       }
+       return ""
+func (m *ProcessBundleDescriptor) GetTransforms() 
map[string]*org_apache_beam_model_pipeline_v1.PTransform {
+       if m != nil {
+               return m.Transforms
+       }
+       return nil
+func (m *ProcessBundleDescriptor) GetPcollections() 
map[string]*org_apache_beam_model_pipeline_v1.PCollection {
+       if m != nil {
+               return m.Pcollections
+       }
+       return nil
+func (m *ProcessBundleDescriptor) GetWindowingStrategies() 
map[string]*org_apache_beam_model_pipeline_v1.WindowingStrategy {
+       if m != nil {
+               return m.WindowingStrategies
+       }
+       return nil
+func (m *ProcessBundleDescriptor) GetCoders() 
map[string]*org_apache_beam_model_pipeline_v1.Coder {
+       if m != nil {
+               return m.Coders
+       }
+       return nil
+func (m *ProcessBundleDescriptor) GetEnvironments() 
map[string]*org_apache_beam_model_pipeline_v1.Environment {
+       if m != nil {
+               return m.Environments
+       }
+       return nil
+func (m *ProcessBundleDescriptor) GetStateApiServiceDescriptor() 
*org_apache_beam_model_pipeline_v11.ApiServiceDescriptor {
+       if m != nil {
+               return m.StateApiServiceDescriptor
+       }
+       return nil
+// A request to process a given bundle.
+// Stable
+type ProcessBundleRequest struct {
+       // (Required) A reference to the process bundle descriptor that must be
+       // instantiated and executed by the SDK harness.
+       ProcessBundleDescriptorReference string 
+       // (Optional) A list of cache tokens that can be used by an SDK to reuse
+       // cached data returned by the State API across multiple bundles.
+       CacheTokens [][]byte 
+func (m *ProcessBundleRequest) Reset()                    { *m = 
ProcessBundleRequest{} }
+func (m *ProcessBundleRequest) String() string            { return 
proto.CompactTextString(m) }
+func (*ProcessBundleRequest) ProtoMessage()               {}
+func (*ProcessBundleRequest) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{7} }
+func (m *ProcessBundleRequest) GetProcessBundleDescriptorReference() string {
+       if m != nil {
+               return m.ProcessBundleDescriptorReference
+       }
+       return ""
+func (m *ProcessBundleRequest) GetCacheTokens() [][]byte {
+       if m != nil {
+               return m.CacheTokens
+       }
+       return nil
+// Stable
+type ProcessBundleResponse struct {
+       // (Optional) If metrics reporting is supported by the SDK, this 
+       // the final metrics to record for this bundle.
+       Metrics *Metrics `protobuf:"bytes,1,opt,name=metrics" 
+func (m *ProcessBundleResponse) Reset()                    { *m = 
ProcessBundleResponse{} }
+func (m *ProcessBundleResponse) String() string            { return 
proto.CompactTextString(m) }
+func (*ProcessBundleResponse) ProtoMessage()               {}
+func (*ProcessBundleResponse) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{8} }
+func (m *ProcessBundleResponse) GetMetrics() *Metrics {
+       if m != nil {
+               return m.Metrics
+       }
+       return nil
+// A request to report progress information for a given bundle.
+// This is an optional request to be handled and is used to support advanced
+// SDK features such as SplittableDoFn, user level metrics etc.
+type ProcessBundleProgressRequest struct {
+       // (Required) A reference to an active process bundle request with the 
+       // instruction id.
+       InstructionReference string 
+func (m *ProcessBundleProgressRequest) Reset()                    { *m = 
ProcessBundleProgressRequest{} }
+func (m *ProcessBundleProgressRequest) String() string            { return 
proto.CompactTextString(m) }
+func (*ProcessBundleProgressRequest) ProtoMessage()               {}
+func (*ProcessBundleProgressRequest) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{9} }
+func (m *ProcessBundleProgressRequest) GetInstructionReference() string {
+       if m != nil {
+               return m.InstructionReference
+       }
+       return ""
+type Metrics struct {
+       Ptransforms map[string]*Metrics_PTransform 
`protobuf:"bytes,1,rep,name=ptransforms" json:"ptransforms,omitempty" 
protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+       User        map[string]*Metrics_User       
`protobuf:"bytes,2,rep,name=user" json:"user,omitempty" 
protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"`
+func (m *Metrics) Reset()                    { *m = Metrics{} }
+func (m *Metrics) String() string            { return 
proto.CompactTextString(m) }
+func (*Metrics) ProtoMessage()               {}
+func (*Metrics) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{10} }
+func (m *Metrics) GetPtransforms() map[string]*Metrics_PTransform {
+       if m != nil {
+               return m.Ptransforms
+       }
+       return nil
+func (m *Metrics) GetUser() map[string]*Metrics_User {
+       if m != nil {
+               return m.User
+       }
+       return nil
+// PTransform level metrics.
+// These metrics are split into processed and active element groups for
+// progress reporting purposes. This allows a Runner to see what is measured,
+// what is estimated and what can be extrapolated to be able to accurately
+// estimate the backlog of remaining work.
+type Metrics_PTransform struct {
+       // (Required): Metrics for processed elements.
+       ProcessedElements *Metrics_PTransform_ProcessedElements 
+       // (Required): Metrics for active elements.
+       ActiveElements *Metrics_PTransform_ActiveElements 
+       // (Optional): Map from local output name to its watermark.
+       // The watermarks reported are tentative, to get a better sense of 
+       // while processing a bundle but before it is committed. At bundle 
+       // time, a Runner needs to also take into account the timers set to 
+       // the actual watermarks.
+       Watermarks map[string]int64 `protobuf:"bytes,3,rep,name=watermarks" 
json:"watermarks,omitempty" protobuf_key:"bytes,1,opt,name=key" 
+func (m *Metrics_PTransform) Reset()                    { *m = 
Metrics_PTransform{} }
+func (m *Metrics_PTransform) String() string            { return 
proto.CompactTextString(m) }
+func (*Metrics_PTransform) ProtoMessage()               {}
+func (*Metrics_PTransform) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{10, 0} }
+func (m *Metrics_PTransform) GetProcessedElements() 
*Metrics_PTransform_ProcessedElements {
+       if m != nil {
+               return m.ProcessedElements
+       }
+       return nil
+func (m *Metrics_PTransform) GetActiveElements() 
*Metrics_PTransform_ActiveElements {
+       if m != nil {
+               return m.ActiveElements
+       }
+       return nil
+func (m *Metrics_PTransform) GetWatermarks() map[string]int64 {
+       if m != nil {
+               return m.Watermarks
+       }
+       return nil
+// Metrics that are measured for processed and active element groups.
+type Metrics_PTransform_Measured struct {
+       // (Optional) Map from local input name to number of elements processed
+       // from this input.
+       // If unset, assumed to be the sum of the outputs of all producers to
+       // this transform (for ProcessedElements) and 0 (for ActiveElements).
+       InputElementCounts map[string]int64 
json:"input_element_counts,omitempty" protobuf_key:"bytes,1,opt,name=key" 
+       // (Required) Map from local output name to number of elements produced
+       // for this output.
+       OutputElementCounts map[string]int64 
json:"output_element_counts,omitempty" protobuf_key:"bytes,1,opt,name=key" 
+       // (Optional) The total time spent so far in processing the elements in
+       // this group, in seconds.
+       TotalTimeSpent float64 
+func (m *Metrics_PTransform_Measured) Reset()         { *m = 
Metrics_PTransform_Measured{} }
+func (m *Metrics_PTransform_Measured) String() string { return 
proto.CompactTextString(m) }
+func (*Metrics_PTransform_Measured) ProtoMessage()    {}
+func (*Metrics_PTransform_Measured) Descriptor() ([]byte, []int) {
+       return fileDescriptor0, []int{10, 0, 0}
+func (m *Metrics_PTransform_Measured) GetInputElementCounts() map[string]int64 
+       if m != nil {
+               return m.InputElementCounts
+       }
+       return nil
+func (m *Metrics_PTransform_Measured) GetOutputElementCounts() 
map[string]int64 {
+       if m != nil {
+               return m.OutputElementCounts
+       }
+       return nil
+func (m *Metrics_PTransform_Measured) GetTotalTimeSpent() float64 {
+       if m != nil {
+               return m.TotalTimeSpent
+       }
+       return 0
+// Metrics for fully processed elements.
+type Metrics_PTransform_ProcessedElements struct {
+       // (Required)
+       Measured *Metrics_PTransform_Measured 
`protobuf:"bytes,1,opt,name=measured" json:"measured,omitempty"`
+func (m *Metrics_PTransform_ProcessedElements) Reset()         { *m = 
Metrics_PTransform_ProcessedElements{} }
+func (m *Metrics_PTransform_ProcessedElements) String() string { return 
proto.CompactTextString(m) }
+func (*Metrics_PTransform_ProcessedElements) ProtoMessage()    {}
+func (*Metrics_PTransform_ProcessedElements) Descriptor() ([]byte, []int) {
+       return fileDescriptor0, []int{10, 0, 1}
+func (m *Metrics_PTransform_ProcessedElements) GetMeasured() 
*Metrics_PTransform_Measured {
+       if m != nil {
+               return m.Measured
+       }
+       return nil
+// Metrics for active elements.
+// An element is considered active if the SDK has started but not finished
+// processing it yet.
+type Metrics_PTransform_ActiveElements struct {
+       // (Required)
+       Measured *Metrics_PTransform_Measured 
`protobuf:"bytes,1,opt,name=measured" json:"measured,omitempty"`
+       // (Optional) Sum of estimated fraction of known work remaining for all
+       // active elements, as reported by this transform.
+       // If not reported, a Runner could extrapolate this from the processed
+       // elements.
+       // TODO: Handle the case when known work is infinite.
+       FractionRemaining float64 
+       // (Optional) Map from local output name to sum of estimated number
+       // of elements remaining for this output from all active elements,
+       // as reported by this transform.
+       // If not reported, a Runner could extrapolate this from the processed
+       // elements.
+       OutputElementsRemaining map[string]int64 
 json:"output_elements_remaining,omitempty" protobuf_key:"bytes,1,opt,name=key" 
+func (m *Metrics_PTransform_ActiveElements) Reset()         { *m = 
Metrics_PTransform_ActiveElements{} }
+func (m *Metrics_PTransform_ActiveElements) String() string { return 
proto.CompactTextString(m) }
+func (*Metrics_PTransform_ActiveElements) ProtoMessage()    {}
+func (*Metrics_PTransform_ActiveElements) Descriptor() ([]byte, []int) {
+       return fileDescriptor0, []int{10, 0, 2}
+func (m *Metrics_PTransform_ActiveElements) GetMeasured() 
*Metrics_PTransform_Measured {
+       if m != nil {
+               return m.Measured
+       }
+       return nil
+func (m *Metrics_PTransform_ActiveElements) GetFractionRemaining() float64 {
+       if m != nil {
+               return m.FractionRemaining
+       }
+       return 0
+func (m *Metrics_PTransform_ActiveElements) GetOutputElementsRemaining() 
map[string]int64 {
+       if m != nil {
+               return m.OutputElementsRemaining
+       }
+       return nil
+// User defined metrics
+type Metrics_User struct {
+func (m *Metrics_User) Reset()                    { *m = Metrics_User{} }
+func (m *Metrics_User) String() string            { return 
proto.CompactTextString(m) }
+func (*Metrics_User) ProtoMessage()               {}
+func (*Metrics_User) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{10, 1} }
+type ProcessBundleProgressResponse struct {
+       // (Required)
+       Metrics *Metrics `protobuf:"bytes,1,opt,name=metrics" 
+func (m *ProcessBundleProgressResponse) Reset()                    { *m = 
ProcessBundleProgressResponse{} }
+func (m *ProcessBundleProgressResponse) String() string            { return 
proto.CompactTextString(m) }
+func (*ProcessBundleProgressResponse) ProtoMessage()               {}
+func (*ProcessBundleProgressResponse) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{11} }
+func (m *ProcessBundleProgressResponse) GetMetrics() *Metrics {
+       if m != nil {
+               return m.Metrics
+       }
+       return nil
+type ProcessBundleSplitRequest struct {
+       // (Required) A reference to an active process bundle request with the 
+       // instruction id.
+       InstructionReference string 
+       // (Required) The fraction of work (when compared to the known amount 
of work)
+       // the process bundle request should try to split at.
+       Fraction float64 `protobuf:"fixed64,2,opt,name=fraction" 
+func (m *ProcessBundleSplitRequest) Reset()                    { *m = 
ProcessBundleSplitRequest{} }
+func (m *ProcessBundleSplitRequest) String() string            { return 
proto.CompactTextString(m) }
+func (*ProcessBundleSplitRequest) ProtoMessage()               {}
+func (*ProcessBundleSplitRequest) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{12} }
+func (m *ProcessBundleSplitRequest) GetInstructionReference() string {
+       if m != nil {
+               return m.InstructionReference
+       }
+       return ""
+func (m *ProcessBundleSplitRequest) GetFraction() float64 {
+       if m != nil {
+               return m.Fraction
+       }
+       return 0
+// urn:org.apache.beam:restriction:element-count:1.0
+type ElementCountRestriction struct {
+       // A restriction representing the number of elements that should be 
+       // Effectively the range [0, count]
+       Count int64 `protobuf:"varint,1,opt,name=count" json:"count,omitempty"`
+func (m *ElementCountRestriction) Reset()                    { *m = 
ElementCountRestriction{} }
+func (m *ElementCountRestriction) String() string            { return 
proto.CompactTextString(m) }
+func (*ElementCountRestriction) ProtoMessage()               {}
+func (*ElementCountRestriction) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{13} }
+func (m *ElementCountRestriction) GetCount() int64 {
+       if m != nil {
+               return m.Count
+       }
+       return 0
+// urn:org.apache.beam:restriction:element-count-skip:1.0
+type ElementCountSkipRestriction struct {
+       // A restriction representing the number of elements that should be 
+       // Effectively the range (count, infinity]
+       Count int64 `protobuf:"varint,1,opt,name=count" json:"count,omitempty"`
+func (m *ElementCountSkipRestriction) Reset()                    { *m = 
ElementCountSkipRestriction{} }
+func (m *ElementCountSkipRestriction) String() string            { return 
proto.CompactTextString(m) }
+func (*ElementCountSkipRestriction) ProtoMessage()               {}
+func (*ElementCountSkipRestriction) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{14} }
+func (m *ElementCountSkipRestriction) GetCount() int64 {
+       if m != nil {
+               return m.Count
+       }
+       return 0
+// Each primitive transform that is splittable is defined by a restriction
+// it is currently processing. During splitting, that currently active
+// restriction (R_initial) is split into 2 components:
+//   * a restriction (R_done) representing all elements that will be fully
+//     processed
+//   * a restriction (R_todo) representing all elements that will not be fully
+//     processed
+// where:
+//   R_initial = R_done ⋃ R_todo
+type PrimitiveTransformSplit struct {
+       // (Required) A reference to a primitive transform with the given id 
+       // is part of the active process bundle request with the given 
+       // id.
+       PrimitiveTransformReference string 
+       // (Required) A function specification describing the restriction
+       // that has been completed by the primitive transform.
+       //
+       // For example, a remote GRPC source will have a specific urn and data
+       // block containing an ElementCountRestriction.
+       CompletedRestriction *org_apache_beam_model_pipeline_v1.FunctionSpec 
+       // (Required) A function specification describing the restriction
+       // representing the remainder of work for the primitive transform.
+       //
+       // FOr example, a remote GRPC source will have a specific urn and data
+       // block contain an ElemntCountSkipRestriction.
+       RemainingRestriction *org_apache_beam_model_pipeline_v1.FunctionSpec 
+func (m *PrimitiveTransformSplit) Reset()                    { *m = 
PrimitiveTransformSplit{} }
+func (m *PrimitiveTransformSplit) String() string            { return 
proto.CompactTextString(m) }
+func (*PrimitiveTransformSplit) ProtoMessage()               {}
+func (*PrimitiveTransformSplit) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{15} }
+func (m *PrimitiveTransformSplit) GetPrimitiveTransformReference() string {
+       if m != nil {
+               return m.PrimitiveTransformReference
+       }
+       return ""
+func (m *PrimitiveTransformSplit) GetCompletedRestriction() 
*org_apache_beam_model_pipeline_v1.FunctionSpec {
+       if m != nil {
+               return m.CompletedRestriction
+       }
+       return nil
+func (m *PrimitiveTransformSplit) GetRemainingRestriction() 
*org_apache_beam_model_pipeline_v1.FunctionSpec {
+       if m != nil {
+               return m.RemainingRestriction
+       }
+       return nil
+type ProcessBundleSplitResponse struct {
+       // If primitive transform B and C are siblings and descendants of A and 
A, B,
+       // and C report a split. Then B and C's restrictions are relative to 
+       //   R = A_done
+       //     ⋃ (A_boundary ⋂ B_done)
+       //     ⋃ (A_boundary ⋂ B_todo)
+       //     ⋃ (A_boundary ⋂ B_todo)
+       //     ⋃ (A_boundary ⋂ C_todo)
+       //     ⋃ A_todo
+       // If there is no descendant of B or C also reporting a split, than
+       //   B_boundary = ∅ and C_boundary = ∅
+       //
+       // This restriction is processed and completed by the currently active 
+       // bundle request:
+       //   A_done ⋃ (A_boundary ⋂ B_done)
+       //          ⋃ (A_boundary ⋂ C_done)
+       // and these restrictions will be processed by future process bundle 
+       //   A_boundary ⋂ B_todo (passed to SDF B directly)
+       //   A_boundary ⋂ C_todo (passed to SDF C directly)
+       //   A_todo (passed to SDF A directly)
+       //
+       // Note that descendants splits should only be reported if it is 
+       // to compute the boundary restriction intersected with descendants 
+       // Also note, that the boundary restriction may represent a set of 
+       // produced by a parent primitive transform which can not be split at 
+       // element or that there are intermediate unsplittable primitive 
+       // between an ancestor splittable function and a descendant splittable
+       // function which may have more than one output per element. Finally 
+       // that the descendant splits should only be reported if the split
+       // information is relatively compact.
+       Splits []*PrimitiveTransformSplit `protobuf:"bytes,1,rep,name=splits" 
+func (m *ProcessBundleSplitResponse) Reset()                    { *m = 
ProcessBundleSplitResponse{} }
+func (m *ProcessBundleSplitResponse) String() string            { return 
proto.CompactTextString(m) }
+func (*ProcessBundleSplitResponse) ProtoMessage()               {}
+func (*ProcessBundleSplitResponse) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{16} }
+func (m *ProcessBundleSplitResponse) GetSplits() []*PrimitiveTransformSplit {
+       if m != nil {
+               return m.Splits
+       }
+       return nil
+// Messages used to represent logical byte streams.
+// Stable
+type Elements struct {
+       // (Required) A list containing parts of logical byte streams.
+       Data []*Elements_Data `protobuf:"bytes,1,rep,name=data" 
+func (m *Elements) Reset()                    { *m = Elements{} }
+func (m *Elements) String() string            { return 
proto.CompactTextString(m) }
+func (*Elements) ProtoMessage()               {}
+func (*Elements) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{17} }
+func (m *Elements) GetData() []*Elements_Data {
+       if m != nil {
+               return m.Data
+       }
+       return nil
+// Represents multiple encoded elements in nested context for a given named
+// instruction and target.
+type Elements_Data struct {
+       // (Required) A reference to an active instruction request with the 
+       // instruction id.
+       InstructionReference string 
+       // (Required) A definition representing a consumer or producer of this 
+       // If received by a harness, this represents the consumer within that
+       // harness that should consume these bytes. If sent by a harness, this
+       // represents the producer of these bytes.
+       //
+       // Note that a single element may span multiple Data messages.
+       //
+       // Note that a sending/receiving pair should share the same target
+       // identifier.
+       Target *Target `protobuf:"bytes,2,opt,name=target" 
+       // (Optional) Represents a part of a logical byte stream. Elements 
+       // the logical byte stream are encoded in the nested context and
+       // concatenated together.
+       //
+       // An empty data block represents the end of stream for the given
+       // instruction and target.
+       Data []byte `protobuf:"bytes,3,opt,name=data,proto3" 
+func (m *Elements_Data) Reset()                    { *m = Elements_Data{} }
+func (m *Elements_Data) String() string            { return 
proto.CompactTextString(m) }
+func (*Elements_Data) ProtoMessage()               {}
+func (*Elements_Data) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{17, 0} }
+func (m *Elements_Data) GetInstructionReference() string {
+       if m != nil {
+               return m.InstructionReference
+       }
+       return ""
+func (m *Elements_Data) GetTarget() *Target {
+       if m != nil {
+               return m.Target
+       }
+       return nil
+func (m *Elements_Data) GetData() []byte {
+       if m != nil {
+               return m.Data
+       }
+       return nil
+type StateRequest struct {
+       // (Required) An unique identifier provided by the SDK which represents 
+       // requests execution. The StateResponse corresponding with this request
+       // will have the matching id.
+       Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
+       // (Required) The associated instruction id of the work that is 
+       // being processed. This allows for the runner to associate any 
+       // to state to be committed with the appropriate work execution.
+       InstructionReference string 
+       // (Required) The state key this request is for.
+       StateKey *StateKey `protobuf:"bytes,3,opt,name=state_key,json=stateKey" 
+       // (Required) The action to take on this request.
+       //
+       // Types that are valid to be assigned to Request:
+       //      *StateRequest_Get
+       //      *StateRequest_Append
+       //      *StateRequest_Clear
+       Request isStateRequest_Request `protobuf_oneof:"request"`
+func (m *StateRequest) Reset()                    { *m = StateRequest{} }
+func (m *StateRequest) String() string            { return 
proto.CompactTextString(m) }
+func (*StateRequest) ProtoMessage()               {}
+func (*StateRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{18} }
+type isStateRequest_Request interface {
+       isStateRequest_Request()
+type StateRequest_Get struct {
+       Get *StateGetRequest `protobuf:"bytes,1000,opt,name=get,oneof"`
+type StateRequest_Append struct {
+       Append *StateAppendRequest `protobuf:"bytes,1001,opt,name=append,oneof"`
+type StateRequest_Clear struct {
+       Clear *StateClearRequest `protobuf:"bytes,1002,opt,name=clear,oneof"`
+func (*StateRequest_Get) isStateRequest_Request()    {}
+func (*StateRequest_Append) isStateRequest_Request() {}
+func (*StateRequest_Clear) isStateRequest_Request()  {}
+func (m *StateRequest) GetRequest() isStateRequest_Request {
+       if m != nil {
+               return m.Request
+       }
+       return nil
+func (m *StateRequest) GetId() string {
+       if m != nil {
+               return m.Id
+       }
+       return ""
+func (m *StateRequest) GetInstructionReference() string {
+       if m != nil {
+               return m.InstructionReference
+       }
+       return ""
+func (m *StateRequest) GetStateKey() *StateKey {
+       if m != nil {
+               return m.StateKey
+       }
+       return nil
+func (m *StateRequest) GetGet() *StateGetRequest {
+       if x, ok := m.GetRequest().(*StateRequest_Get); ok {
+               return x.Get
+       }
+       return nil
+func (m *StateRequest) GetAppend() *StateAppendRequest {
+       if x, ok := m.GetRequest().(*StateRequest_Append); ok {
+               return x.Append
+       }
+       return nil
+func (m *StateRequest) GetClear() *StateClearRequest {
+       if x, ok := m.GetRequest().(*StateRequest_Clear); ok {
+               return x.Clear
+       }
+       return nil
+// XXX_OneofFuncs is for the internal use of the proto package.
+func (*StateRequest) XXX_OneofFuncs() (func(msg proto.Message, b 
*proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) 
(bool, error), func(msg proto.Message) (n int), []interface{}) {
+       return _StateRequest_OneofMarshaler, _StateRequest_OneofUnmarshaler, 
_StateRequest_OneofSizer, []interface{}{
+               (*StateRequest_Get)(nil),
+               (*StateRequest_Append)(nil),
+               (*StateRequest_Clear)(nil),
+       }
+func _StateRequest_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
+       m := msg.(*StateRequest)
+       // request
+       switch x := m.Request.(type) {
+       case *StateRequest_Get:
+               b.EncodeVarint(1000<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.Get); err != nil {
+                       return err
+               }
+       case *StateRequest_Append:
+               b.EncodeVarint(1001<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.Append); err != nil {
+                       return err
+               }
+       case *StateRequest_Clear:
+               b.EncodeVarint(1002<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.Clear); err != nil {
+                       return err
+               }
+       case nil:
+       default:
+               return fmt.Errorf("StateRequest.Request has unexpected type 
%T", x)
+       }
+       return nil
+func _StateRequest_OneofUnmarshaler(msg proto.Message, tag, wire int, b 
*proto.Buffer) (bool, error) {
+       m := msg.(*StateRequest)
+       switch tag {
+       case 1000: // request.get
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(StateGetRequest)
+               err := b.DecodeMessage(msg)
+               m.Request = &StateRequest_Get{msg}
+               return true, err
+       case 1001: // request.append
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(StateAppendRequest)
+               err := b.DecodeMessage(msg)
+               m.Request = &StateRequest_Append{msg}
+               return true, err
+       case 1002: // request.clear
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(StateClearRequest)
+               err := b.DecodeMessage(msg)
+               m.Request = &StateRequest_Clear{msg}
+               return true, err
+       default:
+               return false, nil
+       }
+func _StateRequest_OneofSizer(msg proto.Message) (n int) {
+       m := msg.(*StateRequest)
+       // request
+       switch x := m.Request.(type) {
+       case *StateRequest_Get:
+               s := proto.Size(x.Get)
+               n += proto.SizeVarint(1000<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case *StateRequest_Append:
+               s := proto.Size(x.Append)
+               n += proto.SizeVarint(1001<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case *StateRequest_Clear:
+               s := proto.Size(x.Clear)
+               n += proto.SizeVarint(1002<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case nil:
+       default:
+               panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
+       }
+       return n
+type StateResponse struct {
+       // (Required) A reference provided by the SDK which represents a 
+       // execution. The StateResponse must have the matching id when 
+       // to the SDK.
+       Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
+       // (Optional) If this is specified, then the state request has failed.
+       // A human readable string representing the reason as to why the request
+       // failed.
+       Error string `protobuf:"bytes,2,opt,name=error" json:"error,omitempty"`
+       // (Optional) If this is specified, then the result of this state 
+       // can be cached using the supplied token.
+       CacheToken []byte 
+       // A corresponding response matching the request will be populated.
+       //
+       // Types that are valid to be assigned to Response:
+       //      *StateResponse_Get
+       //      *StateResponse_Append
+       //      *StateResponse_Clear
+       Response isStateResponse_Response `protobuf_oneof:"response"`
+func (m *StateResponse) Reset()                    { *m = StateResponse{} }
+func (m *StateResponse) String() string            { return 
proto.CompactTextString(m) }
+func (*StateResponse) ProtoMessage()               {}
+func (*StateResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{19} }
+type isStateResponse_Response interface {
+       isStateResponse_Response()
+type StateResponse_Get struct {
+       Get *StateGetResponse `protobuf:"bytes,1000,opt,name=get,oneof"`
+type StateResponse_Append struct {
+       Append *StateAppendResponse 
+type StateResponse_Clear struct {
+       Clear *StateClearResponse `protobuf:"bytes,1002,opt,name=clear,oneof"`
+func (*StateResponse_Get) isStateResponse_Response()    {}
+func (*StateResponse_Append) isStateResponse_Response() {}
+func (*StateResponse_Clear) isStateResponse_Response()  {}
+func (m *StateResponse) GetResponse() isStateResponse_Response {
+       if m != nil {
+               return m.Response
+       }
+       return nil
+func (m *StateResponse) GetId() string {
+       if m != nil {
+               return m.Id
+       }
+       return ""
+func (m *StateResponse) GetError() string {
+       if m != nil {
+               return m.Error
+       }
+       return ""
+func (m *StateResponse) GetCacheToken() []byte {
+       if m != nil {
+               return m.CacheToken
+       }
+       return nil
+func (m *StateResponse) GetGet() *StateGetResponse {
+       if x, ok := m.GetResponse().(*StateResponse_Get); ok {
+               return x.Get
+       }
+       return nil
+func (m *StateResponse) GetAppend() *StateAppendResponse {
+       if x, ok := m.GetResponse().(*StateResponse_Append); ok {
+               return x.Append
+       }
+       return nil
+func (m *StateResponse) GetClear() *StateClearResponse {
+       if x, ok := m.GetResponse().(*StateResponse_Clear); ok {
+               return x.Clear
+       }
+       return nil
+// XXX_OneofFuncs is for the internal use of the proto package.
+func (*StateResponse) XXX_OneofFuncs() (func(msg proto.Message, b 
*proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) 
(bool, error), func(msg proto.Message) (n int), []interface{}) {
+       return _StateResponse_OneofMarshaler, _StateResponse_OneofUnmarshaler, 
_StateResponse_OneofSizer, []interface{}{
+               (*StateResponse_Get)(nil),
+               (*StateResponse_Append)(nil),
+               (*StateResponse_Clear)(nil),
+       }
+func _StateResponse_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
+       m := msg.(*StateResponse)
+       // response
+       switch x := m.Response.(type) {
+       case *StateResponse_Get:
+               b.EncodeVarint(1000<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.Get); err != nil {
+                       return err
+               }
+       case *StateResponse_Append:
+               b.EncodeVarint(1001<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.Append); err != nil {
+                       return err
+               }
+       case *StateResponse_Clear:
+               b.EncodeVarint(1002<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.Clear); err != nil {
+                       return err
+               }
+       case nil:
+       default:
+               return fmt.Errorf("StateResponse.Response has unexpected type 
%T", x)
+       }
+       return nil
+func _StateResponse_OneofUnmarshaler(msg proto.Message, tag, wire int, b 
*proto.Buffer) (bool, error) {
+       m := msg.(*StateResponse)
+       switch tag {
+       case 1000: // response.get
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(StateGetResponse)
+               err := b.DecodeMessage(msg)
+               m.Response = &StateResponse_Get{msg}
+               return true, err
+       case 1001: // response.append
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(StateAppendResponse)
+               err := b.DecodeMessage(msg)
+               m.Response = &StateResponse_Append{msg}
+               return true, err
+       case 1002: // response.clear
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(StateClearResponse)
+               err := b.DecodeMessage(msg)
+               m.Response = &StateResponse_Clear{msg}
+               return true, err
+       default:
+               return false, nil
+       }
+func _StateResponse_OneofSizer(msg proto.Message) (n int) {
+       m := msg.(*StateResponse)
+       // response
+       switch x := m.Response.(type) {
+       case *StateResponse_Get:
+               s := proto.Size(x.Get)
+               n += proto.SizeVarint(1000<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case *StateResponse_Append:
+               s := proto.Size(x.Append)
+               n += proto.SizeVarint(1001<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case *StateResponse_Clear:
+               s := proto.Size(x.Clear)
+               n += proto.SizeVarint(1002<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case nil:
+       default:
+               panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
+       }
+       return n
+type StateKey struct {
+       // (Required) One of the following state keys must be set.
+       //
+       // Types that are valid to be assigned to Type:
+       //      *StateKey_Runner_
+       //      *StateKey_MultimapSideInput_
+       //      *StateKey_BagUserState_
+       Type isStateKey_Type `protobuf_oneof:"type"`
+func (m *StateKey) Reset()                    { *m = StateKey{} }
+func (m *StateKey) String() string            { return 
proto.CompactTextString(m) }
+func (*StateKey) ProtoMessage()               {}
+func (*StateKey) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{20} }
+type isStateKey_Type interface {
+       isStateKey_Type()
+type StateKey_Runner_ struct {
+       Runner *StateKey_Runner `protobuf:"bytes,1,opt,name=runner,oneof"`
+type StateKey_MultimapSideInput_ struct {
+       MultimapSideInput *StateKey_MultimapSideInput 
+type StateKey_BagUserState_ struct {
+       BagUserState *StateKey_BagUserState 
+func (*StateKey_Runner_) isStateKey_Type()            {}
+func (*StateKey_MultimapSideInput_) isStateKey_Type() {}
+func (*StateKey_BagUserState_) isStateKey_Type()      {}
+func (m *StateKey) GetType() isStateKey_Type {
+       if m != nil {
+               return m.Type
+       }
+       return nil
+func (m *StateKey) GetRunner() *StateKey_Runner {
+       if x, ok := m.GetType().(*StateKey_Runner_); ok {
+               return x.Runner
+       }
+       return nil
+func (m *StateKey) GetMultimapSideInput() *StateKey_MultimapSideInput {
+       if x, ok := m.GetType().(*StateKey_MultimapSideInput_); ok {
+               return x.MultimapSideInput
+       }
+       return nil
+func (m *StateKey) GetBagUserState() *StateKey_BagUserState {
+       if x, ok := m.GetType().(*StateKey_BagUserState_); ok {
+               return x.BagUserState
+       }
+       return nil
+// XXX_OneofFuncs is for the internal use of the proto package.
+func (*StateKey) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) 
error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), 
func(msg proto.Message) (n int), []interface{}) {
+       return _StateKey_OneofMarshaler, _StateKey_OneofUnmarshaler, 
_StateKey_OneofSizer, []interface{}{
+               (*StateKey_Runner_)(nil),
+               (*StateKey_MultimapSideInput_)(nil),
+               (*StateKey_BagUserState_)(nil),
+       }
+func _StateKey_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
+       m := msg.(*StateKey)
+       // type
+       switch x := m.Type.(type) {
+       case *StateKey_Runner_:
+               b.EncodeVarint(1<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.Runner); err != nil {
+                       return err
+               }
+       case *StateKey_MultimapSideInput_:
+               b.EncodeVarint(2<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.MultimapSideInput); err != nil {
+                       return err
+               }
+       case *StateKey_BagUserState_:
+               b.EncodeVarint(3<<3 | proto.WireBytes)
+               if err := b.EncodeMessage(x.BagUserState); err != nil {
+                       return err
+               }
+       case nil:
+       default:
+               return fmt.Errorf("StateKey.Type has unexpected type %T", x)
+       }
+       return nil
+func _StateKey_OneofUnmarshaler(msg proto.Message, tag, wire int, b 
*proto.Buffer) (bool, error) {
+       m := msg.(*StateKey)
+       switch tag {
+       case 1: // type.runner
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(StateKey_Runner)
+               err := b.DecodeMessage(msg)
+               m.Type = &StateKey_Runner_{msg}
+               return true, err
+       case 2: // type.multimap_side_input
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(StateKey_MultimapSideInput)
+               err := b.DecodeMessage(msg)
+               m.Type = &StateKey_MultimapSideInput_{msg}
+               return true, err
+       case 3: // type.bag_user_state
+               if wire != proto.WireBytes {
+                       return true, proto.ErrInternalBadWireType
+               }
+               msg := new(StateKey_BagUserState)
+               err := b.DecodeMessage(msg)
+               m.Type = &StateKey_BagUserState_{msg}
+               return true, err
+       default:
+               return false, nil
+       }
+func _StateKey_OneofSizer(msg proto.Message) (n int) {
+       m := msg.(*StateKey)
+       // type
+       switch x := m.Type.(type) {
+       case *StateKey_Runner_:
+               s := proto.Size(x.Runner)
+               n += proto.SizeVarint(1<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case *StateKey_MultimapSideInput_:
+               s := proto.Size(x.MultimapSideInput)
+               n += proto.SizeVarint(2<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case *StateKey_BagUserState_:
+               s := proto.Size(x.BagUserState)
+               n += proto.SizeVarint(3<<3 | proto.WireBytes)
+               n += proto.SizeVarint(uint64(s))
+               n += s
+       case nil:
+       default:
+               panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
+       }
+       return n
+type StateKey_Runner struct {
+       // (Required) Opaque information supplied by the runner. Used to support
+       // remote references.
+       Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
+func (m *StateKey_Runner) Reset()                    { *m = StateKey_Runner{} }
+func (m *StateKey_Runner) String() string            { return 
proto.CompactTextString(m) }
+func (*StateKey_Runner) ProtoMessage()               {}
+func (*StateKey_Runner) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{20, 0} }
+func (m *StateKey_Runner) GetKey() []byte {
+       if m != nil {
+               return m.Key
+       }
+       return nil
+type StateKey_MultimapSideInput struct {
+       // (Required) The id of the PTransform containing a side input.
+       PtransformId string 
+       // (Required) The id of the side input.
+       SideInputId string 
+       // (Required) The window (after mapping the currently executing elements
+       // window into the side input windows domain) encoded in a nested 
+       Window []byte `protobuf:"bytes,3,opt,name=window,proto3" 
+       // (Required) The key encoded in a nested context.
+       Key []byte `protobuf:"bytes,4,opt,name=key,proto3" json:"key,omitempty"`
+func (m *StateKey_MultimapSideInput) Reset()                    { *m = 
StateKey_MultimapSideInput{} }
+func (m *StateKey_MultimapSideInput) String() string            { return 
proto.CompactTextString(m) }
+func (*StateKey_MultimapSideInput) ProtoMessage()               {}
+func (*StateKey_MultimapSideInput) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{20, 1} }
+func (m *StateKey_MultimapSideInput) GetPtransformId() string {
+       if m != nil {
+               return m.PtransformId
+       }
+       return ""
+func (m *StateKey_MultimapSideInput) GetSideInputId() string {
+       if m != nil {
+               return m.SideInputId
+       }
+       return ""
+func (m *StateKey_MultimapSideInput) GetWindow() []byte {
+       if m != nil {
+               return m.Window
+       }
+       return nil
+func (m *StateKey_MultimapSideInput) GetKey() []byte {
+       if m != nil {
+               return m.Key
+       }
+       return nil
+type StateKey_BagUserState struct {
+       // (Required) The id of the PTransform containing user state.
+       PtransformId string 
+       // (Required) The id of the user state.
+       UserStateId string 
+       // (Required) The window encoded in a nested context.
+       Window []byte `protobuf:"bytes,3,opt,name=window,proto3" 
+       // (Required) The key of the currently executing element encoded in a
+       // nested context.
+       Key []byte `protobuf:"bytes,4,opt,name=key,proto3" json:"key,omitempty"`
+func (m *StateKey_BagUserState) Reset()                    { *m = 
StateKey_BagUserState{} }
+func (m *StateKey_BagUserState) String() string            { return 
proto.CompactTextString(m) }
+func (*StateKey_BagUserState) ProtoMessage()               {}
+func (*StateKey_BagUserState) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{20, 2} }
+func (m *StateKey_BagUserState) GetPtransformId() string {
+       if m != nil {
+               return m.PtransformId
+       }
+       return ""
+func (m *StateKey_BagUserState) GetUserStateId() string {
+       if m != nil {
+               return m.UserStateId
+       }
+       return ""
+func (m *StateKey_BagUserState) GetWindow() []byte {
+       if m != nil {
+               return m.Window
+       }
+       return nil
+func (m *StateKey_BagUserState) GetKey() []byte {
+       if m != nil {
+               return m.Key
+       }
+       return nil
+// A request to get state.
+type StateGetRequest struct {
+       // (Optional) If specified, signals to the runner that the response
+       // should resume from the following continuation token.
+       //
+       // If unspecified, signals to the runner that the response should start
+       // from the beginning of the logical continuable stream.
+       ContinuationToken []byte 
+func (m *StateGetRequest) Reset()                    { *m = StateGetRequest{} }
+func (m *StateGetRequest) String() string            { return 
proto.CompactTextString(m) }
+func (*StateGetRequest) ProtoMessage()               {}
+func (*StateGetRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{21} }
+func (m *StateGetRequest) GetContinuationToken() []byte {
+       if m != nil {
+               return m.ContinuationToken
+       }
+       return nil
+// A response to get state representing a logical byte stream which can be
+// continued using the state API.
+type StateGetResponse struct {
+       // (Optional) If specified, represents a token which can be used with 
+       // state API to get the next chunk of this logical byte stream. The end 
+       // the logical byte stream is signalled by this field being unset.
+       ContinuationToken []byte 
+       // Represents a part of a logical byte stream. Elements within
+       // the logical byte stream are encoded in the nested context and
+       // concatenated together.
+       Data []byte `protobuf:"bytes,2,opt,name=data,proto3" 
+func (m *StateGetResponse) Reset()                    { *m = 
StateGetResponse{} }
+func (m *StateGetResponse) String() string            { return 
proto.CompactTextString(m) }
+func (*StateGetResponse) ProtoMessage()               {}
+func (*StateGetResponse) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{22} }
+func (m *StateGetResponse) GetContinuationToken() []byte {
+       if m != nil {
+               return m.ContinuationToken
+       }
+       return nil
+func (m *StateGetResponse) GetData() []byte {
+       if m != nil {
+               return m.Data
+       }
+       return nil
+// A request to append state.
+type StateAppendRequest struct {
+       // Represents a part of a logical byte stream. Elements within
+       // the logical byte stream are encoded in the nested context and
+       // multiple append requests are concatenated together.
+       Data []byte `protobuf:"bytes,1,opt,name=data,proto3" 
+func (m *StateAppendRequest) Reset()                    { *m = 
StateAppendRequest{} }
+func (m *StateAppendRequest) String() string            { return 
proto.CompactTextString(m) }
+func (*StateAppendRequest) ProtoMessage()               {}
+func (*StateAppendRequest) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{23} }
+func (m *StateAppendRequest) GetData() []byte {
+       if m != nil {
+               return m.Data
+       }
+       return nil
+// A response to append state.
+type StateAppendResponse struct {
+func (m *StateAppendResponse) Reset()                    { *m = 
StateAppendResponse{} }
+func (m *StateAppendResponse) String() string            { return 
proto.CompactTextString(m) }
+func (*StateAppendResponse) ProtoMessage()               {}
+func (*StateAppendResponse) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{24} }
+// A request to clear state.
+type StateClearRequest struct {
+func (m *StateClearRequest) Reset()                    { *m = 
StateClearRequest{} }
+func (m *StateClearRequest) String() string            { return 
proto.CompactTextString(m) }
+func (*StateClearRequest) ProtoMessage()               {}
+func (*StateClearRequest) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{25} }
+// A response to clear state.
+type StateClearResponse struct {
+func (m *StateClearResponse) Reset()                    { *m = 
StateClearResponse{} }
+func (m *StateClearResponse) String() string            { return 
proto.CompactTextString(m) }
+func (*StateClearResponse) ProtoMessage()               {}
+func (*StateClearResponse) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{26} }
+// A log entry
+type LogEntry struct {
+       // (Required) The severity of the log statement.
+       Severity LogEntry_Severity_Enum 
+       // (Required) The time at which this log statement occurred.
+       Timestamp *google_protobuf1.Timestamp 
`protobuf:"bytes,2,opt,name=timestamp" json:"timestamp,omitempty"`
+       // (Required) A human readable message.
+       Message string `protobuf:"bytes,3,opt,name=message" 
+       // (Optional) An optional trace of the functions involved. For example, 
+       // Java this can include multiple causes and multiple suppressed 
+       Trace string `protobuf:"bytes,4,opt,name=trace" json:"trace,omitempty"`
+       // (Optional) A reference to the instruction this log statement is 
+       // with.
+       InstructionReference string 
+       // (Optional) A reference to the primitive transform this log statement 
+       // associated with.
+       PrimitiveTransformReference string 
+       // (Optional) Human-readable name of the function or method being 
+       // with optional context such as the class or package name. The format 
+       // vary by language. For example:
+       //   qual.if.ied.Class.method (Java)
+       //   dir/package.func (Go)
+       //   module.function (Python)
+       // (C++)
+       LogLocation string 
+       // (Optional) The name of the thread this log statement is associated 
+       Thread string `protobuf:"bytes,8,opt,name=thread" 
+func (m *LogEntry) Reset()                    { *m = LogEntry{} }
+func (m *LogEntry) String() string            { return 
proto.CompactTextString(m) }
+func (*LogEntry) ProtoMessage()               {}
+func (*LogEntry) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{27} }
+func (m *LogEntry) GetSeverity() LogEntry_Severity_Enum {
+       if m != nil {
+               return m.Severity
+       }
+       return LogEntry_Severity_UNSPECIFIED
+func (m *LogEntry) GetTimestamp() *google_protobuf1.Timestamp {
+       if m != nil {
+               return m.Timestamp
+       }
+       return nil
+func (m *LogEntry) GetMessage() string {
+       if m != nil {
+               return m.Message
+       }
+       return ""
+func (m *LogEntry) GetTrace() string {
+       if m != nil {
+               return m.Trace
+       }
+       return ""
+func (m *LogEntry) GetInstructionReference() string {
+       if m != nil {
+               return m.InstructionReference
+       }
+       return ""
+func (m *LogEntry) GetPrimitiveTransformReference() string {
+       if m != nil {
+               return m.PrimitiveTransformReference
+       }
+       return ""
+func (m *LogEntry) GetLogLocation() string {
+       if m != nil {
+               return m.LogLocation
+       }
+       return ""
+func (m *LogEntry) GetThread() string {
+       if m != nil {
+               return m.Thread
+       }
+       return ""
+// A list of log entries, enables buffering and batching of multiple
+// log messages using the logging API.
+type LogEntry_List struct {
+       // (Required) One or or more log messages.
+       LogEntries []*LogEntry 
+func (m *LogEntry_List) Reset()                    { *m = LogEntry_List{} }
+func (m *LogEntry_List) String() string            { return 
proto.CompactTextString(m) }
+func (*LogEntry_List) ProtoMessage()               {}
+func (*LogEntry_List) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{27, 0} }
+func (m *LogEntry_List) GetLogEntries() []*LogEntry {
+       if m != nil {
+               return m.LogEntries
+       }
+       return nil
+// The severity of the event described in a log entry, expressed as one of the
+// severity levels listed below. For your reference, the levels are
+// assigned the listed numeric values. The effect of using numeric values
+// other than those listed is undefined.
+// If you are writing log entries, you should map other severity encodings to
+// one of these standard levels. For example, you might map all of
+// Java's FINE, FINER, and FINEST levels to `Severity.DEBUG`.
+// This list is intentionally not comprehensive; the intent is to provide a
+// common set of "good enough" severity levels so that logging front ends
+// can provide filtering and searching across log types. Users of the API are
+// free not to use all severity levels in their log messages.
+type LogEntry_Severity struct {
+func (m *LogEntry_Severity) Reset()                    { *m = 
LogEntry_Severity{} }
+func (m *LogEntry_Severity) String() string            { return 
proto.CompactTextString(m) }
+func (*LogEntry_Severity) ProtoMessage()               {}
+func (*LogEntry_Severity) Descriptor() ([]byte, []int) { return 
fileDescriptor0, []int{27, 1} }
+type LogControl struct {
+func (m *LogControl) Reset()                    { *m = LogControl{} }
+func (m *LogControl) String() string            { return 
proto.CompactTextString(m) }
+func (*LogControl) ProtoMessage()               {}
+func (*LogControl) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{28} }
+// A Docker container configuration for launching the SDK harness to execute
+// user specified functions.
+type DockerContainer struct {
+       // (Required) A pipeline level unique id which can be used as a 
reference to
+       // refer to this.
+       Id string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
+       // (Required) The Docker container URI
+       // For example ""
+       Uri string `protobuf:"bytes,2,opt,name=uri" json:"uri,omitempty"`
+       // (Optional) Docker registry specification.
+       // If unspecified, the uri is expected to be able to be fetched without
+       // requiring additional configuration by a runner.
+       RegistryReference string 
+func (m *DockerContainer) Reset()                    { *m = DockerContainer{} }
+func (m *DockerContainer) String() string            { return 
proto.CompactTextString(m) }
+func (*DockerContainer) ProtoMessage()               {}
+func (*DockerContainer) Descriptor() ([]byte, []int) { return fileDescriptor0, 
[]int{29} }
+func (m *DockerContainer) GetId() string {
+       if m != nil {
+               return m.Id
+       }
+       return ""
+func (m *DockerContainer) GetUri() string {
+       if m != nil {
+               return m.Uri
+       }
+       return ""
+func (m *DockerContainer) GetRegistryReference() string {
+       if m != nil {
+               return m.RegistryReference
+       }
+       return ""
+func init() {
+       proto.RegisterType((*Target)(nil), 
+       proto.RegisterType((*Target_List)(nil), 
+       proto.RegisterType((*RemoteGrpcPort)(nil), 
+       proto.RegisterType((*InstructionRequest)(nil), 
+       proto.RegisterType((*InstructionResponse)(nil), 
+       proto.RegisterType((*RegisterRequest)(nil), 
+       proto.RegisterType((*RegisterResponse)(nil), 
+       proto.RegisterType((*ProcessBundleDescriptor)(nil), 
+       proto.RegisterType((*ProcessBundleRequest)(nil), 
+       proto.RegisterType((*ProcessBundleResponse)(nil), 
+       proto.RegisterType((*ProcessBundleProgressRequest)(nil), 
+       proto.RegisterType((*Metrics)(nil), 
+       proto.RegisterType((*Metrics_PTransform)(nil), 
+       proto.RegisterType((*Metrics_PTransform_Measured)(nil), 
+       proto.RegisterType((*Metrics_PTransform_ProcessedElements)(nil), 
+       proto.RegisterType((*Metrics_PTransform_ActiveElements)(nil), 
+       proto.RegisterType((*Metrics_User)(nil), 
+       proto.RegisterType((*ProcessBundleProgressResponse)(nil), 
+       proto.RegisterType((*ProcessBundleSplitRequest)(nil), 
+       proto.RegisterType((*ElementCountRestriction)(nil), 
+       proto.RegisterType((*ElementCountSkipRestriction)(nil), 
+       proto.RegisterType((*PrimitiveTransformSplit)(nil), 
+       proto.RegisterType((*ProcessBundleSplitResponse)(nil), 
+       proto.RegisterType((*Elements)(nil), 
+       proto.RegisterType((*Elements_Data)(nil), 
+       proto.RegisterType((*StateRequest)(nil), 
+       proto.RegisterType((*StateResponse)(nil), 
+       proto.RegisterType((*StateKey)(nil), 
+       proto.RegisterType((*StateKey_Runner)(nil), 
+       proto.RegisterType((*StateKey_MultimapSideInput)(nil), 
+       proto.RegisterType((*StateKey_BagUserState)(nil), 
+       proto.RegisterType((*StateGetRequest)(nil), 
+       proto.RegisterType((*StateGetResponse)(nil), 
+       proto.RegisterType((*StateAppendRequest)(nil), 
+       proto.RegisterType((*StateAppendResponse)(nil), 
+       proto.RegisterType((*StateClearRequest)(nil), 
+       proto.RegisterType((*StateClearResponse)(nil), 
+       proto.RegisterType((*LogEntry)(nil), 
+       proto.RegisterType((*LogEntry_List)(nil), 
+       proto.RegisterType((*LogEntry_Severity)(nil), 
+       proto.RegisterType((*LogControl)(nil), 
+       proto.RegisterType((*DockerContainer)(nil), 
 LogEntry_Severity_Enum_name, LogEntry_Severity_Enum_value)
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConn
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion4
+// Client API for BeamFnControl service
+type BeamFnControlClient interface {
+       // Instructions sent by the runner to the SDK requesting different types
+       // of work.
+       Control(ctx context.Context, opts ...grpc.CallOption) 
(BeamFnControl_ControlClient, error)
+type beamFnControlClient struct {
+       cc *grpc.ClientConn
+func NewBeamFnControlClient(cc *grpc.ClientConn) BeamFnControlClient {
+       return &beamFnControlClient{cc}
+func (c *beamFnControlClient) Control(ctx context.Context, opts 
...grpc.CallOption) (BeamFnControl_ControlClient, error) {
+       stream, err := grpc.NewClientStream(ctx, 
"/org.apache.beam.model.fn_execution.v1.BeamFnControl/Control", opts...)
+       if err != nil {
+               return nil, err
+       }
+       x := &beamFnControlControlClient{stream}
+       return x, nil
+type BeamFnControl_ControlClient interface {
+       Send(*InstructionResponse) error
+       Recv() (*InstructionRequest, error)
+       grpc.ClientStream
+type beamFnControlControlClient struct {
+       grpc.ClientStream
+func (x *beamFnControlControlClient) Send(m *InstructionResponse) error {
+       return x.ClientStream.SendMsg(m)
+func (x *beamFnControlControlClient) Recv() (*InstructionRequest, error) {
+       m := new(InstructionRequest)
+       if err := x.ClientStream.RecvMsg(m); err != nil {
+               return nil, err
+       }
+       return m, nil
+// Server API for BeamFnControl service
+type BeamFnControlServer interface {
+       // Instructions sent by the runner to the SDK requesting different types
+       // of work.
+       Control(BeamFnControl_ControlServer) error
+func RegisterBeamFnControlServer(s *grpc.Server, srv BeamFnControlServer) {
+       s.RegisterService(&_BeamFnControl_serviceDesc, srv)
+func _BeamFnControl_Control_Handler(srv interface{}, stream grpc.ServerStream) 
error {
+       return 
+type BeamFnControl_ControlServer interface {
+       Send(*InstructionRequest) error
+       Recv() (*InstructionResponse, error)
+       grpc.ServerStream
+type beamFnControlControlServer struct {
+       grpc.ServerStream
+func (x *beamFnControlControlServer) Send(m *InstructionRequest) error {
+       return x.ServerStream.SendMsg(m)
+func (x *beamFnControlControlServer) Recv() (*InstructionResponse, error) {
+       m := new(InstructionResponse)
+       if err := x.ServerStream.RecvMsg(m); err != nil {
+               return nil, err
+       }
+       return m, nil
+var _BeamFnControl_serviceDesc = grpc.ServiceDesc{
+       ServiceName: "org.apache.beam.model.fn_execution.v1.BeamFnControl",
+       HandlerType: (*BeamFnControlServer)(nil),
+       Methods:     []grpc.MethodDesc{},
+       Streams: []grpc.StreamDesc{
+               {
+                       StreamName:    "Control",
+                       Handler:       _BeamFnControl_Control_Handler,
+                       ServerStreams: true,
+                       ClientStreams: true,
+               },
+       },
+       Metadata: "beam_fn_api.proto",
+// Client API for BeamFnData service
+type BeamFnDataClient interface {
+       // Used to send data between harnesses.
+       Data(ctx context.Context, opts ...grpc.CallOption) 
(BeamFnData_DataClient, error)
+type beamFnDataClient struct {
+       cc *grpc.ClientConn
+func NewBeamFnDataClient(cc *grpc.ClientConn) BeamFnDataClient {
+       return &beamFnDataClient{cc}
+func (c *beamFnDataClient) Data(ctx context.Context, opts ...grpc.CallOption) 
(BeamFnData_DataClient, error) {
+       stream, err := grpc.NewClientStream(ctx, 
"/org.apache.beam.model.fn_execution.v1.BeamFnData/Data", opts...)
+       if err != nil {
+               return nil, err
+       }
+       x := &beamFnDataDataClient{stream}
+       return x, nil
+type BeamFnData_DataClient interface {
+       Send(*Elements) error
+       Recv() (*Elements, error)
+       grpc.ClientStream
+type beamFnDataDataClient struct {
+       grpc.ClientStream
+func (x *beamFnDataDataClient) Send(m *Elements) error {
+       return x.ClientStream.SendMsg(m)
+func (x *beamFnDataDataClient) Recv() (*Elements, error) {
+       m := new(Elements)
+       if err := x.ClientStream.RecvMsg(m); err != nil {
+               return nil, err
+       }
+       return m, nil
+// Server API for BeamFnData service
+type BeamFnDataServer interface {
+       // Used to send data between harnesses.
+       Data(BeamFnData_DataServer) error
+func RegisterBeamFnDataServer(s *grpc.Server, srv BeamFnDataServer) {
+       s.RegisterService(&_BeamFnData_serviceDesc, srv)
+func _BeamFnData_Data_Handler(srv interface{}, stream grpc.ServerStream) error 
+       return srv.(BeamFnDataServer).Data(&beamFnDataDataServer{stream})
+type BeamFnData_DataServer interface {
+       Send(*Elements) error
+       Recv() (*Elements, error)
+       grpc.ServerStream
+type beamFnDataDataServer struct {
+       grpc.ServerStream
+func (x *beamFnDataDataServer) Send(m *Elements) error {
+       return x.ServerStream.SendMsg(m)
+func (x *beamFnDataDataServer) Recv() (*Elements, error) {
+       m := new(Elements)
+       if err := x.ServerStream.RecvMsg(m); err != nil {
+               return nil, err
+       }
+       return m, nil
+var _BeamFnData_serviceDesc = grpc.ServiceDesc{
+       ServiceName: "org.apache.beam.model.fn_execution.v1.BeamFnData",
+       HandlerType: (*BeamFnDataServer)(nil),
+       Methods:     []grpc.MethodDesc{},
+       Streams: []grpc.StreamDesc{
+               {
+                       StreamName:    "Data",
+                       Handler:       _BeamFnData_Data_Handler,
+                       ServerStreams: true,
+                       ClientStreams: true,
+               },
+       },
+       Metadata: "beam_fn_api.proto",
+// Client API for BeamFnState service
+type BeamFnStateClient interface {
+       // Used to get/append/clear state stored by the runner on behalf of the 
+       State(ctx context.Context, opts ...grpc.CallOption) 
(BeamFnState_StateClient, error)
+type beamFnStateClient struct {
+       cc *grpc.ClientConn
+func NewBeamFnStateClient(cc *grpc.ClientConn) BeamFnStateClient {
+       return &beamFnStateC


Reply via email to