Hi Everyone,

I have been using Apache Beam recently, and my work now requires me to
implement a source and sink connector for RabbitMQ.
Apache Beam version: 2.38, Java SDK (Java 1.8), Direct Runner

I'm able to connect to the RabbitMQ queue successfully and publish some
messages to it using standalone producer code.

When I try to consume those messages with a Beam pipeline, no messages are
printed. In debug mode the code keeps running continuously, and nothing
appears in the output console.


Here is the code I'm using:

package org.rabbit;

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class RabbitConsumer {
    public static void main(String[] args) {
        Pipeline p = Pipeline.create();

        // Placeholder values; the real connection details are omitted.
        String serverUri = "amqp://user:password@host:port/virtual_host";
        String exchangeName = "exchange_name";
        String routingKey = "routing_key";
        String queueName = "queue_name";

        p.apply("Read from rabbit", RabbitMqIO.read()
                .withUri(serverUri)
                .withExchange(exchangeName, routingKey)
                .withQueue(queueName))
         .apply(ParDo.of(new DoFn<RabbitMqMessage, String>() {
             @ProcessElement
             public void processElement(ProcessContext c) {
                 // getBody() returns a byte[]; decode it rather than calling
                 // toString(). UTF-8 is assumed for the XML payload.
                 String data = new String(c.element().getBody(), StandardCharsets.UTF_8);
                 System.out.println("Reading Message from Queue: " + data);
                 c.output(data);
             }
         }));

        p.run().waitUntilFinish();
    }
}


The messages I'm putting into the RabbitMQ queue are in XML format.
I'm guessing this has to do with serialization.
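
On the serialization point: as far as I understand, RabbitMqMessage.getBody()
returns a byte[], and calling toString() on a byte array yields a type-and-hash
identifier rather than the payload, so the bytes have to be decoded explicitly.
Below is a minimal stand-alone sketch in plain Java (no Beam involved). The
class name, the decodeBody helper, the sample XML, and the UTF-8 charset are
all assumptions for illustration only.

```java
import java.nio.charset.StandardCharsets;

public class BodyDecodingSketch {

    // Hypothetical helper: decodes a raw message body, assuming UTF-8 text.
    public static String decodeBody(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Made-up XML payload standing in for a real queue message.
        byte[] body = "<order><id>42</id></order>".getBytes(StandardCharsets.UTF_8);

        // Prints an array identifier beginning with "[B@", not the XML.
        System.out.println("toString(): " + body.toString());

        // Prints the XML text itself.
        System.out.println("decoded:    " + decodeBody(body));
    }
}
```

This would only explain unreadable output, though. It would not by itself
explain why nothing is printed at all.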


If someone has worked on RabbitMqMessage serialization or encountered this
problem before, any help would be greatly appreciated.

Regards
Hitesh












