I have a problem getting the connection working with RabbitMQ:

I host RabbitMQ on the same server that the Apex application runs on. The exchange and queue are set up as follows:

+--------------------+---------+
|        name        |  type   |
+--------------------+---------+
| apex               | fanout  |
+--------------------+---------+

+------+----------+
| name | messages |
+------+----------+
| task | 31       |
+------+----------+

For testing, I declare the operator in the application like this:

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    RabbitMQInputOperator in = dag.addOperator("rabbitInput", new RabbitMQInputOperator());
    in.setHost("localhost");
    in.setExchange("apex");
    in.setExchangeType("fanout");
    in.setQueueName("task");
    ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("rand_console", in.outputPort, console.input);
  }

But a look at the operator list shows that it does not fetch any messages:

 {
    "id": "1",
    "name": "rabbitInput",
    "className": "com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator",
    "container": null,
    "host": null,
    "totalTuplesProcessed": "0",
    "totalTuplesEmitted": "0",
    "tuplesProcessedPSMA": "0",
    "tuplesEmittedPSMA": "0",
    "cpuPercentageMA": "0.0",
    "latencyMA": "0",
    "status": "PENDING_DEPLOY",
    "lastHeartbeat": "0",
    "failureCount": "0",
    "recoveryWindowId": "0",
    "currentWindowId": "0",
    "ports": [],
    "unifierClass": null,
    "logicalName": "rabbitInput",
    "recordingId": null,
    "counters": null,
    "metrics": null,
    "checkpointStartTime": "0",
    "checkpointTime": "0",
    "checkpointTimeMA": "0"
  },

What am I doing wrong here? Since I can configure the RabbitMQ side, is
there a preferred way of configuring it for Apex?

Cheers

Manfred.

