[ https://issues.apache.org/jira/browse/BEAM-7738?focusedWorklogId=296663&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-296663 ]
ASF GitHub Bot logged work on BEAM-7738:
----------------------------------------

Author: ASF GitHub Bot
Created on: 16/Aug/19 23:40
Start Date: 16/Aug/19 23:40
Worklog Time Spent: 10m
Work Description: chadrik commented on issue #9268: [BEAM-7738] Add external transform support to PubsubIO
URL: https://github.com/apache/beam/pull/9268#issuecomment-522181883

Here's some info on where I am with this. I could really use some help to push this over the finish line.

The expansion service runs correctly and sends the expanded transforms back to Python, but the job fails inside Java on Flink because it's using the wrong serializer. There's a good chance that I'm overlooking something very obvious. Here's the stack trace:

```
2019-08-16 16:04:58,297 INFO org.apache.flink.runtime.taskmanager.Task - Source: PubSubInflow/ExternalTransform(beam:external:java:pubsub:read:v1)/PubsubUnboundedSource/Read(PubsubSource) (1/1) (93e578d8ae737c7502685cd978413a98) switched from RUNNING to FAILED.
java.lang.RuntimeException: org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage cannot be cast to [B
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.emitElement(UnboundedSourceWrapper.java:342)
	at org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper.run(UnboundedSourceWrapper.java:268)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
	at org.apache.flink.streaming.runtime.tasks.StoppableSourceStreamTask.run(StoppableSourceStreamTask.java:45)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage cannot be cast to [B
	at org.apache.beam.sdk.coders.ByteArrayCoder.encode(ByteArrayCoder.java:41)
	at org.apache.beam.sdk.coders.LengthPrefixCoder.encode(LengthPrefixCoder.java:56)
	at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105)
	at org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:578)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:569)
	at org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.encode(WindowedValue.java:529)
	at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.serialize(CoderTypeSerializer.java:87)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
	at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
	at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
	at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:78)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:160)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
	... 15 more
```

Here's the coder that `CoderTypeSerializer.serialize` is using when the exception is thrown:

```
WindowedValue$FullWindowedValueCoder(
  ValueWithRecordId$ValueWithRecordIdCoder(
    LengthPrefixCoder(
      ByteArrayCoder)
  ),
  GlobalWindow$Coder
)
```

Here's the value that's being serialized:

```
TimestampedValueInGlobalWindow{
  value=ValueWithRecordId{
    id=[54, 56, 54, 48, 48, 52, 48, 49, 49, 57, 55, 54, 52, 53, 48],
    value=org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage@64d3b2a
  },
  timestamp=2019-08-16T20:04:29.230Z,
  pane=PaneInfo.NO_FIRING
}
```

Obviously, `ByteArrayCoder` is not the right coder for `PubsubMessage`. Note that I get the same error whether `with_attributes` is enabled or disabled.

So why is `ByteArrayCoder` being used? The graph that's generated after expansion looks right to me. Here's a table of the primitive transforms and their PCollections (note that the last row is a Python ParDo):
| transform name | transform type | output element type | output coder |
|---|---|---|---|
| external_1root/PubsubUnboundedSource/Read(PubsubSource) | beam:transform:read:v1 | PubsubMessage | PubsubMessageWithAttributesCoder |
| external_1root/PubsubUnboundedSource/PubsubUnboundedSource.Stats/ParMultiDo(Stats) | beam:transform:pardo:v1 of StatsFn | PubsubMessage | PubsubMessageWithAttributesCoder |
| external_1root/MapElements/Map/ParMultiDo(Anonymous) | beam:transform:pardo:v1 of ParsePayloadAsPubsubMessageProto | byte[] | BytesCoder |
| ref_AppliedPTransform_PubSubInflow/Map(_from_proto_str)_4 | beam:transform:pardo:v1 of _from_proto_str | PubsubMessage | Pickle |

So as far as the pipeline definition is concerned, it seems like the output of `Read(PubsubSource)` is correctly associated with `PubsubMessageWithAttributesCoder`, but when the job runs it's using `BytesCoder` (`ByteArrayCoder` in the Java trace above). Somehow the wires are getting crossed with `ParMultiDo(Anonymous)`, the only transform whose output uses `BytesCoder`. Any ideas?

The last thing that's important to note is that I needed `PubsubIO.Read` to output a data type that is compatible with Python, so I wrote a custom parseFn that converts each message to a protobuf byte array. So the PCollection that bridges the divide between Java and Python uses a `BytesCoder`, but the transforms on either side do the work of converting to and from the protobuf representation of a PubsubMessage.
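To make the bridging design concrete, here is a minimal, stdlib-only Python sketch. It is a toy model, not Beam's coder API: `PubsubMessageStandIn`, `BytesCoderSketch`, `to_wire` and `from_wire` are hypothetical names, and JSON stands in for the protobuf encoding the custom parseFn uses. It shows that a bytes-only coder round-trips fine when it is attached to the converted PCollection, and fails in the same way as the trace above (a message object handed to a bytes coder) when it ends up attached to the `Read` output instead.

```python
import json
from dataclasses import dataclass, field


@dataclass
class PubsubMessageStandIn:
    """Hypothetical stand-in for PubsubMessage: a payload plus string attributes."""
    data: bytes
    attributes: dict = field(default_factory=dict)


class BytesCoderSketch:
    """Toy analogue of a bytes coder: it only accepts bytes-like values."""

    def encode(self, value):
        if not isinstance(value, (bytes, bytearray)):
            # Mirrors the ClassCastException in the trace: a message
            # object is not a byte array.
            raise TypeError(f"{type(value).__name__} cannot be encoded as bytes")
        return bytes(value)

    def decode(self, encoded):
        return bytes(encoded)


def to_wire(msg):
    """Java-side analogue of the custom parseFn (hypothetical).

    JSON is used here only as a stand-in for the protobuf encoding.
    """
    payload = {"data": msg.data.hex(), "attributes": msg.attributes}
    return json.dumps(payload, sort_keys=True).encode("utf-8")


def from_wire(raw):
    """Python-side analogue of _from_proto_str (hypothetical)."""
    obj = json.loads(raw.decode("utf-8"))
    return PubsubMessageStandIn(bytes.fromhex(obj["data"]), obj["attributes"])


if __name__ == "__main__":
    coder = BytesCoderSketch()
    msg = PubsubMessageStandIn(b"hello", {"key": "value"})

    # Intended wiring: the bytes coder only ever sees the converted payload.
    assert from_wire(coder.decode(coder.encode(to_wire(msg)))) == msg

    # Crossed wiring: the bytes coder is handed the message object itself.
    try:
        coder.encode(msg)
    except TypeError as err:
        print(err)  # PubsubMessageStandIn cannot be encoded as bytes
```

The sketch only illustrates the symptom; it does not show where in the expansion or Flink translation the source output might pick up the bridge PCollection's coder.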
Issue Time Tracking
-------------------

Worklog Id: (was: 296663)
Time Spent: 1h 40m (was: 1.5h)

> Support PubSubIO to be configured externally for use with other SDKs
> --------------------------------------------------------------------
>
>                 Key: BEAM-7738
>                 URL: https://issues.apache.org/jira/browse/BEAM-7738
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp, runner-flink, sdk-py-core
>            Reporter: Chad Dombrova
>            Assignee: Chad Dombrova
>            Priority: Major
>              Labels: portability
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Now that KafkaIO is supported via the external transform API (BEAM-7029) we
> should add support for PubSub.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)