http://git-wip-us.apache.org/repos/asf/beam/blob/42100456/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go ---------------------------------------------------------------------- diff --git a/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go b/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go new file mode 100644 index 0000000..31dc53e --- /dev/null +++ b/sdks/go/pkg/beam/model/pipeline_v1/beam_runner_api.pb.go @@ -0,0 +1,3491 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: beam_runner_api.proto + +/* +Package pipeline_v1 is a generated protocol buffer package. + +It is generated from these files: + beam_runner_api.proto + endpoints.proto + standard_window_fns.proto + +It has these top-level messages: + Components + MessageWithComponents + Pipeline + PTransform + PCollection + ParDoPayload + Parameter + StateSpec + ValueStateSpec + BagStateSpec + CombiningStateSpec + MapStateSpec + SetStateSpec + TimerSpec + IsBounded + ReadPayload + WindowIntoPayload + CombinePayload + TestStreamPayload + WriteFilesPayload + Coder + WindowingStrategy + MergeStatus + AccumulationMode + ClosingBehavior + OnTimeBehavior + OutputTime + TimeDomain + Trigger + TimestampTransform + SideInput + Environment + SdkFunctionSpec + FunctionSpec + DisplayData + ApiServiceDescriptor + OAuth2ClientCredentialsGrant + FixedWindowsPayload + SlidingWindowsPayload + SessionsPayload +*/ +package pipeline_v1 + +import proto "github.com/golang/protobuf/proto" +import fmt "fmt" +import math "math" +import google_protobuf "github.com/golang/protobuf/ptypes/any" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. 
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Parameter_Type_Enum int32 + +const ( + Parameter_Type_UNSPECIFIED Parameter_Type_Enum = 0 + Parameter_Type_WINDOW Parameter_Type_Enum = 1 + Parameter_Type_PIPELINE_OPTIONS Parameter_Type_Enum = 2 + Parameter_Type_RESTRICTION_TRACKER Parameter_Type_Enum = 3 +) + +var Parameter_Type_Enum_name = map[int32]string{ + 0: "UNSPECIFIED", + 1: "WINDOW", + 2: "PIPELINE_OPTIONS", + 3: "RESTRICTION_TRACKER", +} +var Parameter_Type_Enum_value = map[string]int32{ + "UNSPECIFIED": 0, + "WINDOW": 1, + "PIPELINE_OPTIONS": 2, + "RESTRICTION_TRACKER": 3, +} + +func (x Parameter_Type_Enum) String() string { + return proto.EnumName(Parameter_Type_Enum_name, int32(x)) +} +func (Parameter_Type_Enum) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{6, 0, 0} } + +type IsBounded_Enum int32 + +const ( + IsBounded_UNSPECIFIED IsBounded_Enum = 0 + IsBounded_UNBOUNDED IsBounded_Enum = 1 + IsBounded_BOUNDED IsBounded_Enum = 2 +) + +var IsBounded_Enum_name = map[int32]string{ + 0: "UNSPECIFIED", + 1: "UNBOUNDED", + 2: "BOUNDED", +} +var IsBounded_Enum_value = map[string]int32{ + "UNSPECIFIED": 0, + "UNBOUNDED": 1, + "BOUNDED": 2, +} + +func (x IsBounded_Enum) String() string { + return proto.EnumName(IsBounded_Enum_name, int32(x)) +} +func (IsBounded_Enum) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{14, 0} } + +type MergeStatus_Enum int32 + +const ( + MergeStatus_UNSPECIFIED MergeStatus_Enum = 0 + // The WindowFn does not require merging. + // Examples: global window, FixedWindows, SlidingWindows + MergeStatus_NON_MERGING MergeStatus_Enum = 1 + // The WindowFn is merging and the PCollection has not had merging + // performed. + // Example: Sessions prior to a GroupByKey + MergeStatus_NEEDS_MERGE MergeStatus_Enum = 2 + // The WindowFn is merging and the PCollection has had merging occur + // already. 
+ // Example: Sessions after a GroupByKey + MergeStatus_ALREADY_MERGED MergeStatus_Enum = 3 +) + +var MergeStatus_Enum_name = map[int32]string{ + 0: "UNSPECIFIED", + 1: "NON_MERGING", + 2: "NEEDS_MERGE", + 3: "ALREADY_MERGED", +} +var MergeStatus_Enum_value = map[string]int32{ + "UNSPECIFIED": 0, + "NON_MERGING": 1, + "NEEDS_MERGE": 2, + "ALREADY_MERGED": 3, +} + +func (x MergeStatus_Enum) String() string { + return proto.EnumName(MergeStatus_Enum_name, int32(x)) +} +func (MergeStatus_Enum) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{22, 0} } + +type AccumulationMode_Enum int32 + +const ( + AccumulationMode_UNSPECIFIED AccumulationMode_Enum = 0 + // The aggregation is discarded when it is output + AccumulationMode_DISCARDING AccumulationMode_Enum = 1 + // The aggregation is accumulated across outputs + AccumulationMode_ACCUMULATING AccumulationMode_Enum = 2 +) + +var AccumulationMode_Enum_name = map[int32]string{ + 0: "UNSPECIFIED", + 1: "DISCARDING", + 2: "ACCUMULATING", +} +var AccumulationMode_Enum_value = map[string]int32{ + "UNSPECIFIED": 0, + "DISCARDING": 1, + "ACCUMULATING": 2, +} + +func (x AccumulationMode_Enum) String() string { + return proto.EnumName(AccumulationMode_Enum_name, int32(x)) +} +func (AccumulationMode_Enum) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{23, 0} } + +type ClosingBehavior_Enum int32 + +const ( + ClosingBehavior_UNSPECIFIED ClosingBehavior_Enum = 0 + // Emit output when a window expires, whether or not there has been + // any new data since the last output. 
+ ClosingBehavior_EMIT_ALWAYS ClosingBehavior_Enum = 1 + // Only emit output when new data has arrived since the last output + ClosingBehavior_EMIT_IF_NONEMPTY ClosingBehavior_Enum = 2 +) + +var ClosingBehavior_Enum_name = map[int32]string{ + 0: "UNSPECIFIED", + 1: "EMIT_ALWAYS", + 2: "EMIT_IF_NONEMPTY", +} +var ClosingBehavior_Enum_value = map[string]int32{ + "UNSPECIFIED": 0, + "EMIT_ALWAYS": 1, + "EMIT_IF_NONEMPTY": 2, +} + +func (x ClosingBehavior_Enum) String() string { + return proto.EnumName(ClosingBehavior_Enum_name, int32(x)) +} +func (ClosingBehavior_Enum) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{24, 0} } + +type OnTimeBehavior_Enum int32 + +const ( + OnTimeBehavior_UNSPECIFIED OnTimeBehavior_Enum = 0 + // Always fire the on-time pane. Even if there is no new data since + // the previous firing, an element will be produced. + OnTimeBehavior_FIRE_ALWAYS OnTimeBehavior_Enum = 1 + // Only fire the on-time pane if there is new data since the previous firing. + OnTimeBehavior_FIRE_IF_NONEMPTY OnTimeBehavior_Enum = 2 +) + +var OnTimeBehavior_Enum_name = map[int32]string{ + 0: "UNSPECIFIED", + 1: "FIRE_ALWAYS", + 2: "FIRE_IF_NONEMPTY", +} +var OnTimeBehavior_Enum_value = map[string]int32{ + "UNSPECIFIED": 0, + "FIRE_ALWAYS": 1, + "FIRE_IF_NONEMPTY": 2, +} + +func (x OnTimeBehavior_Enum) String() string { + return proto.EnumName(OnTimeBehavior_Enum_name, int32(x)) +} +func (OnTimeBehavior_Enum) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{25, 0} } + +type OutputTime_Enum int32 + +const ( + OutputTime_UNSPECIFIED OutputTime_Enum = 0 + // The output has the timestamp of the end of the window. + OutputTime_END_OF_WINDOW OutputTime_Enum = 1 + // The output has the latest timestamp of the input elements since + // the last output. + OutputTime_LATEST_IN_PANE OutputTime_Enum = 2 + // The output has the earliest timestamp of the input elements since + // the last output. 
+ OutputTime_EARLIEST_IN_PANE OutputTime_Enum = 3 +) + +var OutputTime_Enum_name = map[int32]string{ + 0: "UNSPECIFIED", + 1: "END_OF_WINDOW", + 2: "LATEST_IN_PANE", + 3: "EARLIEST_IN_PANE", +} +var OutputTime_Enum_value = map[string]int32{ + "UNSPECIFIED": 0, + "END_OF_WINDOW": 1, + "LATEST_IN_PANE": 2, + "EARLIEST_IN_PANE": 3, +} + +func (x OutputTime_Enum) String() string { + return proto.EnumName(OutputTime_Enum_name, int32(x)) +} +func (OutputTime_Enum) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{26, 0} } + +type TimeDomain_Enum int32 + +const ( + TimeDomain_UNSPECIFIED TimeDomain_Enum = 0 + // Event time is time from the perspective of the data + TimeDomain_EVENT_TIME TimeDomain_Enum = 1 + // Processing time is time from the perspective of the + // execution of your pipeline + TimeDomain_PROCESSING_TIME TimeDomain_Enum = 2 + // Synchronized processing time is the minimum of the + // processing time of all pending elements. + // + // The "processing time" of an element refers to + // the local processing time at which it was emitted + TimeDomain_SYNCHRONIZED_PROCESSING_TIME TimeDomain_Enum = 3 +) + +var TimeDomain_Enum_name = map[int32]string{ + 0: "UNSPECIFIED", + 1: "EVENT_TIME", + 2: "PROCESSING_TIME", + 3: "SYNCHRONIZED_PROCESSING_TIME", +} +var TimeDomain_Enum_value = map[string]int32{ + "UNSPECIFIED": 0, + "EVENT_TIME": 1, + "PROCESSING_TIME": 2, + "SYNCHRONIZED_PROCESSING_TIME": 3, +} + +func (x TimeDomain_Enum) String() string { + return proto.EnumName(TimeDomain_Enum_name, int32(x)) +} +func (TimeDomain_Enum) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{27, 0} } + +type DisplayData_Type_Enum int32 + +const ( + DisplayData_Type_UNSPECIFIED DisplayData_Type_Enum = 0 + DisplayData_Type_STRING DisplayData_Type_Enum = 1 + DisplayData_Type_INTEGER DisplayData_Type_Enum = 2 + DisplayData_Type_FLOAT DisplayData_Type_Enum = 3 + DisplayData_Type_BOOLEAN DisplayData_Type_Enum = 4 + DisplayData_Type_TIMESTAMP 
DisplayData_Type_Enum = 5 + DisplayData_Type_DURATION DisplayData_Type_Enum = 6 + DisplayData_Type_JAVA_CLASS DisplayData_Type_Enum = 7 +) + +var DisplayData_Type_Enum_name = map[int32]string{ + 0: "UNSPECIFIED", + 1: "STRING", + 2: "INTEGER", + 3: "FLOAT", + 4: "BOOLEAN", + 5: "TIMESTAMP", + 6: "DURATION", + 7: "JAVA_CLASS", +} +var DisplayData_Type_Enum_value = map[string]int32{ + "UNSPECIFIED": 0, + "STRING": 1, + "INTEGER": 2, + "FLOAT": 3, + "BOOLEAN": 4, + "TIMESTAMP": 5, + "DURATION": 6, + "JAVA_CLASS": 7, +} + +func (x DisplayData_Type_Enum) String() string { + return proto.EnumName(DisplayData_Type_Enum_name, int32(x)) +} +func (DisplayData_Type_Enum) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{34, 2, 0} } + +// A set of mappings from id to message. This is included as an optional field +// on any proto message that may contain references needing resolution. +type Components struct { + // (Required) A map from pipeline-scoped id to PTransform. + Transforms map[string]*PTransform `protobuf:"bytes,1,rep,name=transforms" json:"transforms,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // (Required) A map from pipeline-scoped id to PCollection. + Pcollections map[string]*PCollection `protobuf:"bytes,2,rep,name=pcollections" json:"pcollections,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // (Required) A map from pipeline-scoped id to WindowingStrategy. + WindowingStrategies map[string]*WindowingStrategy `protobuf:"bytes,3,rep,name=windowing_strategies,json=windowingStrategies" json:"windowing_strategies,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // (Required) A map from pipeline-scoped id to Coder. 
+ Coders map[string]*Coder `protobuf:"bytes,4,rep,name=coders" json:"coders,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // (Required) A map from pipeline-scoped id to Environment. + Environments map[string]*Environment `protobuf:"bytes,5,rep,name=environments" json:"environments,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` +} + +func (m *Components) Reset() { *m = Components{} } +func (m *Components) String() string { return proto.CompactTextString(m) } +func (*Components) ProtoMessage() {} +func (*Components) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } + +func (m *Components) GetTransforms() map[string]*PTransform { + if m != nil { + return m.Transforms + } + return nil +} + +func (m *Components) GetPcollections() map[string]*PCollection { + if m != nil { + return m.Pcollections + } + return nil +} + +func (m *Components) GetWindowingStrategies() map[string]*WindowingStrategy { + if m != nil { + return m.WindowingStrategies + } + return nil +} + +func (m *Components) GetCoders() map[string]*Coder { + if m != nil { + return m.Coders + } + return nil +} + +func (m *Components) GetEnvironments() map[string]*Environment { + if m != nil { + return m.Environments + } + return nil +} + +// A disjoint union of all the things that may contain references +// that require Components to resolve. +type MessageWithComponents struct { + // (Optional) The by-reference components of the root message, + // enabling a standalone message. + // + // If this is absent, it is expected that there are no + // references. + Components *Components `protobuf:"bytes,1,opt,name=components" json:"components,omitempty"` + // (Required) The root message that may contain pointers + // that should be resolved by looking inside components. 
+ // + // Types that are valid to be assigned to Root: + // *MessageWithComponents_Coder + // *MessageWithComponents_CombinePayload + // *MessageWithComponents_SdkFunctionSpec + // *MessageWithComponents_ParDoPayload + // *MessageWithComponents_Ptransform + // *MessageWithComponents_Pcollection + // *MessageWithComponents_ReadPayload + // *MessageWithComponents_SideInput + // *MessageWithComponents_WindowIntoPayload + // *MessageWithComponents_WindowingStrategy + // *MessageWithComponents_FunctionSpec + Root isMessageWithComponents_Root `protobuf_oneof:"root"` +} + +func (m *MessageWithComponents) Reset() { *m = MessageWithComponents{} } +func (m *MessageWithComponents) String() string { return proto.CompactTextString(m) } +func (*MessageWithComponents) ProtoMessage() {} +func (*MessageWithComponents) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +type isMessageWithComponents_Root interface { + isMessageWithComponents_Root() +} + +type MessageWithComponents_Coder struct { + Coder *Coder `protobuf:"bytes,2,opt,name=coder,oneof"` +} +type MessageWithComponents_CombinePayload struct { + CombinePayload *CombinePayload `protobuf:"bytes,3,opt,name=combine_payload,json=combinePayload,oneof"` +} +type MessageWithComponents_SdkFunctionSpec struct { + SdkFunctionSpec *SdkFunctionSpec `protobuf:"bytes,4,opt,name=sdk_function_spec,json=sdkFunctionSpec,oneof"` +} +type MessageWithComponents_ParDoPayload struct { + ParDoPayload *ParDoPayload `protobuf:"bytes,6,opt,name=par_do_payload,json=parDoPayload,oneof"` +} +type MessageWithComponents_Ptransform struct { + Ptransform *PTransform `protobuf:"bytes,7,opt,name=ptransform,oneof"` +} +type MessageWithComponents_Pcollection struct { + Pcollection *PCollection `protobuf:"bytes,8,opt,name=pcollection,oneof"` +} +type MessageWithComponents_ReadPayload struct { + ReadPayload *ReadPayload `protobuf:"bytes,9,opt,name=read_payload,json=readPayload,oneof"` +} +type MessageWithComponents_SideInput struct { + SideInput 
*SideInput `protobuf:"bytes,11,opt,name=side_input,json=sideInput,oneof"` +} +type MessageWithComponents_WindowIntoPayload struct { + WindowIntoPayload *WindowIntoPayload `protobuf:"bytes,12,opt,name=window_into_payload,json=windowIntoPayload,oneof"` +} +type MessageWithComponents_WindowingStrategy struct { + WindowingStrategy *WindowingStrategy `protobuf:"bytes,13,opt,name=windowing_strategy,json=windowingStrategy,oneof"` +} +type MessageWithComponents_FunctionSpec struct { + FunctionSpec *FunctionSpec `protobuf:"bytes,14,opt,name=function_spec,json=functionSpec,oneof"` +} + +func (*MessageWithComponents_Coder) isMessageWithComponents_Root() {} +func (*MessageWithComponents_CombinePayload) isMessageWithComponents_Root() {} +func (*MessageWithComponents_SdkFunctionSpec) isMessageWithComponents_Root() {} +func (*MessageWithComponents_ParDoPayload) isMessageWithComponents_Root() {} +func (*MessageWithComponents_Ptransform) isMessageWithComponents_Root() {} +func (*MessageWithComponents_Pcollection) isMessageWithComponents_Root() {} +func (*MessageWithComponents_ReadPayload) isMessageWithComponents_Root() {} +func (*MessageWithComponents_SideInput) isMessageWithComponents_Root() {} +func (*MessageWithComponents_WindowIntoPayload) isMessageWithComponents_Root() {} +func (*MessageWithComponents_WindowingStrategy) isMessageWithComponents_Root() {} +func (*MessageWithComponents_FunctionSpec) isMessageWithComponents_Root() {} + +func (m *MessageWithComponents) GetRoot() isMessageWithComponents_Root { + if m != nil { + return m.Root + } + return nil +} + +func (m *MessageWithComponents) GetComponents() *Components { + if m != nil { + return m.Components + } + return nil +} + +func (m *MessageWithComponents) GetCoder() *Coder { + if x, ok := m.GetRoot().(*MessageWithComponents_Coder); ok { + return x.Coder + } + return nil +} + +func (m *MessageWithComponents) GetCombinePayload() *CombinePayload { + if x, ok := m.GetRoot().(*MessageWithComponents_CombinePayload); ok { + 
return x.CombinePayload + } + return nil +} + +func (m *MessageWithComponents) GetSdkFunctionSpec() *SdkFunctionSpec { + if x, ok := m.GetRoot().(*MessageWithComponents_SdkFunctionSpec); ok { + return x.SdkFunctionSpec + } + return nil +} + +func (m *MessageWithComponents) GetParDoPayload() *ParDoPayload { + if x, ok := m.GetRoot().(*MessageWithComponents_ParDoPayload); ok { + return x.ParDoPayload + } + return nil +} + +func (m *MessageWithComponents) GetPtransform() *PTransform { + if x, ok := m.GetRoot().(*MessageWithComponents_Ptransform); ok { + return x.Ptransform + } + return nil +} + +func (m *MessageWithComponents) GetPcollection() *PCollection { + if x, ok := m.GetRoot().(*MessageWithComponents_Pcollection); ok { + return x.Pcollection + } + return nil +} + +func (m *MessageWithComponents) GetReadPayload() *ReadPayload { + if x, ok := m.GetRoot().(*MessageWithComponents_ReadPayload); ok { + return x.ReadPayload + } + return nil +} + +func (m *MessageWithComponents) GetSideInput() *SideInput { + if x, ok := m.GetRoot().(*MessageWithComponents_SideInput); ok { + return x.SideInput + } + return nil +} + +func (m *MessageWithComponents) GetWindowIntoPayload() *WindowIntoPayload { + if x, ok := m.GetRoot().(*MessageWithComponents_WindowIntoPayload); ok { + return x.WindowIntoPayload + } + return nil +} + +func (m *MessageWithComponents) GetWindowingStrategy() *WindowingStrategy { + if x, ok := m.GetRoot().(*MessageWithComponents_WindowingStrategy); ok { + return x.WindowingStrategy + } + return nil +} + +func (m *MessageWithComponents) GetFunctionSpec() *FunctionSpec { + if x, ok := m.GetRoot().(*MessageWithComponents_FunctionSpec); ok { + return x.FunctionSpec + } + return nil +} + +// XXX_OneofFuncs is for the internal use of the proto package. 
+func (*MessageWithComponents) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _MessageWithComponents_OneofMarshaler, _MessageWithComponents_OneofUnmarshaler, _MessageWithComponents_OneofSizer, []interface{}{ + (*MessageWithComponents_Coder)(nil), + (*MessageWithComponents_CombinePayload)(nil), + (*MessageWithComponents_SdkFunctionSpec)(nil), + (*MessageWithComponents_ParDoPayload)(nil), + (*MessageWithComponents_Ptransform)(nil), + (*MessageWithComponents_Pcollection)(nil), + (*MessageWithComponents_ReadPayload)(nil), + (*MessageWithComponents_SideInput)(nil), + (*MessageWithComponents_WindowIntoPayload)(nil), + (*MessageWithComponents_WindowingStrategy)(nil), + (*MessageWithComponents_FunctionSpec)(nil), + } +} + +func _MessageWithComponents_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*MessageWithComponents) + // root + switch x := m.Root.(type) { + case *MessageWithComponents_Coder: + b.EncodeVarint(2<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Coder); err != nil { + return err + } + case *MessageWithComponents_CombinePayload: + b.EncodeVarint(3<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.CombinePayload); err != nil { + return err + } + case *MessageWithComponents_SdkFunctionSpec: + b.EncodeVarint(4<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.SdkFunctionSpec); err != nil { + return err + } + case *MessageWithComponents_ParDoPayload: + b.EncodeVarint(6<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.ParDoPayload); err != nil { + return err + } + case *MessageWithComponents_Ptransform: + b.EncodeVarint(7<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Ptransform); err != nil { + return err + } + case *MessageWithComponents_Pcollection: + b.EncodeVarint(8<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Pcollection); err != nil { + return err + } + case 
*MessageWithComponents_ReadPayload: + b.EncodeVarint(9<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.ReadPayload); err != nil { + return err + } + case *MessageWithComponents_SideInput: + b.EncodeVarint(11<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.SideInput); err != nil { + return err + } + case *MessageWithComponents_WindowIntoPayload: + b.EncodeVarint(12<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.WindowIntoPayload); err != nil { + return err + } + case *MessageWithComponents_WindowingStrategy: + b.EncodeVarint(13<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.WindowingStrategy); err != nil { + return err + } + case *MessageWithComponents_FunctionSpec: + b.EncodeVarint(14<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.FunctionSpec); err != nil { + return err + } + case nil: + default: + return fmt.Errorf("MessageWithComponents.Root has unexpected type %T", x) + } + return nil +} + +func _MessageWithComponents_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*MessageWithComponents) + switch tag { + case 2: // root.coder + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Coder) + err := b.DecodeMessage(msg) + m.Root = &MessageWithComponents_Coder{msg} + return true, err + case 3: // root.combine_payload + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(CombinePayload) + err := b.DecodeMessage(msg) + m.Root = &MessageWithComponents_CombinePayload{msg} + return true, err + case 4: // root.sdk_function_spec + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(SdkFunctionSpec) + err := b.DecodeMessage(msg) + m.Root = &MessageWithComponents_SdkFunctionSpec{msg} + return true, err + case 6: // root.par_do_payload + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(ParDoPayload) + err := b.DecodeMessage(msg) + m.Root = 
&MessageWithComponents_ParDoPayload{msg} + return true, err + case 7: // root.ptransform + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(PTransform) + err := b.DecodeMessage(msg) + m.Root = &MessageWithComponents_Ptransform{msg} + return true, err + case 8: // root.pcollection + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(PCollection) + err := b.DecodeMessage(msg) + m.Root = &MessageWithComponents_Pcollection{msg} + return true, err + case 9: // root.read_payload + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(ReadPayload) + err := b.DecodeMessage(msg) + m.Root = &MessageWithComponents_ReadPayload{msg} + return true, err + case 11: // root.side_input + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(SideInput) + err := b.DecodeMessage(msg) + m.Root = &MessageWithComponents_SideInput{msg} + return true, err + case 12: // root.window_into_payload + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(WindowIntoPayload) + err := b.DecodeMessage(msg) + m.Root = &MessageWithComponents_WindowIntoPayload{msg} + return true, err + case 13: // root.windowing_strategy + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(WindowingStrategy) + err := b.DecodeMessage(msg) + m.Root = &MessageWithComponents_WindowingStrategy{msg} + return true, err + case 14: // root.function_spec + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(FunctionSpec) + err := b.DecodeMessage(msg) + m.Root = &MessageWithComponents_FunctionSpec{msg} + return true, err + default: + return false, nil + } +} + +func _MessageWithComponents_OneofSizer(msg proto.Message) (n int) { + m := msg.(*MessageWithComponents) + // root + switch x := m.Root.(type) { + case *MessageWithComponents_Coder: + s := proto.Size(x.Coder) + n 
+= proto.SizeVarint(2<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *MessageWithComponents_CombinePayload: + s := proto.Size(x.CombinePayload) + n += proto.SizeVarint(3<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *MessageWithComponents_SdkFunctionSpec: + s := proto.Size(x.SdkFunctionSpec) + n += proto.SizeVarint(4<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *MessageWithComponents_ParDoPayload: + s := proto.Size(x.ParDoPayload) + n += proto.SizeVarint(6<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *MessageWithComponents_Ptransform: + s := proto.Size(x.Ptransform) + n += proto.SizeVarint(7<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *MessageWithComponents_Pcollection: + s := proto.Size(x.Pcollection) + n += proto.SizeVarint(8<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *MessageWithComponents_ReadPayload: + s := proto.Size(x.ReadPayload) + n += proto.SizeVarint(9<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *MessageWithComponents_SideInput: + s := proto.Size(x.SideInput) + n += proto.SizeVarint(11<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *MessageWithComponents_WindowIntoPayload: + s := proto.Size(x.WindowIntoPayload) + n += proto.SizeVarint(12<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *MessageWithComponents_WindowingStrategy: + s := proto.Size(x.WindowingStrategy) + n += proto.SizeVarint(13<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *MessageWithComponents_FunctionSpec: + s := proto.Size(x.FunctionSpec) + n += proto.SizeVarint(14<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case nil: + default: + panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) + } + return n +} + +// A Pipeline is a hierarchical graph of PTransforms, linked +// by PCollections. 
+// +// This is represented by a number of by-reference maps to nodes, +// PCollections, SDK environments, UDF, etc., for +// supporting compact reuse and arbitrary graph structure. +// +// All of the keys in the maps here are arbitrary strings that are only +// required to be internally consistent within this proto message. +type Pipeline struct { + // (Required) The coders, UDFs, graph nodes, etc, that make up + // this pipeline. + Components *Components `protobuf:"bytes,1,opt,name=components" json:"components,omitempty"` + // (Required) The ids of all PTransforms that are not contained within another PTransform. + // These must be in shallow topological order, so that traversing them recursively + // in this order yields a recursively topological traversal. + RootTransformIds []string `protobuf:"bytes,2,rep,name=root_transform_ids,json=rootTransformIds" json:"root_transform_ids,omitempty"` + // (Optional) Static display data for the pipeline. If there is none, + // it may be omitted. + DisplayData *DisplayData `protobuf:"bytes,3,opt,name=display_data,json=displayData" json:"display_data,omitempty"` +} + +func (m *Pipeline) Reset() { *m = Pipeline{} } +func (m *Pipeline) String() string { return proto.CompactTextString(m) } +func (*Pipeline) ProtoMessage() {} +func (*Pipeline) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } + +func (m *Pipeline) GetComponents() *Components { + if m != nil { + return m.Components + } + return nil +} + +func (m *Pipeline) GetRootTransformIds() []string { + if m != nil { + return m.RootTransformIds + } + return nil +} + +func (m *Pipeline) GetDisplayData() *DisplayData { + if m != nil { + return m.DisplayData + } + return nil +} + +// An applied PTransform! This does not contain the graph data, but only the +// fields specific to a graph node that is a Runner API transform +// between PCollections. +type PTransform struct { + // (Required) A unique name for the application node. 
+ // + // Ideally, this should be stable over multiple evolutions of a pipeline + // for the purposes of logging and associating pipeline state with a node, + // etc. + // + // If it is not stable, then the runner decides what will happen. But, most + // importantly, it must always be here and be unique, even if it is + // autogenerated. + UniqueName string `protobuf:"bytes,5,opt,name=unique_name,json=uniqueName" json:"unique_name,omitempty"` + // (Optional) A URN and payload that, together, fully define the semantics + // of this transform. + // + // If absent, this must be an "anonymous" composite transform. + // + // For primitive transforms in the Runner API, this is required, and the + // payloads are well-defined messages. When the URN indicates ParDo it + // is a ParDoPayload, and so on. + // + // TODO: document the standardized URNs and payloads + // TODO: separate standardized payloads into a separate proto file + // + // For some special composite transforms, the payload is also officially + // defined: + // + // - when the URN is "urn:beam:transforms:combine" it is a CombinePayload + // + Spec *FunctionSpec `protobuf:"bytes,1,opt,name=spec" json:"spec,omitempty"` + // (Optional) if this node is a composite, a list of the ids of + // transforms that it contains. + Subtransforms []string `protobuf:"bytes,2,rep,name=subtransforms" json:"subtransforms,omitempty"` + // (Required) A map from local names of inputs (unique only within this map, and + // likely embedded in the transform payload and serialized user code) to + // PCollection ids. + // + // The payload for this transform may clarify the relationship of these + // inputs. For example: + // + // - for a Flatten transform they are merged + // - for a ParDo transform, some may be side inputs + // + // All inputs are recorded here so that the topological ordering of + // the graph is consistent whether or not the payload is understood. 
+ // + Inputs map[string]string `protobuf:"bytes,3,rep,name=inputs" json:"inputs,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // (Required) A map from local names of outputs (unique only within this map, + // and likely embedded in the transform payload and serialized user code) + // to PCollection ids. + // + // The URN or payload for this transform node may clarify the type and + // relationship of these outputs. For example: + // + // - for a ParDo transform, these are tags on PCollections, which will be + // embedded in the DoFn. + // + Outputs map[string]string `protobuf:"bytes,4,rep,name=outputs" json:"outputs,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // (Optional) Static display data for this PTransform application. If + // there is none, or it is not relevant (such as use by the Fn API) + // then it may be omitted. + DisplayData *DisplayData `protobuf:"bytes,6,opt,name=display_data,json=displayData" json:"display_data,omitempty"` +} + +func (m *PTransform) Reset() { *m = PTransform{} } +func (m *PTransform) String() string { return proto.CompactTextString(m) } +func (*PTransform) ProtoMessage() {} +func (*PTransform) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } + +func (m *PTransform) GetUniqueName() string { + if m != nil { + return m.UniqueName + } + return "" +} + +func (m *PTransform) GetSpec() *FunctionSpec { + if m != nil { + return m.Spec + } + return nil +} + +func (m *PTransform) GetSubtransforms() []string { + if m != nil { + return m.Subtransforms + } + return nil +} + +func (m *PTransform) GetInputs() map[string]string { + if m != nil { + return m.Inputs + } + return nil +} + +func (m *PTransform) GetOutputs() map[string]string { + if m != nil { + return m.Outputs + } + return nil +} + +func (m *PTransform) GetDisplayData() *DisplayData { + if m != nil { + return m.DisplayData + } + return nil +} + +// A PCollection! 
+type PCollection struct { + // (Required) A unique name for the PCollection. + // + // Ideally, this should be stable over multiple evolutions of a pipeline + // for the purposes of logging and associating pipeline state with a node, + // etc. + // + // If it is not stable, then the runner decides what will happen. But, most + // importantly, it must always be here, even if it is autogenerated. + UniqueName string `protobuf:"bytes,1,opt,name=unique_name,json=uniqueName" json:"unique_name,omitempty"` + // (Required) The id of the Coder for this PCollection. + CoderId string `protobuf:"bytes,2,opt,name=coder_id,json=coderId" json:"coder_id,omitempty"` + // (Required) Whether this PCollection is bounded or unbounded + IsBounded IsBounded_Enum `protobuf:"varint,3,opt,name=is_bounded,json=isBounded,enum=org.apache.beam.model.pipeline.v1.IsBounded_Enum" json:"is_bounded,omitempty"` + // (Required) The id of the windowing strategy for this PCollection. + WindowingStrategyId string `protobuf:"bytes,4,opt,name=windowing_strategy_id,json=windowingStrategyId" json:"windowing_strategy_id,omitempty"` + // (Optional) Static display data for this PTransform application. If + // there is none, or it is not relevant (such as use by the Fn API) + // then it may be omitted. 
+ DisplayData *DisplayData `protobuf:"bytes,5,opt,name=display_data,json=displayData" json:"display_data,omitempty"` +} + +func (m *PCollection) Reset() { *m = PCollection{} } +func (m *PCollection) String() string { return proto.CompactTextString(m) } +func (*PCollection) ProtoMessage() {} +func (*PCollection) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{4} } + +func (m *PCollection) GetUniqueName() string { + if m != nil { + return m.UniqueName + } + return "" +} + +func (m *PCollection) GetCoderId() string { + if m != nil { + return m.CoderId + } + return "" +} + +func (m *PCollection) GetIsBounded() IsBounded_Enum { + if m != nil { + return m.IsBounded + } + return IsBounded_UNSPECIFIED +} + +func (m *PCollection) GetWindowingStrategyId() string { + if m != nil { + return m.WindowingStrategyId + } + return "" +} + +func (m *PCollection) GetDisplayData() *DisplayData { + if m != nil { + return m.DisplayData + } + return nil +} + +// The payload for the primitive ParDo transform. +type ParDoPayload struct { + // (Required) The SdkFunctionSpec of the DoFn. + DoFn *SdkFunctionSpec `protobuf:"bytes,1,opt,name=do_fn,json=doFn" json:"do_fn,omitempty"` + // (Required) Additional pieces of context the DoFn may require that + // are not otherwise represented in the payload. + // (may force runners to execute the ParDo differently) + Parameters []*Parameter `protobuf:"bytes,2,rep,name=parameters" json:"parameters,omitempty"` + // (Optional) A mapping of local input names to side inputs, describing + // the expected access pattern. + SideInputs map[string]*SideInput `protobuf:"bytes,3,rep,name=side_inputs,json=sideInputs" json:"side_inputs,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // (Optional) A mapping of local state names to state specifications. 
+ StateSpecs map[string]*StateSpec `protobuf:"bytes,4,rep,name=state_specs,json=stateSpecs" json:"state_specs,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // (Optional) A mapping of local timer names to timer specifications. + TimerSpecs map[string]*TimerSpec `protobuf:"bytes,5,rep,name=timer_specs,json=timerSpecs" json:"timer_specs,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` + // Whether the DoFn is splittable + Splittable bool `protobuf:"varint,6,opt,name=splittable" json:"splittable,omitempty"` +} + +func (m *ParDoPayload) Reset() { *m = ParDoPayload{} } +func (m *ParDoPayload) String() string { return proto.CompactTextString(m) } +func (*ParDoPayload) ProtoMessage() {} +func (*ParDoPayload) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{5} } + +func (m *ParDoPayload) GetDoFn() *SdkFunctionSpec { + if m != nil { + return m.DoFn + } + return nil +} + +func (m *ParDoPayload) GetParameters() []*Parameter { + if m != nil { + return m.Parameters + } + return nil +} + +func (m *ParDoPayload) GetSideInputs() map[string]*SideInput { + if m != nil { + return m.SideInputs + } + return nil +} + +func (m *ParDoPayload) GetStateSpecs() map[string]*StateSpec { + if m != nil { + return m.StateSpecs + } + return nil +} + +func (m *ParDoPayload) GetTimerSpecs() map[string]*TimerSpec { + if m != nil { + return m.TimerSpecs + } + return nil +} + +func (m *ParDoPayload) GetSplittable() bool { + if m != nil { + return m.Splittable + } + return false +} + +// Parameters that a UDF might require. +// +// The details of how a runner sends these parameters to the SDK harness +// are the subject of the Fn API. +// +// The details of how an SDK harness delivers them to the UDF is entirely +// up to the SDK. 
(for some SDKs there may be parameters that are not +// represented here if the runner doesn't need to do anything) +// +// Here, the parameters are simply indicators to the runner that they +// need to run the function a particular way. +// +// TODO: the evolution of the Fn API will influence what needs explicit +// representation here +type Parameter struct { + Type Parameter_Type_Enum `protobuf:"varint,1,opt,name=type,enum=org.apache.beam.model.pipeline.v1.Parameter_Type_Enum" json:"type,omitempty"` +} + +func (m *Parameter) Reset() { *m = Parameter{} } +func (m *Parameter) String() string { return proto.CompactTextString(m) } +func (*Parameter) ProtoMessage() {} +func (*Parameter) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6} } + +func (m *Parameter) GetType() Parameter_Type_Enum { + if m != nil { + return m.Type + } + return Parameter_Type_UNSPECIFIED +} + +type Parameter_Type struct { +} + +func (m *Parameter_Type) Reset() { *m = Parameter_Type{} } +func (m *Parameter_Type) String() string { return proto.CompactTextString(m) } +func (*Parameter_Type) ProtoMessage() {} +func (*Parameter_Type) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{6, 0} } + +type StateSpec struct { + // Types that are valid to be assigned to Spec: + // *StateSpec_ValueSpec + // *StateSpec_BagSpec + // *StateSpec_CombiningSpec + // *StateSpec_MapSpec + // *StateSpec_SetSpec + Spec isStateSpec_Spec `protobuf_oneof:"spec"` +} + +func (m *StateSpec) Reset() { *m = StateSpec{} } +func (m *StateSpec) String() string { return proto.CompactTextString(m) } +func (*StateSpec) ProtoMessage() {} +func (*StateSpec) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{7} } + +type isStateSpec_Spec interface { + isStateSpec_Spec() +} + +type StateSpec_ValueSpec struct { + ValueSpec *ValueStateSpec `protobuf:"bytes,1,opt,name=value_spec,json=valueSpec,oneof"` +} +type StateSpec_BagSpec struct { + BagSpec *BagStateSpec 
`protobuf:"bytes,2,opt,name=bag_spec,json=bagSpec,oneof"` +} +type StateSpec_CombiningSpec struct { + CombiningSpec *CombiningStateSpec `protobuf:"bytes,3,opt,name=combining_spec,json=combiningSpec,oneof"` +} +type StateSpec_MapSpec struct { + MapSpec *MapStateSpec `protobuf:"bytes,4,opt,name=map_spec,json=mapSpec,oneof"` +} +type StateSpec_SetSpec struct { + SetSpec *SetStateSpec `protobuf:"bytes,5,opt,name=set_spec,json=setSpec,oneof"` +} + +func (*StateSpec_ValueSpec) isStateSpec_Spec() {} +func (*StateSpec_BagSpec) isStateSpec_Spec() {} +func (*StateSpec_CombiningSpec) isStateSpec_Spec() {} +func (*StateSpec_MapSpec) isStateSpec_Spec() {} +func (*StateSpec_SetSpec) isStateSpec_Spec() {} + +func (m *StateSpec) GetSpec() isStateSpec_Spec { + if m != nil { + return m.Spec + } + return nil +} + +func (m *StateSpec) GetValueSpec() *ValueStateSpec { + if x, ok := m.GetSpec().(*StateSpec_ValueSpec); ok { + return x.ValueSpec + } + return nil +} + +func (m *StateSpec) GetBagSpec() *BagStateSpec { + if x, ok := m.GetSpec().(*StateSpec_BagSpec); ok { + return x.BagSpec + } + return nil +} + +func (m *StateSpec) GetCombiningSpec() *CombiningStateSpec { + if x, ok := m.GetSpec().(*StateSpec_CombiningSpec); ok { + return x.CombiningSpec + } + return nil +} + +func (m *StateSpec) GetMapSpec() *MapStateSpec { + if x, ok := m.GetSpec().(*StateSpec_MapSpec); ok { + return x.MapSpec + } + return nil +} + +func (m *StateSpec) GetSetSpec() *SetStateSpec { + if x, ok := m.GetSpec().(*StateSpec_SetSpec); ok { + return x.SetSpec + } + return nil +} + +// XXX_OneofFuncs is for the internal use of the proto package. 
+func (*StateSpec) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _StateSpec_OneofMarshaler, _StateSpec_OneofUnmarshaler, _StateSpec_OneofSizer, []interface{}{ + (*StateSpec_ValueSpec)(nil), + (*StateSpec_BagSpec)(nil), + (*StateSpec_CombiningSpec)(nil), + (*StateSpec_MapSpec)(nil), + (*StateSpec_SetSpec)(nil), + } +} + +func _StateSpec_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*StateSpec) + // spec + switch x := m.Spec.(type) { + case *StateSpec_ValueSpec: + b.EncodeVarint(1<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.ValueSpec); err != nil { + return err + } + case *StateSpec_BagSpec: + b.EncodeVarint(2<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.BagSpec); err != nil { + return err + } + case *StateSpec_CombiningSpec: + b.EncodeVarint(3<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.CombiningSpec); err != nil { + return err + } + case *StateSpec_MapSpec: + b.EncodeVarint(4<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.MapSpec); err != nil { + return err + } + case *StateSpec_SetSpec: + b.EncodeVarint(5<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.SetSpec); err != nil { + return err + } + case nil: + default: + return fmt.Errorf("StateSpec.Spec has unexpected type %T", x) + } + return nil +} + +func _StateSpec_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*StateSpec) + switch tag { + case 1: // spec.value_spec + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(ValueStateSpec) + err := b.DecodeMessage(msg) + m.Spec = &StateSpec_ValueSpec{msg} + return true, err + case 2: // spec.bag_spec + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(BagStateSpec) + err := b.DecodeMessage(msg) + m.Spec = &StateSpec_BagSpec{msg} + return 
true, err + case 3: // spec.combining_spec + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(CombiningStateSpec) + err := b.DecodeMessage(msg) + m.Spec = &StateSpec_CombiningSpec{msg} + return true, err + case 4: // spec.map_spec + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(MapStateSpec) + err := b.DecodeMessage(msg) + m.Spec = &StateSpec_MapSpec{msg} + return true, err + case 5: // spec.set_spec + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(SetStateSpec) + err := b.DecodeMessage(msg) + m.Spec = &StateSpec_SetSpec{msg} + return true, err + default: + return false, nil + } +} + +func _StateSpec_OneofSizer(msg proto.Message) (n int) { + m := msg.(*StateSpec) + // spec + switch x := m.Spec.(type) { + case *StateSpec_ValueSpec: + s := proto.Size(x.ValueSpec) + n += proto.SizeVarint(1<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *StateSpec_BagSpec: + s := proto.Size(x.BagSpec) + n += proto.SizeVarint(2<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *StateSpec_CombiningSpec: + s := proto.Size(x.CombiningSpec) + n += proto.SizeVarint(3<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *StateSpec_MapSpec: + s := proto.Size(x.MapSpec) + n += proto.SizeVarint(4<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *StateSpec_SetSpec: + s := proto.Size(x.SetSpec) + n += proto.SizeVarint(5<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case nil: + default: + panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) + } + return n +} + +type ValueStateSpec struct { + CoderId string `protobuf:"bytes,1,opt,name=coder_id,json=coderId" json:"coder_id,omitempty"` +} + +func (m *ValueStateSpec) Reset() { *m = ValueStateSpec{} } +func (m *ValueStateSpec) String() string { return proto.CompactTextString(m) } +func (*ValueStateSpec) 
ProtoMessage() {} +func (*ValueStateSpec) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{8} } + +func (m *ValueStateSpec) GetCoderId() string { + if m != nil { + return m.CoderId + } + return "" +} + +type BagStateSpec struct { + ElementCoderId string `protobuf:"bytes,1,opt,name=element_coder_id,json=elementCoderId" json:"element_coder_id,omitempty"` +} + +func (m *BagStateSpec) Reset() { *m = BagStateSpec{} } +func (m *BagStateSpec) String() string { return proto.CompactTextString(m) } +func (*BagStateSpec) ProtoMessage() {} +func (*BagStateSpec) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{9} } + +func (m *BagStateSpec) GetElementCoderId() string { + if m != nil { + return m.ElementCoderId + } + return "" +} + +type CombiningStateSpec struct { + AccumulatorCoderId string `protobuf:"bytes,1,opt,name=accumulator_coder_id,json=accumulatorCoderId" json:"accumulator_coder_id,omitempty"` + CombineFn *SdkFunctionSpec `protobuf:"bytes,2,opt,name=combine_fn,json=combineFn" json:"combine_fn,omitempty"` +} + +func (m *CombiningStateSpec) Reset() { *m = CombiningStateSpec{} } +func (m *CombiningStateSpec) String() string { return proto.CompactTextString(m) } +func (*CombiningStateSpec) ProtoMessage() {} +func (*CombiningStateSpec) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{10} } + +func (m *CombiningStateSpec) GetAccumulatorCoderId() string { + if m != nil { + return m.AccumulatorCoderId + } + return "" +} + +func (m *CombiningStateSpec) GetCombineFn() *SdkFunctionSpec { + if m != nil { + return m.CombineFn + } + return nil +} + +type MapStateSpec struct { + KeyCoderId string `protobuf:"bytes,1,opt,name=key_coder_id,json=keyCoderId" json:"key_coder_id,omitempty"` + ValueCoderId string `protobuf:"bytes,2,opt,name=value_coder_id,json=valueCoderId" json:"value_coder_id,omitempty"` +} + +func (m *MapStateSpec) Reset() { *m = MapStateSpec{} } +func (m *MapStateSpec) String() string { return proto.CompactTextString(m) } +func 
(*MapStateSpec) ProtoMessage() {} +func (*MapStateSpec) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{11} } + +func (m *MapStateSpec) GetKeyCoderId() string { + if m != nil { + return m.KeyCoderId + } + return "" +} + +func (m *MapStateSpec) GetValueCoderId() string { + if m != nil { + return m.ValueCoderId + } + return "" +} + +type SetStateSpec struct { + ElementCoderId string `protobuf:"bytes,1,opt,name=element_coder_id,json=elementCoderId" json:"element_coder_id,omitempty"` +} + +func (m *SetStateSpec) Reset() { *m = SetStateSpec{} } +func (m *SetStateSpec) String() string { return proto.CompactTextString(m) } +func (*SetStateSpec) ProtoMessage() {} +func (*SetStateSpec) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{12} } + +func (m *SetStateSpec) GetElementCoderId() string { + if m != nil { + return m.ElementCoderId + } + return "" +} + +type TimerSpec struct { + TimeDomain TimeDomain_Enum `protobuf:"varint,1,opt,name=time_domain,json=timeDomain,enum=org.apache.beam.model.pipeline.v1.TimeDomain_Enum" json:"time_domain,omitempty"` +} + +func (m *TimerSpec) Reset() { *m = TimerSpec{} } +func (m *TimerSpec) String() string { return proto.CompactTextString(m) } +func (*TimerSpec) ProtoMessage() {} +func (*TimerSpec) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{13} } + +func (m *TimerSpec) GetTimeDomain() TimeDomain_Enum { + if m != nil { + return m.TimeDomain + } + return TimeDomain_UNSPECIFIED +} + +type IsBounded struct { +} + +func (m *IsBounded) Reset() { *m = IsBounded{} } +func (m *IsBounded) String() string { return proto.CompactTextString(m) } +func (*IsBounded) ProtoMessage() {} +func (*IsBounded) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{14} } + +// The payload for the primitive Read transform. +type ReadPayload struct { + // (Required) The SdkFunctionSpec of the source for this Read. 
+ Source *SdkFunctionSpec `protobuf:"bytes,1,opt,name=source" json:"source,omitempty"` + // (Required) Whether the source is bounded or unbounded + IsBounded IsBounded_Enum `protobuf:"varint,2,opt,name=is_bounded,json=isBounded,enum=org.apache.beam.model.pipeline.v1.IsBounded_Enum" json:"is_bounded,omitempty"` +} + +func (m *ReadPayload) Reset() { *m = ReadPayload{} } +func (m *ReadPayload) String() string { return proto.CompactTextString(m) } +func (*ReadPayload) ProtoMessage() {} +func (*ReadPayload) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{15} } + +func (m *ReadPayload) GetSource() *SdkFunctionSpec { + if m != nil { + return m.Source + } + return nil +} + +func (m *ReadPayload) GetIsBounded() IsBounded_Enum { + if m != nil { + return m.IsBounded + } + return IsBounded_UNSPECIFIED +} + +// The payload for the WindowInto transform. +type WindowIntoPayload struct { + // (Required) The SdkFunctionSpec of the WindowFn. + WindowFn *SdkFunctionSpec `protobuf:"bytes,1,opt,name=window_fn,json=windowFn" json:"window_fn,omitempty"` +} + +func (m *WindowIntoPayload) Reset() { *m = WindowIntoPayload{} } +func (m *WindowIntoPayload) String() string { return proto.CompactTextString(m) } +func (*WindowIntoPayload) ProtoMessage() {} +func (*WindowIntoPayload) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{16} } + +func (m *WindowIntoPayload) GetWindowFn() *SdkFunctionSpec { + if m != nil { + return m.WindowFn + } + return nil +} + +// The payload for the special-but-not-primitive Combine transform. +type CombinePayload struct { + // (Required) The SdkFunctionSpec of the CombineFn. 
+ CombineFn *SdkFunctionSpec `protobuf:"bytes,1,opt,name=combine_fn,json=combineFn" json:"combine_fn,omitempty"` + // (Required) A reference to the Coder to use for accumulators of the CombineFn + AccumulatorCoderId string `protobuf:"bytes,2,opt,name=accumulator_coder_id,json=accumulatorCoderId" json:"accumulator_coder_id,omitempty"` + // (Required) Additional pieces of context the DoFn may require that + // are not otherwise represented in the payload. + // (may force runners to execute the ParDo differently) + Parameters []*Parameter `protobuf:"bytes,3,rep,name=parameters" json:"parameters,omitempty"` + // (Optional) A mapping of local input names to side inputs, describing + // the expected access pattern. + SideInputs map[string]*SideInput `protobuf:"bytes,4,rep,name=side_inputs,json=sideInputs" json:"side_inputs,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` +} + +func (m *CombinePayload) Reset() { *m = CombinePayload{} } +func (m *CombinePayload) String() string { return proto.CompactTextString(m) } +func (*CombinePayload) ProtoMessage() {} +func (*CombinePayload) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{17} } + +func (m *CombinePayload) GetCombineFn() *SdkFunctionSpec { + if m != nil { + return m.CombineFn + } + return nil +} + +func (m *CombinePayload) GetAccumulatorCoderId() string { + if m != nil { + return m.AccumulatorCoderId + } + return "" +} + +func (m *CombinePayload) GetParameters() []*Parameter { + if m != nil { + return m.Parameters + } + return nil +} + +func (m *CombinePayload) GetSideInputs() map[string]*SideInput { + if m != nil { + return m.SideInputs + } + return nil +} + +// The payload for the test-only primitive TestStream +type TestStreamPayload struct { + // (Required) the coder for elements in the TestStream events + CoderId string `protobuf:"bytes,1,opt,name=coder_id,json=coderId" json:"coder_id,omitempty"` + Events []*TestStreamPayload_Event 
`protobuf:"bytes,2,rep,name=events" json:"events,omitempty"` +} + +func (m *TestStreamPayload) Reset() { *m = TestStreamPayload{} } +func (m *TestStreamPayload) String() string { return proto.CompactTextString(m) } +func (*TestStreamPayload) ProtoMessage() {} +func (*TestStreamPayload) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18} } + +func (m *TestStreamPayload) GetCoderId() string { + if m != nil { + return m.CoderId + } + return "" +} + +func (m *TestStreamPayload) GetEvents() []*TestStreamPayload_Event { + if m != nil { + return m.Events + } + return nil +} + +type TestStreamPayload_Event struct { + // Types that are valid to be assigned to Event: + // *TestStreamPayload_Event_WatermarkEvent + // *TestStreamPayload_Event_ProcessingTimeEvent + // *TestStreamPayload_Event_ElementEvent + Event isTestStreamPayload_Event_Event `protobuf_oneof:"event"` +} + +func (m *TestStreamPayload_Event) Reset() { *m = TestStreamPayload_Event{} } +func (m *TestStreamPayload_Event) String() string { return proto.CompactTextString(m) } +func (*TestStreamPayload_Event) ProtoMessage() {} +func (*TestStreamPayload_Event) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{18, 0} } + +type isTestStreamPayload_Event_Event interface { + isTestStreamPayload_Event_Event() +} + +type TestStreamPayload_Event_WatermarkEvent struct { + WatermarkEvent *TestStreamPayload_Event_AdvanceWatermark `protobuf:"bytes,1,opt,name=watermark_event,json=watermarkEvent,oneof"` +} +type TestStreamPayload_Event_ProcessingTimeEvent struct { + ProcessingTimeEvent *TestStreamPayload_Event_AdvanceProcessingTime `protobuf:"bytes,2,opt,name=processing_time_event,json=processingTimeEvent,oneof"` +} +type TestStreamPayload_Event_ElementEvent struct { + ElementEvent *TestStreamPayload_Event_AddElements `protobuf:"bytes,3,opt,name=element_event,json=elementEvent,oneof"` +} + +func (*TestStreamPayload_Event_WatermarkEvent) isTestStreamPayload_Event_Event() {} +func 
(*TestStreamPayload_Event_ProcessingTimeEvent) isTestStreamPayload_Event_Event() {} +func (*TestStreamPayload_Event_ElementEvent) isTestStreamPayload_Event_Event() {} + +func (m *TestStreamPayload_Event) GetEvent() isTestStreamPayload_Event_Event { + if m != nil { + return m.Event + } + return nil +} + +func (m *TestStreamPayload_Event) GetWatermarkEvent() *TestStreamPayload_Event_AdvanceWatermark { + if x, ok := m.GetEvent().(*TestStreamPayload_Event_WatermarkEvent); ok { + return x.WatermarkEvent + } + return nil +} + +func (m *TestStreamPayload_Event) GetProcessingTimeEvent() *TestStreamPayload_Event_AdvanceProcessingTime { + if x, ok := m.GetEvent().(*TestStreamPayload_Event_ProcessingTimeEvent); ok { + return x.ProcessingTimeEvent + } + return nil +} + +func (m *TestStreamPayload_Event) GetElementEvent() *TestStreamPayload_Event_AddElements { + if x, ok := m.GetEvent().(*TestStreamPayload_Event_ElementEvent); ok { + return x.ElementEvent + } + return nil +} + +// XXX_OneofFuncs is for the internal use of the proto package. 
+func (*TestStreamPayload_Event) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _TestStreamPayload_Event_OneofMarshaler, _TestStreamPayload_Event_OneofUnmarshaler, _TestStreamPayload_Event_OneofSizer, []interface{}{ + (*TestStreamPayload_Event_WatermarkEvent)(nil), + (*TestStreamPayload_Event_ProcessingTimeEvent)(nil), + (*TestStreamPayload_Event_ElementEvent)(nil), + } +} + +func _TestStreamPayload_Event_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*TestStreamPayload_Event) + // event + switch x := m.Event.(type) { + case *TestStreamPayload_Event_WatermarkEvent: + b.EncodeVarint(1<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.WatermarkEvent); err != nil { + return err + } + case *TestStreamPayload_Event_ProcessingTimeEvent: + b.EncodeVarint(2<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.ProcessingTimeEvent); err != nil { + return err + } + case *TestStreamPayload_Event_ElementEvent: + b.EncodeVarint(3<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.ElementEvent); err != nil { + return err + } + case nil: + default: + return fmt.Errorf("TestStreamPayload_Event.Event has unexpected type %T", x) + } + return nil +} + +func _TestStreamPayload_Event_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*TestStreamPayload_Event) + switch tag { + case 1: // event.watermark_event + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(TestStreamPayload_Event_AdvanceWatermark) + err := b.DecodeMessage(msg) + m.Event = &TestStreamPayload_Event_WatermarkEvent{msg} + return true, err + case 2: // event.processing_time_event + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(TestStreamPayload_Event_AdvanceProcessingTime) + err := b.DecodeMessage(msg) + m.Event = 
&TestStreamPayload_Event_ProcessingTimeEvent{msg} + return true, err + case 3: // event.element_event + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(TestStreamPayload_Event_AddElements) + err := b.DecodeMessage(msg) + m.Event = &TestStreamPayload_Event_ElementEvent{msg} + return true, err + default: + return false, nil + } +} + +func _TestStreamPayload_Event_OneofSizer(msg proto.Message) (n int) { + m := msg.(*TestStreamPayload_Event) + // event + switch x := m.Event.(type) { + case *TestStreamPayload_Event_WatermarkEvent: + s := proto.Size(x.WatermarkEvent) + n += proto.SizeVarint(1<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *TestStreamPayload_Event_ProcessingTimeEvent: + s := proto.Size(x.ProcessingTimeEvent) + n += proto.SizeVarint(2<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *TestStreamPayload_Event_ElementEvent: + s := proto.Size(x.ElementEvent) + n += proto.SizeVarint(3<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case nil: + default: + panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) + } + return n +} + +type TestStreamPayload_Event_AdvanceWatermark struct { + NewWatermark int64 `protobuf:"varint,1,opt,name=new_watermark,json=newWatermark" json:"new_watermark,omitempty"` +} + +func (m *TestStreamPayload_Event_AdvanceWatermark) Reset() { + *m = TestStreamPayload_Event_AdvanceWatermark{} +} +func (m *TestStreamPayload_Event_AdvanceWatermark) String() string { return proto.CompactTextString(m) } +func (*TestStreamPayload_Event_AdvanceWatermark) ProtoMessage() {} +func (*TestStreamPayload_Event_AdvanceWatermark) Descriptor() ([]byte, []int) { + return fileDescriptor0, []int{18, 0, 0} +} + +func (m *TestStreamPayload_Event_AdvanceWatermark) GetNewWatermark() int64 { + if m != nil { + return m.NewWatermark + } + return 0 +} + +type TestStreamPayload_Event_AdvanceProcessingTime struct { + AdvanceDuration int64 
`protobuf:"varint,1,opt,name=advance_duration,json=advanceDuration" json:"advance_duration,omitempty"` +} + +func (m *TestStreamPayload_Event_AdvanceProcessingTime) Reset() { + *m = TestStreamPayload_Event_AdvanceProcessingTime{} +} +func (m *TestStreamPayload_Event_AdvanceProcessingTime) String() string { + return proto.CompactTextString(m) +} +func (*TestStreamPayload_Event_AdvanceProcessingTime) ProtoMessage() {} +func (*TestStreamPayload_Event_AdvanceProcessingTime) Descriptor() ([]byte, []int) { + return fileDescriptor0, []int{18, 0, 1} +} + +func (m *TestStreamPayload_Event_AdvanceProcessingTime) GetAdvanceDuration() int64 { + if m != nil { + return m.AdvanceDuration + } + return 0 +} + +type TestStreamPayload_Event_AddElements struct { + Elements []*TestStreamPayload_TimestampedElement `protobuf:"bytes,1,rep,name=elements" json:"elements,omitempty"` +} + +func (m *TestStreamPayload_Event_AddElements) Reset() { *m = TestStreamPayload_Event_AddElements{} } +func (m *TestStreamPayload_Event_AddElements) String() string { return proto.CompactTextString(m) } +func (*TestStreamPayload_Event_AddElements) ProtoMessage() {} +func (*TestStreamPayload_Event_AddElements) Descriptor() ([]byte, []int) { + return fileDescriptor0, []int{18, 0, 2} +} + +func (m *TestStreamPayload_Event_AddElements) GetElements() []*TestStreamPayload_TimestampedElement { + if m != nil { + return m.Elements + } + return nil +} + +type TestStreamPayload_TimestampedElement struct { + EncodedElement []byte `protobuf:"bytes,1,opt,name=encoded_element,json=encodedElement,proto3" json:"encoded_element,omitempty"` + Timestamp int64 `protobuf:"varint,2,opt,name=timestamp" json:"timestamp,omitempty"` +} + +func (m *TestStreamPayload_TimestampedElement) Reset() { *m = TestStreamPayload_TimestampedElement{} } +func (m *TestStreamPayload_TimestampedElement) String() string { return proto.CompactTextString(m) } +func (*TestStreamPayload_TimestampedElement) ProtoMessage() {} +func 
(*TestStreamPayload_TimestampedElement) Descriptor() ([]byte, []int) { + return fileDescriptor0, []int{18, 1} +} + +func (m *TestStreamPayload_TimestampedElement) GetEncodedElement() []byte { + if m != nil { + return m.EncodedElement + } + return nil +} + +func (m *TestStreamPayload_TimestampedElement) GetTimestamp() int64 { + if m != nil { + return m.Timestamp + } + return 0 +} + +// The payload for the special-but-not-primitive WriteFiles transform. +type WriteFilesPayload struct { + // (Required) The SdkFunctionSpec of the FileBasedSink. + Sink *SdkFunctionSpec `protobuf:"bytes,1,opt,name=sink" json:"sink,omitempty"` + // (Required) The format function. + FormatFunction *SdkFunctionSpec `protobuf:"bytes,2,opt,name=format_function,json=formatFunction" json:"format_function,omitempty"` + WindowedWrites bool `protobuf:"varint,3,opt,name=windowed_writes,json=windowedWrites" json:"windowed_writes,omitempty"` + RunnerDeterminedSharding bool `protobuf:"varint,4,opt,name=runner_determined_sharding,json=runnerDeterminedSharding" json:"runner_determined_sharding,omitempty"` + SideInputs map[string]*SideInput `protobuf:"bytes,5,rep,name=side_inputs,json=sideInputs" json:"side_inputs,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` +} + +func (m *WriteFilesPayload) Reset() { *m = WriteFilesPayload{} } +func (m *WriteFilesPayload) String() string { return proto.CompactTextString(m) } +func (*WriteFilesPayload) ProtoMessage() {} +func (*WriteFilesPayload) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{19} } + +func (m *WriteFilesPayload) GetSink() *SdkFunctionSpec { + if m != nil { + return m.Sink + } + return nil +} + +func (m *WriteFilesPayload) GetFormatFunction() *SdkFunctionSpec { + if m != nil { + return m.FormatFunction + } + return nil +} + +func (m *WriteFilesPayload) GetWindowedWrites() bool { + if m != nil { + return m.WindowedWrites + } + return false +} + +func (m *WriteFilesPayload) GetRunnerDeterminedSharding() 
bool { + if m != nil { + return m.RunnerDeterminedSharding + } + return false +} + +func (m *WriteFilesPayload) GetSideInputs() map[string]*SideInput { + if m != nil { + return m.SideInputs + } + return nil +} + +// A coder, the binary format for serialization and deserialization of data in +// a pipeline. +type Coder struct { + // (Required) A specification for the coder, as a URN plus parameters. This + // may be a cross-language agreed-upon format, or it may be a "custom coder" + // that can only be used by a particular SDK. It does not include component + // coders, as it is beneficial for these to be comprehensible to a runner + // regardless of whether the binary format is agree-upon. + Spec *SdkFunctionSpec `protobuf:"bytes,1,opt,name=spec" json:"spec,omitempty"` + // (Optional) If this coder is parametric, such as ListCoder(VarIntCoder), + // this is a list of the components. In order for encodings to be identical, + // the SdkFunctionSpec and all components must be identical, recursively. + ComponentCoderIds []string `protobuf:"bytes,2,rep,name=component_coder_ids,json=componentCoderIds" json:"component_coder_ids,omitempty"` +} + +func (m *Coder) Reset() { *m = Coder{} } +func (m *Coder) String() string { return proto.CompactTextString(m) } +func (*Coder) ProtoMessage() {} +func (*Coder) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{20} } + +func (m *Coder) GetSpec() *SdkFunctionSpec { + if m != nil { + return m.Spec + } + return nil +} + +func (m *Coder) GetComponentCoderIds() []string { + if m != nil { + return m.ComponentCoderIds + } + return nil +} + +// A windowing strategy describes the window function, triggering, allowed +// lateness, and accumulation mode for a PCollection. +// +// TODO: consider inlining field on PCollection +type WindowingStrategy struct { + // (Required) The SdkFunctionSpec of the UDF that assigns windows, + // merges windows, and shifts timestamps before they are + // combined according to the OutputTime. 
+ WindowFn *SdkFunctionSpec `protobuf:"bytes,1,opt,name=window_fn,json=windowFn" json:"window_fn,omitempty"` + // (Required) Whether or not the window fn is merging. + // + // This knowledge is required for many optimizations. + MergeStatus MergeStatus_Enum `protobuf:"varint,2,opt,name=merge_status,json=mergeStatus,enum=org.apache.beam.model.pipeline.v1.MergeStatus_Enum" json:"merge_status,omitempty"` + // (Required) The coder for the windows of this PCollection. + WindowCoderId string `protobuf:"bytes,3,opt,name=window_coder_id,json=windowCoderId" json:"window_coder_id,omitempty"` + // (Required) The trigger to use when grouping this PCollection. + Trigger *Trigger `protobuf:"bytes,4,opt,name=trigger" json:"trigger,omitempty"` + // (Required) The accumulation mode indicates whether new panes are a full + // replacement for prior panes or whether they are deltas to be combined + // with other panes (the combine should correspond to whatever the upstream + // grouping transform is). + AccumulationMode AccumulationMode_Enum `protobuf:"varint,5,opt,name=accumulation_mode,json=accumulationMode,enum=org.apache.beam.model.pipeline.v1.AccumulationMode_Enum" json:"accumulation_mode,omitempty"` + // (Required) The OutputTime specifies, for a grouping transform, how to + // compute the aggregate timestamp. The window_fn will first possibly shift + // it later, then the OutputTime takes the max, min, or ignores it and takes + // the end of window. + // + // This is actually only for input to grouping transforms, but since they + // may be introduced in runner-specific ways, it is carried along with the + // windowing strategy. + OutputTime OutputTime_Enum `protobuf:"varint,6,opt,name=output_time,json=outputTime,enum=org.apache.beam.model.pipeline.v1.OutputTime_Enum" json:"output_time,omitempty"` + // (Required) Indicate when output should be omitted upon window expiration. 
+ ClosingBehavior ClosingBehavior_Enum `protobuf:"varint,7,opt,name=closing_behavior,json=closingBehavior,enum=org.apache.beam.model.pipeline.v1.ClosingBehavior_Enum" json:"closing_behavior,omitempty"` + // (Required) The duration, in milliseconds, beyond the end of a window at + // which the window becomes droppable. + AllowedLateness int64 `protobuf:"varint,8,opt,name=allowed_lateness,json=allowedLateness" json:"allowed_lateness,omitempty"` + // (Required) Indicate whether empty on-time panes should be omitted. + OnTimeBehavior OnTimeBehavior_Enum `protobuf:"varint,9,opt,name=OnTimeBehavior,enum=org.apache.beam.model.pipeline.v1.OnTimeBehavior_Enum" json:"OnTimeBehavior,omitempty"` + // (Required) Whether or not the window fn assigns inputs to exactly one window + // + // This knowledge is required for some optimizations + AssignsToOneWindow bool `protobuf:"varint,10,opt,name=assigns_to_one_window,json=assignsToOneWindow" json:"assigns_to_one_window,omitempty"` +} + +func (m *WindowingStrategy) Reset() { *m = WindowingStrategy{} } +func (m *WindowingStrategy) String() string { return proto.CompactTextString(m) } +func (*WindowingStrategy) ProtoMessage() {} +func (*WindowingStrategy) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{21} } + +func (m *WindowingStrategy) GetWindowFn() *SdkFunctionSpec { + if m != nil { + return m.WindowFn + } + return nil +} + +func (m *WindowingStrategy) GetMergeStatus() MergeStatus_Enum { + if m != nil { + return m.MergeStatus + } + return MergeStatus_UNSPECIFIED +} + +func (m *WindowingStrategy) GetWindowCoderId() string { + if m != nil { + return m.WindowCoderId + } + return "" +} + +func (m *WindowingStrategy) GetTrigger() *Trigger { + if m != nil { + return m.Trigger + } + return nil +} + +func (m *WindowingStrategy) GetAccumulationMode() AccumulationMode_Enum { + if m != nil { + return m.AccumulationMode + } + return AccumulationMode_UNSPECIFIED +} + +func (m *WindowingStrategy) GetOutputTime() OutputTime_Enum { + 
if m != nil { + return m.OutputTime + } + return OutputTime_UNSPECIFIED +} + +func (m *WindowingStrategy) GetClosingBehavior() ClosingBehavior_Enum { + if m != nil { + return m.ClosingBehavior + } + return ClosingBehavior_UNSPECIFIED +} + +func (m *WindowingStrategy) GetAllowedLateness() int64 { + if m != nil { + return m.AllowedLateness + } + return 0 +} + +func (m *WindowingStrategy) GetOnTimeBehavior() OnTimeBehavior_Enum { + if m != nil { + return m.OnTimeBehavior + } + return OnTimeBehavior_UNSPECIFIED +} + +func (m *WindowingStrategy) GetAssignsToOneWindow() bool { + if m != nil { + return m.AssignsToOneWindow + } + return false +} + +// Whether or not a PCollection's WindowFn is non-merging, merging, or +// merging-but-already-merged, in which case a subsequent GroupByKey is almost +// always going to do something the user does not want +type MergeStatus struct { +} + +func (m *MergeStatus) Reset() { *m = MergeStatus{} } +func (m *MergeStatus) String() string { return proto.CompactTextString(m) } +func (*MergeStatus) ProtoMessage() {} +func (*MergeStatus) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{22} } + +// Whether or not subsequent outputs of aggregations should be entire +// replacement values or just the aggregation of inputs received since +// the prior output. +type AccumulationMode struct { +} + +func (m *AccumulationMode) Reset() { *m = AccumulationMode{} } +func (m *AccumulationMode) String() string { return proto.CompactTextString(m) } +func (*AccumulationMode) ProtoMessage() {} +func (*AccumulationMode) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{23} } + +// Controls whether or not an aggregating transform should output data +// when a window expires. 
+type ClosingBehavior struct { +} + +func (m *ClosingBehavior) Reset() { *m = ClosingBehavior{} } +func (m *ClosingBehavior) String() string { return proto.CompactTextString(m) } +func (*ClosingBehavior) ProtoMessage() {} +func (*ClosingBehavior) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{24} } + +// Controls whether or not an aggregating transform should output data +// when an on-time pane is empty. +type OnTimeBehavior struct { +} + +func (m *OnTimeBehavior) Reset() { *m = OnTimeBehavior{} } +func (m *OnTimeBehavior) String() string { return proto.CompactTextString(m) } +func (*OnTimeBehavior) ProtoMessage() {} +func (*OnTimeBehavior) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{25} } + +// When a number of windowed, timestamped inputs are aggregated, the timestamp +// for the resulting output. +type OutputTime struct { +} + +func (m *OutputTime) Reset() { *m = OutputTime{} } +func (m *OutputTime) String() string { return proto.CompactTextString(m) } +func (*OutputTime) ProtoMessage() {} +func (*OutputTime) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{26} } + +// The different time domains in the Beam model. +type TimeDomain struct { +} + +func (m *TimeDomain) Reset() { *m = TimeDomain{} } +func (m *TimeDomain) String() string { return proto.CompactTextString(m) } +func (*TimeDomain) ProtoMessage() {} +func (*TimeDomain) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{27} } + +// A small DSL for expressing when to emit new aggregations +// from a GroupByKey or CombinePerKey +// +// A trigger is described in terms of when it is _ready_ to permit output. +type Trigger struct { + // The full disjoint union of possible triggers. 
+ // + // Types that are valid to be assigned to Trigger: + // *Trigger_AfterAll_ + // *Trigger_AfterAny_ + // *Trigger_AfterEach_ + // *Trigger_AfterEndOfWindow_ + // *Trigger_AfterProcessingTime_ + // *Trigger_AfterSynchronizedProcessingTime_ + // *Trigger_Always_ + // *Trigger_Default_ + // *Trigger_ElementCount_ + // *Trigger_Never_ + // *Trigger_OrFinally_ + // *Trigger_Repeat_ + Trigger isTrigger_Trigger `protobuf_oneof:"trigger"` +} + +func (m *Trigger) Reset() { *m = Trigger{} } +func (m *Trigger) String() string { return proto.CompactTextString(m) } +func (*Trigger) ProtoMessage() {} +func (*Trigger) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{28} } + +type isTrigger_Trigger interface { + isTrigger_Trigger() +} + +type Trigger_AfterAll_ struct { + AfterAll *Trigger_AfterAll `protobuf:"bytes,1,opt,name=after_all,json=afterAll,oneof"` +} +type Trigger_AfterAny_ struct { + AfterAny *Trigger_AfterAny `protobuf:"bytes,2,opt,name=after_any,json=afterAny,oneof"` +} +type Trigger_AfterEach_ struct { + AfterEach *Trigger_AfterEach `protobuf:"bytes,3,opt,name=after_each,json=afterEach,oneof"` +} +type Trigger_AfterEndOfWindow_ struct { + AfterEndOfWindow *Trigger_AfterEndOfWindow `protobuf:"bytes,4,opt,name=after_end_of_window,json=afterEndOfWindow,oneof"` +} +type Trigger_AfterProcessingTime_ struct { + AfterProcessingTime *Trigger_AfterProcessingTime `protobuf:"bytes,5,opt,name=after_processing_time,json=afterProcessingTime,oneof"` +} +type Trigger_AfterSynchronizedProcessingTime_ struct { + AfterSynchronizedProcessingTime *Trigger_AfterSynchronizedProcessingTime `protobuf:"bytes,6,opt,name=after_synchronized_processing_time,json=afterSynchronizedProcessingTime,oneof"` +} +type Trigger_Always_ struct { + Always *Trigger_Always `protobuf:"bytes,12,opt,name=always,oneof"` +} +type Trigger_Default_ struct { + Default *Trigger_Default `protobuf:"bytes,7,opt,name=default,oneof"` +} +type Trigger_ElementCount_ struct { + ElementCount 
*Trigger_ElementCount `protobuf:"bytes,8,opt,name=element_count,json=elementCount,oneof"` +} +type Trigger_Never_ struct { + Never *Trigger_Never `protobuf:"bytes,9,opt,name=never,oneof"` +} +type Trigger_OrFinally_ struct { + OrFinally *Trigger_OrFinally `protobuf:"bytes,10,opt,name=or_finally,json=orFinally,oneof"` +} +type Trigger_Repeat_ struct { + Repeat *Trigger_Repeat `protobuf:"bytes,11,opt,name=repeat,oneof"` +} + +func (*Trigger_AfterAll_) isTrigger_Trigger() {} +func (*Trigger_AfterAny_) isTrigger_Trigger() {} +func (*Trigger_AfterEach_) isTrigger_Trigger() {} +func (*Trigger_AfterEndOfWindow_) isTrigger_Trigger() {} +func (*Trigger_AfterProcessingTime_) isTrigger_Trigger() {} +func (*Trigger_AfterSynchronizedProcessingTime_) isTrigger_Trigger() {} +func (*Trigger_Always_) isTrigger_Trigger() {} +func (*Trigger_Default_) isTrigger_Trigger() {} +func (*Trigger_ElementCount_) isTrigger_Trigger() {} +func (*Trigger_Never_) isTrigger_Trigger() {} +func (*Trigger_OrFinally_) isTrigger_Trigger() {} +func (*Trigger_Repeat_) isTrigger_Trigger() {} + +func (m *Trigger) GetTrigger() isTrigger_Trigger { + if m != nil { + return m.Trigger + } + return nil +} + +func (m *Trigger) GetAfterAll() *Trigger_AfterAll { + if x, ok := m.GetTrigger().(*Trigger_AfterAll_); ok { + return x.AfterAll + } + return nil +} + +func (m *Trigger) GetAfterAny() *Trigger_AfterAny { + if x, ok := m.GetTrigger().(*Trigger_AfterAny_); ok { + return x.AfterAny + } + return nil +} + +func (m *Trigger) GetAfterEach() *Trigger_AfterEach { + if x, ok := m.GetTrigger().(*Trigger_AfterEach_); ok { + return x.AfterEach + } + return nil +} + +func (m *Trigger) GetAfterEndOfWindow() *Trigger_AfterEndOfWindow { + if x, ok := m.GetTrigger().(*Trigger_AfterEndOfWindow_); ok { + return x.AfterEndOfWindow + } + return nil +} + +func (m *Trigger) GetAfterProcessingTime() *Trigger_AfterProcessingTime { + if x, ok := m.GetTrigger().(*Trigger_AfterProcessingTime_); ok { + return x.AfterProcessingTime + } + 
return nil +} + +func (m *Trigger) GetAfterSynchronizedProcessingTime() *Trigger_AfterSynchronizedProcessingTime { + if x, ok := m.GetTrigger().(*Trigger_AfterSynchronizedProcessingTime_); ok { + return x.AfterSynchronizedProcessingTime + } + return nil +} + +func (m *Trigger) GetAlways() *Trigger_Always { + if x, ok := m.GetTrigger().(*Trigger_Always_); ok { + return x.Always + } + return nil +} + +func (m *Trigger) GetDefault() *Trigger_Default { + if x, ok := m.GetTrigger().(*Trigger_Default_); ok { + return x.Default + } + return nil +} + +func (m *Trigger) GetElementCount() *Trigger_ElementCount { + if x, ok := m.GetTrigger().(*Trigger_ElementCount_); ok { + return x.ElementCount + } + return nil +} + +func (m *Trigger) GetNever() *Trigger_Never { + if x, ok := m.GetTrigger().(*Trigger_Never_); ok { + return x.Never + } + return nil +} + +func (m *Trigger) GetOrFinally() *Trigger_OrFinally { + if x, ok := m.GetTrigger().(*Trigger_OrFinally_); ok { + return x.OrFinally + } + return nil +} + +func (m *Trigger) GetRepeat() *Trigger_Repeat { + if x, ok := m.GetTrigger().(*Trigger_Repeat_); ok { + return x.Repeat + } + return nil +} + +// XXX_OneofFuncs is for the internal use of the proto package. 
+func (*Trigger) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _Trigger_OneofMarshaler, _Trigger_OneofUnmarshaler, _Trigger_OneofSizer, []interface{}{ + (*Trigger_AfterAll_)(nil), + (*Trigger_AfterAny_)(nil), + (*Trigger_AfterEach_)(nil), + (*Trigger_AfterEndOfWindow_)(nil), + (*Trigger_AfterProcessingTime_)(nil), + (*Trigger_AfterSynchronizedProcessingTime_)(nil), + (*Trigger_Always_)(nil), + (*Trigger_Default_)(nil), + (*Trigger_ElementCount_)(nil), + (*Trigger_Never_)(nil), + (*Trigger_OrFinally_)(nil), + (*Trigger_Repeat_)(nil), + } +} + +func _Trigger_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*Trigger) + // trigger + switch x := m.Trigger.(type) { + case *Trigger_AfterAll_: + b.EncodeVarint(1<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.AfterAll); err != nil { + return err + } + case *Trigger_AfterAny_: + b.EncodeVarint(2<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.AfterAny); err != nil { + return err + } + case *Trigger_AfterEach_: + b.EncodeVarint(3<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.AfterEach); err != nil { + return err + } + case *Trigger_AfterEndOfWindow_: + b.EncodeVarint(4<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.AfterEndOfWindow); err != nil { + return err + } + case *Trigger_AfterProcessingTime_: + b.EncodeVarint(5<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.AfterProcessingTime); err != nil { + return err + } + case *Trigger_AfterSynchronizedProcessingTime_: + b.EncodeVarint(6<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.AfterSynchronizedProcessingTime); err != nil { + return err + } + case *Trigger_Always_: + b.EncodeVarint(12<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Always); err != nil { + return err + } + case *Trigger_Default_: + b.EncodeVarint(7<<3 | proto.WireBytes) + if err := 
b.EncodeMessage(x.Default); err != nil { + return err + } + case *Trigger_ElementCount_: + b.EncodeVarint(8<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.ElementCount); err != nil { + return err + } + case *Trigger_Never_: + b.EncodeVarint(9<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Never); err != nil { + return err + } + case *Trigger_OrFinally_: + b.EncodeVarint(10<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.OrFinally); err != nil { + return err + } + case *Trigger_Repeat_: + b.EncodeVarint(11<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Repeat); err != nil { + return err + } + case nil: + default: + return fmt.Errorf("Trigger.Trigger has unexpected type %T", x) + } + return nil +} + +func _Trigger_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*Trigger) + switch tag { + case 1: // trigger.after_all + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Trigger_AfterAll) + err := b.DecodeMessage(msg) + m.Trigger = &Trigger_AfterAll_{msg} + return true, err + case 2: // trigger.after_any + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Trigger_AfterAny) + err := b.DecodeMessage(msg) + m.Trigger = &Trigger_AfterAny_{msg} + return true, err + case 3: // trigger.after_each + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Trigger_AfterEach) + err := b.DecodeMessage(msg) + m.Trigger = &Trigger_AfterEach_{msg} + return true, err + case 4: // trigger.after_end_of_window + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Trigger_AfterEndOfWindow) + err := b.DecodeMessage(msg) + m.Trigger = &Trigger_AfterEndOfWindow_{msg} + return true, err + case 5: // trigger.after_processing_time + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Trigger_AfterProcessingTime) + err := b.DecodeMessage(msg) + 
m.Trigger = &Trigger_AfterProcessingTime_{msg} + return true, err + case 6: // trigger.after_synchronized_processing_time + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Trigger_AfterSynchronizedProcessingTime) + err := b.DecodeMessage(msg) + m.Trigger = &Trigger_AfterSynchronizedProcessingTime_{msg} + return true, err + case 12: // trigger.always + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Trigger_Always) + err := b.DecodeMessage(msg) + m.Trigger = &Trigger_Always_{msg} + return true, err + case 7: // trigger.default + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Trigger_Default) + err := b.DecodeMessage(msg) + m.Trigger = &Trigger_Default_{msg} + return true, err + case 8: // trigger.element_count + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Trigger_ElementCount) + err := b.DecodeMessage(msg) + m.Trigger = &Trigger_ElementCount_{msg} + return true, err + case 9: // trigger.never + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Trigger_Never) + err := b.DecodeMessage(msg) + m.Trigger = &Trigger_Never_{msg} + return true, err + case 10: // trigger.or_finally + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Trigger_OrFinally) + err := b.DecodeMessage(msg) + m.Trigger = &Trigger_OrFinally_{msg} + return true, err + case 11: // trigger.repeat + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Trigger_Repeat) + err := b.DecodeMessage(msg) + m.Trigger = &Trigger_Repeat_{msg} + return true, err + default: + return false, nil + } +} + +func _Trigger_OneofSizer(msg proto.Message) (n int) { + m := msg.(*Trigger) + // trigger + switch x := m.Trigger.(type) { + case *Trigger_AfterAll_: + s := proto.Size(x.AfterAll) + n += proto.SizeVarint(1<<3 | proto.WireBytes) + n += 
proto.SizeVarint(uint64(s)) + n += s + case *Trigger_AfterAny_: + s := proto.Size(x.AfterAny) + n += proto.SizeVarint(2<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Trigger_AfterEach_: + s := proto.Size(x.AfterEach) + n += proto.SizeVarint(3<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Trigger_AfterEndOfWindow_: + s := proto.Size(x.AfterEndOfWindow) + n += proto.SizeVarint(4<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Trigger_AfterProcessingTime_: + s := proto.Size(x.AfterProcessingTime) + n += proto.SizeVarint(5<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Trigger_AfterSynchronizedProcessingTime_: + s := proto.Size(x.AfterSynchronizedProcessingTime) + n += proto.SizeVarint(6<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Trigger_Always_: + s := proto.Size(x.Always) + n += proto.SizeVarint(12<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Trigger_Default_: + s := proto.Size(x.Default) + n += proto.SizeVarint(7<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Trigger_ElementCount_: + s := proto.Size(x.ElementCount) + n += proto.SizeVarint(8<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Trigger_Never_: + s := proto.Size(x.Never) + n += proto.SizeVarint(9<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Trigger_OrFinally_: + s := proto.Size(x.OrFinally) + n += proto.SizeVarint(10<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Trigger_Repeat_: + s := proto.Size(x.Repeat) + n += proto.SizeVarint(11<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case nil: + default: + panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) + } + return n +} + +// Ready when all subtriggers are ready. 
+type Trigger_AfterAll struct { + Subtriggers []*Trigger `protobuf:"bytes,1,rep,name=subtriggers" json:"subtriggers,omitempty"` +} + +func (m *Trigger_AfterAll) Reset() { *m = Trigger_AfterAll{} } +func (m *Trigger_AfterAll) String() string { return proto.CompactTextString(m) } +func (*Trigger_AfterAll) ProtoMessage() {} +func (*Trigger_AfterAll) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{28, 0} } + +func (m *Trigger_AfterAll) GetSubtriggers() []*Trigger { + if m != nil { + return m.Subtriggers + } + return nil +} + +// Ready when any subtrigger is ready. +type Trigger_AfterAny struct { + Subtriggers []*Trigger `protobuf:"bytes,1,rep,name=subtriggers" json:"subtriggers,omitempty"` +} + +func (m *Trigger_AfterAny) Reset() {
<TRUNCATED>