The problem is that I don't see any connections in the RabbitMQ log. I don't even see any attempts to connect to the server. Normally I would expect to see at least some failed attempts. I am posting the complete program below; perhaps I have an error somewhere in it.

package com.example.rabbitMQ;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.lib.io.jms.JMSStringInputOperator;
import com.datatorrent.lib.io.ConsoleOutputOperator;

@ApplicationAnnotation(name="RabbitMQ2HDFS")
public class RabbitMQApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // Input operator that should consume from the local RabbitMQ broker
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("localhost");
    in.setExchange("apex");
    in.setExchangeType("fanout");
    in.setRoutingKey("rktest");

    // Print every received tuple to the console
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("rand_console", in.outputPort, console.input);
  }
}

I'm really at my wit's end here. There are no errors in any log file, but the stream is not connecting either, for some reason I can't understand.

This is the client library I'm using:

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>4.1.0</version>
</dependency>

Cheers,
Manfred.

On 26.06.2017 at 17:34, vikram patil wrote:
> If you have used a routing key, please specify it using
> in.setRoutingKey(). I don't see that in your code.
>
> On Mon, Jun 26, 2017 at 9:00 PM, <a...@x5h.eu> wrote:
>
>     I get no exception in the apex.log, and yes, the queue is durable.
>
>     vhost: /
>     name: task
>     auto_delete: False
>     backing_queue_status.avg_ack_egress_rate: 0.0
>     backing_queue_status.avg_ack_ingress_rate: 0.0
>     backing_queue_status.avg_egress_rate: 0.0
>     backing_queue_status.avg_ingress_rate: 0.5866956420847993
>     backing_queue_status.delta: ["delta", "undefined", 0, 0, "undefined"]
>     backing_queue_status.len: 31
>     backing_queue_status.mode: default
>     backing_queue_status.next_seq_id: 31
>     backing_queue_status.q1: 0
>     backing_queue_status.q2: 0
>     backing_queue_status.q3: 0
>     backing_queue_status.q4: 31
>     backing_queue_status.target_ram_count: infinity
>     consumer_utilisation: None
>     consumers: 0
>     durable: True
>     exclusive: False
>
>     The goal here is to connect to RabbitMQ, fetch messages, and
>     write them to the console. I send the messages via a script or
>     directly via the rabbitmqadmin console. Any ideas why the program
>     does not read from RabbitMQ?
>
>     Cheers,
>     Manfred.
>
>     On 26.06.2017 at 17:14, vikram patil wrote:
>> Hi Manfred,
>>
>> Are you getting any exceptions in the logs? Check whether your queue
>> is durable.
>>
>> Thanks & Regards,
>> Vikram
>>
>> On Mon, Jun 26, 2017 at 8:37 PM, <a...@x5h.eu> wrote:
>>
>>     I have a problem getting the connection to RabbitMQ working.
>>
>>     I host RabbitMQ on the same server that the Apex application
>>     is running on.
>>
>>     +--------------------+---------+
>>     | name               | type    |
>>     +--------------------+---------+
>>     | apex               | fanout  |
>>     +--------------------+---------+
>>
>>     +------+----------+
>>     | name | messages |
>>     +------+----------+
>>     | task | 31       |
>>     +------+----------+
>>
>>     For testing purposes, I declare it this way in the program:
>>
>>     @Override
>>     public void populateDAG(DAG dag, Configuration conf)
>>     {
>>       RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
>>       in.setHost("localhost");
>>       in.setExchange("apex");
>>       in.setExchangeType("fanout");
>>       in.setQueueName("task");
>>       ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
>>       dag.addStream("rand_console", in.outputPort, console.input);
>>     }
>>
>>     But a look at the operators shows that it does not fetch any
>>     messages:
>>
>>     {
>>       "id": "1",
>>       "name": "rabbitInput",
>>       "className": "com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator",
>>       "container": null,
>>       "host": null,
>>       "totalTuplesProcessed": "0",
>>       "totalTuplesEmitted": "0",
>>       "tuplesProcessedPSMA": "0",
>>       "tuplesEmittedPSMA": "0",
>>       "cpuPercentageMA": "0.0",
>>       "latencyMA": "0",
>>       "status": "PENDING_DEPLOY",
>>       "lastHeartbeat": "0",
>>       "failureCount": "0",
>>       "recoveryWindowId": "0",
>>       "currentWindowId": "0",
>>       "ports": [],
>>       "unifierClass": null,
>>       "logicalName": "rabbitInput",
>>       "recordingId": null,
>>       "counters": null,
>>       "metrics": null,
>>       "checkpointStartTime": "0",
>>       "checkpointTime": "0",
>>       "checkpointTimeMA": "0"
>>     },
>>
>>     What am I doing wrong here? Since I can configure the
>>     RabbitMQ side, is there a preferred configuration for
>>     Apex?
>>
>>     Cheers,
>>
>>     Manfred.
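
A note on the routing key discussed in this thread: as far as I know, a RabbitMQ fanout exchange copies each message to every queue bound to it and ignores the routing key, so setting "rktest" should not by itself change what the "apex" exchange delivers. The sketch below is only a toy in-memory model of that behaviour in plain Java. It is not the amqp-client or Apex API; the class FanoutSketch, its methods bind, publish and depth, and the queue name "audit" are all made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of a fanout exchange (hypothetical, not the RabbitMQ client API).
class FanoutSketch {
    // Queue name -> messages waiting in that queue.
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    // Bind a queue to the exchange. The binding key is accepted but never consulted.
    void bind(String queueName, String bindingKey) {
        boundQueues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    // Publish a message: every bound queue gets a copy, whatever the routing key is.
    // Returns the number of queues that received the message.
    int publish(String routingKey, String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    // Number of messages waiting in the named queue (0 if the queue is not bound).
    int depth(String queueName) {
        Queue<String> queue = boundQueues.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        FanoutSketch apex = new FanoutSketch();
        apex.bind("task", "");                 // queue name taken from the thread
        apex.bind("audit", "some.other.key");  // hypothetical second queue
        apex.publish("rktest", "hello");
        apex.publish("does-not-matter", "world");
        // Both queues hold both messages, regardless of the keys used above.
        System.out.println("task=" + apex.depth("task") + " audit=" + apex.depth("audit"));
    }
}
```

In this model a message published with no queue bound is simply dropped (publish returns 0), which is why the binding between the queue and the exchange is worth checking before looking at routing keys.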