Hi everyone,

I have been using Apache Beam recently, and my work now requires me to implement a source and sink connector for RabbitMQ.

Environment: Apache Beam 2.38, Java SDK (Java 1.8), Direct Runner.
I can connect to the RabbitMQ queue and publish messages to it with standalone producer code. When I try to consume those messages with a Beam pipeline, nothing is printed: in debug mode the code keeps running, but no messages appear in the output console.

Here is the code I'm using:

    package org.rabbit;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
    import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.SimpleFunction;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    public class RabbitConsumer {
        public static void main(String[] args) {
            Pipeline p = Pipeline.create();
            // Placeholder URI; real credentials, host, port and virtual host go here.
            String serverUri = "amqp://user:password@host:port/virtual_host";

            // exchange_name, routing_key and queue_name stand in for the actual values.
            p.apply("Read from rabbit",
                    RabbitMqIO.read()
                        .withUri(serverUri)
                        .withExchange(exchange_name, routing_key)
                        .withQueue(queue_name))
             .apply(ParDo.of(new DoFn<RabbitMqMessage, String>() {
                 @ProcessElement
                 public void processElement(DoFn<RabbitMqMessage, String>.ProcessContext c) {
                     String data = c.element().getBody().toString();
                     System.out.println("Reading Message from Queue" + data);
                     c.output(data);
                 }
             }));

            p.run().waitUntilFinish();
        }
    }

The messages I'm putting into the RabbitMQ queue are in XML format. I'm guessing this has to do with serialization. If someone has worked on RabbitMqMessage serialization or encountered this problem before, any help would be greatly appreciated.

Regards,
Hitesh
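
P.S. On the serialization guess: RabbitMqMessage.getBody() returns a byte[], and calling toString() on a Java array gives its identity string (something like "[B@1b6d3586") rather than the XML text. This would not by itself explain why nothing is printed at all, but it is worth ruling out. Below is a minimal, Beam-independent sketch of decoding such a body, assuming the producer wrote the XML as UTF-8. The class name BodyDecoding, the method decodeBody and the sample XML are hypothetical and only for illustration.

```java
import java.nio.charset.StandardCharsets;

public class BodyDecoding {

    // Decode a raw message body into text.
    // Assumption: the producer encoded the XML payload as UTF-8.
    static String decodeBody(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical payload standing in for a message body read from the queue.
        byte[] body = "<order id=\"1\"/>".getBytes(StandardCharsets.UTF_8);

        // Array identity string, not the content (starts with "[B@").
        System.out.println("toString(): " + body.toString());

        // The actual XML text.
        System.out.println("decoded:    " + decodeBody(body));
    }
}
```

Inside the DoFn, the equivalent change would be to build the String from c.element().getBody() with an explicit charset instead of calling toString() on the array.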