[ 
https://issues.apache.org/jira/browse/BEAM-7738?focusedWorklogId=296663&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-296663
 ]

ASF GitHub Bot logged work on BEAM-7738:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 16/Aug/19 23:40
            Start Date: 16/Aug/19 23:40
    Worklog Time Spent: 10m 
      Work Description: chadrik commented on issue #9268: [BEAM-7738] Add 
external transform support to PubsubIO
URL: https://github.com/apache/beam/pull/9268#issuecomment-522181883
 
 
   Here's some info on where I am with this.  I could really use some help to 
push this over the finish line.
   
   The expansion service runs correctly and sends the expanded transforms back 
to Python, but the job fails inside Java on Flink because it's trying to use 
the wrong serializer.  There's a good chance that I'm overlooking something 
very obvious.
   
   Here's the stack trace:
   
   ```
   2019-08-16 16:04:58,297 INFO  org.apache.flink.runtime.taskmanager.Task - Source: PubSubInflow/ExternalTransform(beam:external:java:pubsub:read:v1)/PubsubUnboundedSource/Read(PubsubSource) (1/1) (93e578d8ae737c7502685cd978413a98) switched from RUNNING to FAILED.
   java.lang.RuntimeException: org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage cannot be cast to [B
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.emitElement(UnboundedSourceWrapper.java:342)
        at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:268)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
        at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
        at java.lang.Thread.run(Thread.java:745)
   Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage cannot be cast to [B
        at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
        at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
        at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105)
        at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:578)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:569)
        at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
        at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:87)
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
        at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
        at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
        at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:78)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:160)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
        ... 15 more
   ```
   
   Here's the coder stack that `CoderTypeSerializer.serialize` is using when 
the exception is thrown:
   
   ```
   WindowedValue$FullWindowedValueCoder(
     ValueWithRecordId$ValueWithRecordIdCoder(
       LengthPrefixCoder(
         ByteArrayCoder)
     ),
     GlobalWindow$Coder
   ) 
   ```
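
   The cast failure itself is easy to reproduce outside of Flink, which at 
least confirms the coder stack (and not Pubsub) is the problem.  Here's a 
minimal sketch; the raw cast is mine, just to mimic what type erasure lets 
happen when a runner resolves the wrong coder for an element:
   
   ```java
   import java.io.ByteArrayOutputStream;
   
   import org.apache.beam.sdk.coders.ByteArrayCoder;
   import org.apache.beam.sdk.coders.Coder;
   import org.apache.beam.sdk.coders.LengthPrefixCoder;
   
   public class CoderMismatchRepro {
     public static void main(String[] args) throws Exception {
       // The same inner stack as above: length-prefixed raw bytes.
       Coder<byte[]> bytes = LengthPrefixCoder.of(ByteArrayCoder.of());
   
       // Erase the type parameter, the way a runner that looked up the wrong
       // coder effectively does, then feed it a non-byte[] element.
       @SuppressWarnings("unchecked")
       Coder<Object> erased = (Coder<Object>) (Coder<?>) bytes;
       erased.encode(new Object(), new ByteArrayOutputStream());
       // -> java.lang.ClassCastException: ... cannot be cast to [B
     }
   }
   ```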
   
   Here's the value that's being serialized:
   
   ```
   TimestampedValueInGlobalWindow{
     value=ValueWithRecordId{
       id=[54, 56, 54, 48, 48, 52, 48, 49, 49, 57, 55, 54, 52, 53, 48], 
       value=org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage@64d3b2a
     }, 
     timestamp=2019-08-16T20:04:29.230Z, 
     pane=PaneInfo.NO_FIRING
   }
   ```
   
   Obviously `ByteArrayCoder` is not the right coder for `PubsubMessage`.
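
   As a sanity check, the element shown above encodes fine once the value 
coder actually matches.  A quick sketch, assuming the with-attributes coder 
(the payload and attribute map here are made up):
   
   ```java
   import java.io.ByteArrayOutputStream;
   import java.nio.charset.StandardCharsets;
   import java.util.Collections;
   
   import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
   import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
   
   public class CorrectCoderCheck {
     public static void main(String[] args) throws Exception {
       PubsubMessage msg = new PubsubMessage(
           "payload".getBytes(StandardCharsets.UTF_8),
           Collections.singletonMap("key", "value"));
   
       // Encoding with the coder the pipeline proto advertises succeeds.
       PubsubMessageWithAttributesCoder.of().encode(msg, new ByteArrayOutputStream());
     }
   }
   ```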
   
   Note that I get the same error whether `with_attributes` is enabled or 
disabled.
   
   So why is `ByteArrayCoder` being used?
   
   The graph that's generated after expansion looks right to me.  Here's a 
table of the primitive transforms and their PCollections (note that the last 
row is a Python ParDo):
   
   | transform name | transform type | out collection | out collection coder |
   |----------------|----------------|----------------|-----------------------|
   | external_1root/PubsubUnboundedSource/Read(PubsubSource) | beam:transform:read:v1 | PubsubMessage | PubsubMessageWithAttributesCoder |
   | external_1root/PubsubUnboundedSource/PubsubUnboundedSource.Stats/ParMultiDo(Stats) | beam:transform:pardo:v1 of StatsFn | PubsubMessage | PubsubMessageWithAttributesCoder |
   | external_1root/MapElements/Map/ParMultiDo(Anonymous) | beam:transform:pardo:v1 of ParsePayloadAsPubsubMessageProto | byte[] | BytesCoder |
   | ref_AppliedPTransform_PubSubInflow/Map(_from_proto_str)_4 | beam:transform:pardo:v1 of _from_proto_str | PubsubMessage | Pickle |
   
   So as far as the pipeline definition is concerned, it seems like the output 
of `Read(PubsubSource)` is properly associated with the 
`PubsubMessageWithAttributesCoder`, but when the job runs it's using 
`BytesCoder`.  So somehow the wires are getting crossed with 
`ParMultiDo(Anonymous)`, whose output uses `BytesCoder`.
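
   One workaround I may try next (just a sketch, not something I've confirmed 
fixes the Flink-side lookup): pinning the coder explicitly on the Java side 
when the external transform is built, so coder inference can't swap in the 
byte coder.  `subscription` here stands in for whatever the external config 
payload actually carries:
   
   ```java
   import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
   import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
   import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
   import org.apache.beam.sdk.values.PBegin;
   import org.apache.beam.sdk.values.PCollection;
   
   class PinnedPubsubRead {
     static PCollection<PubsubMessage> read(PBegin input, String subscription) {
       return input
           .apply(PubsubIO.readMessagesWithAttributes().fromSubscription(subscription))
           // Pin the coder instead of relying on whatever the runner resolves.
           .setCoder(PubsubMessageWithAttributesCoder.of());
     }
   }
   ```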
   
   Any ideas?
   
   The last thing that's important to note is that I needed `PubsubIO.Read` to 
output a data type that is compatible with Python, so I wrote a custom parseFn 
that converts each message to a protobuf byte array.  The PCollection that 
bridges the divide between Java and Python therefore uses a `BytesCoder`, and 
the transforms on either side do the work of converting to and from the 
protobuf representation of a PubsubMessage.
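
   For reference, the parseFn is roughly the following (a sketch; the real one 
is in the PR and may differ in details).  It maps the Beam `PubsubMessage` 
onto the standard `com.google.pubsub.v1.PubsubMessage` proto so Python can 
decode it from plain bytes:
   
   ```java
   import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
   import org.apache.beam.sdk.transforms.SimpleFunction;
   
   import com.google.protobuf.ByteString;
   
   // Re-encode each message as the com.google.pubsub.v1.PubsubMessage proto so
   // the cross-language PCollection can legitimately carry byte[] / BytesCoder.
   class ParsePayloadAsPubsubMessageProto extends SimpleFunction<PubsubMessage, byte[]> {
     @Override
     public byte[] apply(PubsubMessage input) {
       com.google.pubsub.v1.PubsubMessage.Builder proto =
           com.google.pubsub.v1.PubsubMessage.newBuilder()
               .setData(ByteString.copyFrom(input.getPayload()));
       if (input.getAttributeMap() != null) {
         proto.putAllAttributes(input.getAttributeMap());
       }
       return proto.build().toByteArray();
     }
   }
   ```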
   
 


Issue Time Tracking
-------------------

    Worklog Id:     (was: 296663)
    Time Spent: 1h 40m  (was: 1.5h)

> Support PubSubIO to be configured externally for use with other SDKs
> --------------------------------------------------------------------
>
>                 Key: BEAM-7738
>                 URL: https://issues.apache.org/jira/browse/BEAM-7738
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp, runner-flink, sdk-py-core
>            Reporter: Chad Dombrova
>            Assignee: Chad Dombrova
>            Priority: Major
>              Labels: portability
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Now that KafkaIO is supported via the external transform API (BEAM-7029), we 
> should add support for PubSub.



