From the outset, I agree with you that there's something the Go SDK didn't expect in the stream of bytes it received. The non-global window handling in the Go SDK is grossly undertested to begin with, other than the windowed_wordcount example (which I've run maybe once or twice). As an aside, this is something we should change now that we have an improved integration test setup to validate things against the Python Portable ULR.

This one is genuinely tricky, so I'm going to type out my debugging process. Search for *tl;dr;* to skip to the end for the result of the debugging.

The error messages can always be improved, but it's been a while since I've seen an encoding error like this. First, let's rule out the varint coder doing something wrong. The important part is the bit after the plan:

```
caused by:
stream value decode failed
caused by:
invalid varintz encoding for: []
```

So I search the GitHub beam repo for that error message, "invalid varintz encoding" [1], finding one location: the varintz decoder [2]. The '[]' at the end of the error message is the byte array it was trying to decode, which means it apparently received a 0 length []byte after reading and couldn't handle it.

It put the []byte into `binary.Varint` which, if we look at the imports, comes from the "encoding/binary" package [3]. As per the Varint documentation [4], when it returns 0 for the number of bytes read, it means the []byte buffer was too small (in this case 0 length), so the error is legitimate and the returned value is invalid.

Because I know this varint should have been encoded by Beam (knowing how the windowed_wordcount.go example works), I look to the encoding function and see that it uses `binary.PutVarint` [5] to encode the value. The documentation for PutVarint [6] has an example which we can modify to also print out the "n" length written to the buffer, confirming that even the natural candidate for a 0 length encoding, a 0 value, always encodes to at least 1 byte.

OK, the varint coder did everything expected of it. So now the question becomes "Why is there a 0 length prefix where the integer value is expected?"

What's different here? The printed out plan shows the hydrated ProcessBundleDescriptor as the Go SDK interpreted it. The ordering is reversed, but the first and last lines are the DataSource and DataSink, and those are the only places where a Coder is going to be used.
The DataSource and DataSink are the boundaries for data entering and leaving the Go SDK respectively, so they're where decoding and encoding happen. Let's look at the plan line for the DataSource:

    1: DataSource[S[CombinePerKey/Group/Read@localhost:63851], out] Coder:W;c8_windowed<CoGBK;c8<string;c0,int[varintz;c3];c4>>!IWC Out:7

The first part, the index (a 1), is just where the unit sits in the plan listing. It matters for more complex graphs, where the Out fields use it to refer to the PTransform(s) a transform outputs to. The next part says it's a DataSource. This line comes from the DataSource's String method [7]. It outputs the Stream ID "S[CombinePerKey/Group/Read@localhost:63851]" the runner gave it to access the right data over the data channel, and the name "out", which is the key used to identify this bundle in particular (used in progress reporting). This is followed by the Coder, and then what the DataSource passes its output to. In this case, it's passing it to "7: MergeAccumulators[stats.sumIntFn]".

The Coder is the important part for this bug hunt. I'm not 100% satisfied with the format we've ended up with here, but it is what it is. It's a result of the general coder String() method [8]. What this coder is saying is that it's receiving CoGBK<string, int> values, wrapped in an Interval Window Coder. Loosely, the ;c# (and that c8_windowed) values are the ids into the job's pipeline proto coders map. Each coder prints out its kind, its coder id, and any component coders it might have.

This all seems in order, as it's a windowed job. I'd expect this for unmodified wordcounts as well. It does give us the next place to look though: Coder handling!

The Beam standard coders are defined in the beam_runner_api.proto [9]. We're about to become good friends with the coding formats listed there.

The DataSource creates the window and value coders separately at the top of its Process method [10], and then does special handling if the component of the windowed coder is a CoGBK coder.
Then the per element processing begins. The Beam Data channel at this point is represented as a stream of bytes (using Go's io.Reader interface). Each element for this DataSource represents a windowed value. The windowed values are the key, and all the values using that key passed to the GBK.

Aside: The special CoGBK handling is because Beam doesn't have a first class notion of a CoGBK coder or a GBK coder; it's represented with KVs and Iterables. Simple GBKs are represented by a KV<Key, Iterable<Value>> coder, and it's the Iterable that's the tricky portion, as the Go SDK doesn't provide a first class notion of an unbounded iterable for users to pass around. Iterables can additionally have their data be on the State channel rather than the Data channel, adding to the complexity. Those are called State Backed Iterables, which use a superset of the coding format for iterables. Since *most* coders don't need to make RPCs, the SDK didn't burden the API with a custom context to pass those interfaces through.

Further Aside: The first part of special handling for CoGBKs is to separate the key coder from the value coder(s). CoGBKs could have multiple value streams, one for each PCollection<K,V> being grouped. As mentioned, since there's no real notion of CoGBKs, there's really only a single value stream at present, carrying a sequence of values, each prefixed with which stream it's a part of. Since this is only a simple GBK, there's only one value stream, so only the one coder is listed for it (varintz), which is really an Iterable<varintz>. To handle the iterables, each value coder is passed to the makeReStream method, which handles the details of the Iterable protocol defined in the proto [11].

Back to Debugging: What we currently know is that something, somewhere went wrong with interpreting the byte stream.
Looking at the coder specifications [9], we should be seeing <bytes for the window><bytes for the string key><bytes for iterable prefix><bytes for varint values>.

I'm pretty confident the error isn't with the varintz. Something incorrect happened before it, and it's just the first to be unable to interpret the offset data properly. Similarly, I think the string key and the iterables were fine. I'm eliminating these because if there were a problem with them, we'd run into it when Global windows are used.

The main thing that is likely to be different in the scenario you've described is the window. So let us look at the window encoding format [12]. It starts with a timestamp, followed by the windows and a pane.

The Go SDK handling for decoding the window header is DecodeWindowedValueHeader in exec/coder.go [13], called from DataSource [14]. We can see that it decodes the timestamp, regardless of the element's window, followed by the window(s), and the pane.

We know from the coder printout that this should be an interval window coder. That's what the IWC was indicating earlier (search for that in GitHub). It gets decoded by the intervalWindowDecoder [15], which handles the Iterable window property before decoding each interval into its timestamp, duration pair. This mostly looks fine. Technically the iterable format could be sending *multiple batches*, rather than a single batch. This would mean the length for the iterable would be encoded as -1, followed by a varint of the batch size N, then the N elements in sequence. This could be the problem, as no windows would be read, and then it's a hop, skip, and a jump to a corrupted varintz down the line.

Let's go a bit further to be sure that this is it. Back to DecodeWindowedValueHeader: assuming the windows are all decoded properly, the next step is interpreting the pane. This is currently hard assumed to be the NO_FIRING pane [16].
The hard-coded pane is probably the root cause of the problem since, without triggers, IIRC there are precious few ways to have different panes. What's being received is likely one of the multi-byte versions of the pane, described in [17], eventually causing the varintz corruption.

*tl;dr;* The window header reading and writing currently assumes the NO_FIRING pane is the only one sent or received [16]. Panes are not propagated through the SDK as a result. I'm not sure how much they should be, TBH; I'm not familiar with how this part of the model works. But the right move to debug this is probably to implement the pane handling as described in the proto [17], to figure out what's being received. It's probably one of the multi-byte encodings.

We would probably want to have a Pane type, and helpers for encoding and decoding Panes, since the paneinfo format is re-used for timers, which are also not implemented yet (but that's a separate JIRA). For propagating the Pane through the execution plan, we likely need to add a new field to the FullValue [18] for the pane and make sure it's copied through ParDos like the window assignments, adjusting EncodeWindowedValueHeader [19] to take in and write out the proper Pane.

Looking at how the Java and Python SDKs handle panes is probably the right move too.

I know this is quite the info dump, so if you have any questions at all about any of it, please do let me know.

Robert Burke
Noted Go SDK Expert, but definitely not an expert on windows, panes, triggers, state and timers.

[1] https://github.com/apache/beam/search?q=invalid+varintz+encoding
[2] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/coderx/varint.go#L74
[3] https://pkg.go.dev/encoding/binary
[4] https://pkg.go.dev/encoding/binary#Varint
[5] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/coderx/varint.go#L67
[6] https://pkg.go.dev/encoding/binary#PutVarint
[7] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L245
[8] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/graph/coder/coder.go#L237
[9] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto#L670
[10] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L101
[11] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto#L711 and https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto#L826
[12] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto#L751
[13] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/coder.go#L1096
[14] https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L122
[15] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/coder.go#L1051
[16] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/coder.go#L1108
[17] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto#L773
[18] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go#L34
[19] https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/coder.go#L1091

On Wed, Mar 24, 2021, 4:57 PM Eduardo Barrera <eduardo.barr...@wizeline.com> wrote:

> Hello Team,
>
> We're testing an initial and very basic implementation of a trigger. While
> testing different triggers like *Always* or *Never* using the
> windowed_wordcount.go example and graphx/translate.go, we get this error
> message:
>
> RuntimeError: process bundle failed for instruction bundle_147 using plan
> 1 : while executing Process for Plan[1]:
> 2: DataSink[S[CoGBK/Write@localhost:63851]]
> Coder:W;c9_windowed<KV;c9<int[varintz;c3];c4,string;c0>>!GWC
> 3: ParDo[beam.addFixedKeyFn] Out:[2]
> 4: WindowInto[GLO]. Out:3
> 5: ParDo[main.formatFn] Out:[4]
> 6: ExtractOutput[stats.sumIntFn] Keyed:false Out:5
> 7: MergeAccumulators[stats.sumIntFn] Keyed:false Out:6
> 1: DataSource[S[CombinePerKey/Group/Read@localhost:63851], out]
> Coder:W;c8_windowed<CoGBK;c8<string;c0,int[varintz;c3];c4>>!IWC Out:7
> caused by:
> stream value decode failed
> caused by:
> invalid varintz encoding for: []
> Remote logging shutting down.exit status 1
>
> We believe that this is an error caused by the lack of a trigger
> coder/encoder, however, it works with the Default trigger (that's set by
> default in graphx/translate.go).
>
> Any guidance on this would be appreciated.
>
> Thanks!
> Eduardo