From the outset, I agree with you that there's something the Go SDK didn't
expect with the stream of bytes it received. The non-global Window handling
in the Go SDK is grossly undertested to begin with, other than the
windowed_wordcount example (which I've run maybe once or twice).  As an
aside, this is something we should change now that we have an improved
integration test set up to validate things against the Python Portable ULR.

This one is rightly tricky, so I'm going to type out my debugging process.
Search for *tl;dr;* to skip to the end for the result of the debugging.

The error messages can always be improved, but it's been a while since I've
seen an encoding error like this.
First, let's rule out the varint coder doing something wrong.

The important part is the bit after the plan:
```
        caused by:
stream value decode failed
        caused by:
invalid varintz encoding for: []
```

So I searched the GitHub beam repo for that error message, "invalid
varintz encoding" [1], and found a single location: the varintz decoder
[2]. The '[]' at the end of the error message is the byte slice it was
trying to decode, which means it apparently received a 0-length []byte
after reading and couldn't handle it.

It passed the []byte to `binary.Varint` which, if we look at the imports,
comes from the "encoding/binary" package [3]. As per the Varint
documentation [4], when the returned byte count n is 0, the []byte buffer
was too small (in this case, empty), so the error is legitimate and the
returned value is invalid.
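To see that behaviour in isolation, here's a minimal sketch. The
decodeVarint helper is my own stand-in for illustration, not the SDK's
decoder; it only relies on the documented return values of
`binary.Varint`.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// decodeVarint is a hypothetical helper for this sketch. binary.Varint
// reports n == 0 when the buffer is too small and n < 0 on overflow, so
// anything that is not positive is treated as a decode failure.
func decodeVarint(buf []byte) (int64, error) {
	v, n := binary.Varint(buf)
	if n <= 0 {
		return 0, fmt.Errorf("invalid varint encoding for: %v", buf)
	}
	return v, nil
}

func main() {
	// An empty buffer cannot hold a varint, so decoding fails.
	_, err := decodeVarint([]byte{})
	fmt.Println(err)

	// A lone continuation byte is also "too small": more bytes were promised.
	_, err = decodeVarint([]byte{0x80})
	fmt.Println(err)

	// A complete single-byte encoding decodes without error.
	v, err := decodeVarint([]byte{0x02})
	fmt.Println(v, err)
}
```

So an empty slice reaching the decoder is enough to produce exactly the
kind of error we're seeing.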

Because I know this varint should have been encoded by Beam (knowing how
the windowed_wordcount.go example works), I looked at the encoding
function and saw that it uses `binary.PutVarint` [5] to encode the value.
The documentation for PutVarint [6] has an example which we can modify to
also print out "n", the length written to the buffer. That confirms that
even a 0 value, the natural candidate for a 0-length encoding, always
encodes to at least 1 byte.
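For reference, a modified version of that example could look roughly
like this. The encodedLen helper and the choice of sample values are
mine, picked to show where the encoding grows from one byte to two.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// encodedLen returns how many bytes binary.PutVarint writes for x.
// It is a helper invented for this sketch.
func encodedLen(x int64) int {
	buf := make([]byte, binary.MaxVarintLen64)
	return binary.PutVarint(buf, x)
}

func main() {
	for _, x := range []int64{0, 1, -1, 63, 64, -65} {
		buf := make([]byte, binary.MaxVarintLen64)
		n := binary.PutVarint(buf, x)
		// Print the value, the encoded bytes in hex, and the length written.
		fmt.Printf("%d -> %x (n=%d, encodedLen=%d)\n", x, buf[:n], n, encodedLen(x))
	}
}
```

Every value, including 0, reports n of at least 1, so a correct encoder
can't be the source of an empty payload.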

OK, the varint coder did everything expected of it. So now the question
becomes "Why is there a 0-length prefix where the integer value is
expected?" What's different here?
The printed plan shows the hydrated ProcessBundleDescriptor as the Go SDK
interpreted it. The ordering is reversed, but the first and last lines are
the DataSink and the DataSource, and those are the only places where a
Coder is going to be used. The DataSource and DataSink are the boundaries
for data entering and leaving the Go SDK respectively, so that's where
decoding and encoding happen. Let's look at the plan line for DataSource.

1: DataSource[S[CombinePerKey/Group/Read@localhost:63851], out]
Coder:W;c8_windowed<CoGBK;c8<string;c0,int[varintz;c3];c4>>!IWC Out:7

The first part is the index (a 1), which is just where the unit sits in
the plan listing. It matters more for complex graphs: the Out entries use
it to refer to the PTransform(s) a transform outputs to.
The next part says it's a DataSource. This line comes from the
DataSource's String method [7]. It's outputting the Stream ID
"S[CombinePerKey/Group/Read@localhost:63851]" the runner gave it to access
the right data over the data channel, and the name "out", which is the key
used to identify this bundle in particular (used in progress reporting).
This is followed by the Coder, and then what the DataSource passes its
output to. In this case, it's passing it to "7:
MergeAccumulators[stats.sumIntFn]".

The Coder is the important part for this bug hunt. I'm not 100% satisfied
with the format we've ended up with here, but it is what it is. It's
a result of the general coder String() method [8].
What this coder is saying is that it's receiving CoGBK<string, int> values,
wrapped in an Interval Window Coder.
Loosely, the ;c# (and that c8_windowed) values are the ids into the job's
pipeline proto coders map. It prints out its kind, the coder id, and any
component coders it might have.

This seems all in order, as it's a windowed job. I'd expect this for
unmodified wordcounts as well. It does give us the next place to look
though, Coder handling! The Beam standard coders are defined in the
beam_runner_api.proto [9]. We're about to become good friends with the
coding formats listed there.

The DataSource creates the window and value coders separately at the top of
its Process method [10], and then does special handling if the component
of the window is a CoGBK coder. Then the per element processing begins. The
Beam Datachannel at this point is represented as a stream of bytes (using
Go's io.Reader interface). Each element for this DataSource represents a
windowed value. Each windowed value consists of the key and all the values
using that key that were passed to the GBK.

Aside: The special CoGBK handling is because Beam doesn't have a first
class notion of a CoGBK coder or a GBK coder; it's represented with KVs and
Iterables.
Simple GBKs are represented by a KV<Key, Iterable<Value>> coder, and it's
the Iterable that's the tricky portion, as the Go SDK doesn't provide a
first class notion of an unbounded iterable for users to pass around.
Iterables can additionally have their data on the State channel rather
than the Data channel, adding to the complexity. Those are called State
Backed Iterables, whose coding format is a superset of the coding format
for iterables. Since *most* coders don't need to make RPCs, the SDK didn't
burden the API with a custom context to pass those interfaces through.

Further Aside: The first part of special handling for CoGBKs is to separate
the key coder from the value coder(s). As mentioned, since there's no real
notion of CoGBKs, there's really only a single Value stream at present,
which has a sequence of values, each prefixed with which stream it's
a part of. CoGBKs could have multiple value streams, one for each
PCollection<K,V> being grouped. Since this is only a simple GBK, there's
only one value stream, so there's only the one coder listed for it
(varintz), which is really an Iterable<varintz>.
To handle the iterables, each value coder is passed to the makeReStream
method, which handles the details of the Iterable protocol defined in the
proto [11].

Back to Debugging:
What we currently know is that something, somewhere went wrong with
interpreting the byte stream.

Looking at the coder specifications [9], we should be seeing <bytes for the
window><bytes for the string key><bytes for iterable prefix><bytes for
varint values>.  I'm pretty confident the error isn't with the varintz.
Something incorrect happened before it, and it's just the first to be
unable to interpret the offset data properly. Similarly, I think the string
key was fine, and the iterables. I'm eliminating these because if there
were a problem with them we'd run into them when Global windows are used.
The main one that is likely to be different in the scenario you've
described is the window.

So let us look at the window encoding format [12]. It starts with a
timestamp, and it's followed by windows and a pane. The Go SDK handling for
decoding the window header is DecodeWindowedValueHeader in exec/coder.go
[13], called from DataSource [14]. We can see that it decodes the
timestamp, regardless of the element's window, followed by the window(s),
and the pane.

We know from the coder printout that this should be an interval window
coder. That's what the IWC was indicating earlier (search for that in
GitHub). It gets decoded by the intervalWindowDecoder [15], which
handles the Iterable window property before decoding each interval into
its timestamp, duration pair. This mostly looks fine.

Technically the iterable format could be sending *multiple batches*, rather
than a single batch. This would mean the length for the iterable would be
encoded as -1, followed by a varint of the batch size N, then the N
elements in sequence. This could be the problem, as no windows would be
read, and then it's a hop, skip, and a jump to a corrupted varintz down the
line. Let's go a bit further to be sure that this is it.

Back to DecodeWindowedValueHeader: assuming the windows are all decoded
properly, the next step is interpreting the pane. This is currently
hard-coded to be the NO_FIRING pane [16]. That's probably the root cause of
the problem since, IIRC, without triggers there are precious few ways to
have different panes, and this job sets triggers. What's being sent is
likely one of the multi-byte versions of the pane described in [17],
eventually causing the varintz corruption.

*tl;dr;*

The window header reading and writing currently assumes the NO_FIRING pane
is the only one sent or received [16].

Panes are not propagated through the SDK as a result. I'm not sure how much
they should be TBH, as I'm not familiar with how this part of the model
works. But the right move to debug this is probably to implement the pane
handling as described in the proto [17], to figure out what's being
received. It's probably one of the multi-byte encodings here.

We would probably want to have a Pane type, and helpers for encoding and
decoding Panes, since the paneinfo format is re-used for timers, which are
also not implemented yet (but that's a separate JIRA).

For propagating the Pane through the execution plan, we likely need to add
a new field to the FullValue [18] for the pane and make sure it's copied
through ParDos like the window assignments, adjusting
EncodeWindowedValueHeader [19] to take in and write out the proper Pane.
Looking at how the Java and Python SDKs handle panes is probably the right
move too.

I know this is quite the info dump, so if you have any questions at all
about any of it, please do let me know.

Robert Burke
Noted Go SDK Expert, but definitely not an expert on windows, panes,
triggers, state and timers.


[1] https://github.com/apache/beam/search?q=invalid+varintz+encoding
[2]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/coderx/varint.go#L74
[3] https://pkg.go.dev/encoding/binary
[4] https://pkg.go.dev/encoding/binary#Varint
[5]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/coderx/varint.go#L67

[6] https://pkg.go.dev/encoding/binary#PutVarint
[7]
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L245
[8]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/graph/coder/coder.go#L237
[9]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto#L670
[10]
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L101
[11]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto#L711
and
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto#L826
[12]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto#L751
[13]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/coder.go#L1096
[14]
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/datasource.go#L122
[15]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/coder.go#L1051
[16]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/coder.go#L1108
[17]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/model/pipeline/src/main/proto/beam_runner_api.proto#L773
[18]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/fullvalue.go#L34
[19]
https://github.com/apache/beam/blob/7c43ab6a8df9b23caa7321fddff9a032a71908f6/sdks/go/pkg/beam/core/runtime/exec/coder.go#L1091

On Wed, Mar 24, 2021, 4:57 PM Eduardo Barrera <eduardo.barr...@wizeline.com>
wrote:

> Hello Team,
>
> We're testing an initial and very basic implementation of a trigger. While
> testing different triggers like *Always* or *Never* using the
> windowed_wordcount.go example and graphx/translate.go, we get this error
> message:
>
> RuntimeError: process bundle failed for instruction bundle_147 using plan
> 1 : while executing Process for Plan[1]:
> 2: DataSink[S[CoGBK/Write@localhost:63851]]
> Coder:W;c9_windowed<KV;c9<int[varintz;c3];c4,string;c0>>!GWC
> 3: ParDo[beam.addFixedKeyFn] Out:[2]
> 4: WindowInto[GLO]. Out:3
> 5: ParDo[main.formatFn] Out:[4]
> 6: ExtractOutput[stats.sumIntFn] Keyed:false Out:5
> 7: MergeAccumulators[stats.sumIntFn] Keyed:false Out:6
> 1: DataSource[S[CombinePerKey/Group/Read@localhost:63851], out]
> Coder:W;c8_windowed<CoGBK;c8<string;c0,int[varintz;c3];c4>>!IWC Out:7
>         caused by:
> stream value decode failed
>         caused by:
> invalid varintz encoding for: []
> Remote logging shutting down.exit status 1
>
> We believe that this is an error caused by the lack of a trigger
> coder/encoder, however, it works with the Default trigger (that's set by
> default in graphx/translate.go).
>
> Any guidance on this would be appreciated.
>
> Thanks!
> Eduardo
