[ https://issues.apache.org/jira/browse/BEAM-7716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16882640#comment-16882640 ]

Steve Niemitz edited comment on BEAM-7716 at 7/11/19 4:14 AM:
--------------------------------------------------------------

I did a little debugging tonight and tracked this down a bit.  First off, 
I'm actually having trouble reproducing this in a minimal case; I'll try more 
with that tomorrow.

I think I figured it out though, and oh boy, this is a good one.

The DataflowRunner injects a few transforms into the pipeline to deduplicate 
the source output (see DataflowRunner.StreamingUnboundedRead).  During the 
process, values are turned into ValueWithRecordIds.

I'm using PubsubIO.readMessage(), which uses the PubsubMessagePayloadOnlyCoder, 
which simply delegates to the ByteArrayCoder.

Now, the ByteArrayCoder actually uses the context argument to determine how to 
serialize the payload.  With Context.OUTER, it skips the length prefixing; with 
NESTED, it adds a length prefix.
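To make that concrete, here's a stripped-down mimic of the two modes in plain Java. This is NOT the real ByteArrayCoder; `writeVarInt` and `encode` are illustrative helpers that just reproduce the OUTER-vs-NESTED framing difference:

```java
import java.io.ByteArrayOutputStream;

// Illustrative sketch only: OUTER writes the raw bytes as-is, NESTED
// prepends a var-int length so the payload can be found inside a larger
// stream. (Not the actual Beam coder code.)
public class CoderContextSketch {
  // Write v as a base-128 var-int, low 7 bits first.
  static void writeVarInt(ByteArrayOutputStream out, int v) {
    while ((v & ~0x7F) != 0) {
      out.write((v & 0x7F) | 0x80);
      v >>>= 7;
    }
    out.write(v);
  }

  static byte[] encode(byte[] payload, boolean nested) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    if (nested) {
      writeVarInt(out, payload.length); // NESTED: length prefix first
    }
    out.write(payload, 0, payload.length); // both modes: raw payload bytes
    return out.toByteArray();
  }

  public static void main(String[] args) {
    byte[] payload = {1, 2, 3};
    System.out.println(encode(payload, false).length); // OUTER: 3 bytes
    System.out.println(encode(payload, true).length);  // NESTED: 4 bytes
  }
}
```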

In some cases, in WindmillStreamWriter.encode, we call the coder's encode 
specifically with Context.OUTER, and this is passed all the way down to 
ByteArrayCoder.encode.  The call stack looks like WindmillStreamWriter.encode 
-> PubsubMessagePayloadOnlyCoder.encode(..., Context.OUTER) -> 
ByteArrayCoder.encode(..., Context.OUTER).

However, on the read side, the decode is done through ValueWithRecordIdCoder, 
also passing in Context.OUTER.  But let's look at what that stack looks like.  
ValueWithRecordIdCoder calls its wrapped valueCoder.decode, however, it 
DOESN'T PASS THE CONTEXT.  This results in ByteArrayCoder reading the stream 
assuming it's in a nested context (which it actually is), and assuming the 
first few bytes are a VarInt.  However, since we encoded the data in an OUTER 
context, there was no length prefix written!  In my case, my data just happened 
to look var-inty enough that this decoded into an empty byte array.
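Here's a tiny sketch of that failure mode, again in plain Java rather than the actual coder code: data written with no length prefix (OUTER) gets decoded as if it were NESTED. If the first payload byte happens to be a small var-int, say 0x00, the decoder reads a "length" of 0 and hands back an empty array, silently dropping the real payload:

```java
import java.io.ByteArrayInputStream;

// Hypothetical illustration of the OUTER-encode / NESTED-decode mismatch.
public class DecodeMismatch {
  // Decode as if NESTED: read a var-int length prefix, then that many bytes.
  static byte[] decodeNested(byte[] stream) {
    ByteArrayInputStream in = new ByteArrayInputStream(stream);
    int len = 0, shift = 0, b;
    do { // read the base-128 var-int "length"
      b = in.read();
      len |= (b & 0x7F) << shift;
      shift += 7;
    } while ((b & 0x80) != 0);
    byte[] out = new byte[len];
    in.read(out, 0, len);
    return out;
  }

  public static void main(String[] args) {
    // OUTER encoding is just the raw bytes; suppose the payload starts 0x00.
    byte[] outerEncoded = {0x00, 0x41, 0x42, 0x43};
    byte[] decoded = decodeNested(outerEncoded);
    System.out.println(decoded.length); // 0 -- the real payload is lost
  }
}
```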

I think the fix is to change WindmillStreamWriter.encode to use a nested 
context, but this is way above my pay-grade now.  Hopefully this can help 
someone fix it. :)



> PubsubIO returns empty message bodies for all messages read
> -----------------------------------------------------------
>
>                 Key: BEAM-7716
>                 URL: https://issues.apache.org/jira/browse/BEAM-7716
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-gcp
>            Reporter: Steve Niemitz
>            Priority: Critical
>
> {quote}Both the JSON and gRPC implementation return empty message bodies for 
> all messages read (using readMessages).  When running with the 
> dataflow-specific reader, this doesn't happen and the message bodies have the 
> content as expected.  I took a pipeline that works as expected on dataflow 
> using PubsubIO.Read, added the experiment flag, and then my pipeline broke 
> from empty message bodies.  This obviously blocked me from really 
> experimenting much more.{quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
