Coders are all set up by the SDK before the pipeline is given to a runner, so that sounds like a strange issue. Would you also file a Jira ticket about your experience with the coder registry and the DataflowRunner?
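
For anyone following along, here is a minimal, Beam-free sketch of the underlying header problem described in the quoted message below. It is only an illustration under stated assumptions: FakeLongString is a hypothetical stand-in for RabbitMQ's LongString implementation (assumed here not to be java.io.Serializable), and canJavaSerialize and sanitize are made-up helper names, not part of Beam or the RabbitMQ client. It shows plain Java serialization, which is what SerializableCoder relies on, rejecting such a header value, and accepting the map once those values are copied into plain Strings.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HeaderSerializationSketch {

    // Hypothetical stand-in for RabbitMQ's LongString implementation:
    // it wraps bytes and, by assumption, is NOT java.io.Serializable.
    static final class FakeLongString {
        private final byte[] bytes;

        FakeLongString(String s) {
            this.bytes = s.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String toString() {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Returns true if Java serialization accepts the headers,
    // false if it hits a value that is not Serializable.
    static boolean canJavaSerialize(Map<String, Object> headers) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new HashMap<String, Object>(headers));
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Copies the headers, replacing each FakeLongString with a plain String.
    static Map<String, Object> sanitize(Map<String, Object> headers) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            Object value = entry.getValue();
            result.put(entry.getKey(), value instanceof FakeLongString ? value.toString() : value);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("x-trace", new FakeLongString("abc"));
        headers.put("x-retries", 3);

        System.out.println(canJavaSerialize(headers));           // false
        System.out.println(canJavaSerialize(sanitize(headers))); // true
    }
}
```

A custom coder or a change inside the IO would presumably do something equivalent to sanitize before encoding; whether that matches what the PR does is not something this sketch claims.
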
On Fri, May 24, 2019 at 5:26 AM Nicolas Delsaux <[email protected]> wrote:

> Thanks, PR is started (https://github.com/apache/beam/pull/8677), and
> I've set both Alexey and you as potential reviewers.
>
> On 24/05/2019 at 13:55, Jean-Baptiste Onofré wrote:
> > Hi,
> >
> > You can create a pull request, I will do the review.
> >
> > The coder is set on the RabbitMqIO PTransform, so it should work.
> >
> > AFAIR, we have a Jira about that and I already started to check. Not
> > completed yet.
> >
> > Regards
> > JB
> >
> > On 24/05/2019 11:01, Nicolas Delsaux wrote:
> >> Hi all
> >>
> >> I'm currently evaluating Apache Beam to transfer messages from RabbitMQ
> >> to Kafka with some transforms in between.
> >>
> >> Doing so, I've discovered some differences between the direct runner's
> >> behaviour and the Google Dataflow runner's.
> >>
> >> But first, a small introduction to what I know.
> >>
> >> From what I understand, elements transmitted between two different
> >> transforms are serialized/deserialized.
> >>
> >> This (de)serialization process is normally managed by a Coder, of which
> >> the most commonly used is obviously the SerializableCoder, which takes a
> >> serializable object and (de)serializes it using the classical Java
> >> mechanisms.
> >>
> >> On the direct runner, I had issues with RabbitMQ messages, as their
> >> headers contain LongString objects. LongString is an interface
> >> implemented solely by a private static class of RabbitMQ, and is used
> >> for large text messages.
> >>
> >> So I wrote a RabbitMqMessageCoder, and installed it in my pipeline
> >> (using
> >> pipeline.getCoderRegistry().registerCoderForClass(RabbitMqMessage.class,
> >> new MyCoder())).
> >>
> >> And it worked! Well, not in the Dataflow runner.
> >>
> >> Indeed, it seems like the Dataflow runner doesn't use this coder
> >> registry mechanism (for reasons I absolutely don't understand).
> >>
> >> So my fix didn't work.
> >>
> >> After various tries, I finally gave up and directly modified the
> >> RabbitMqIO class (from Apache Beam) to handle my case.
> >>
> >> This fix is available on my Beam fork on GitHub, and I would like to
> >> have it integrated.
> >>
> >> What is the procedure to do so?
> >>
> >> Thanks!
