I tested a bit further and got this fascinating result:

 1. It works in local mode
 2. It does not work when deployed in the hadoop cluster (There are no
    errors in the yarn log files)

Any hints why it does not throw an error but does not work either? Are
there other possible places then the yarn logs where i can dig for an
error?



Am 27.06.2017 um 11:44 schrieb a...@x5h.eu:
>
> The problem is that i don't see any connections in the rabbitMQ log. I
> don't even see any attempts to connect to the Server. Usually I should
> see at least some failed tries. I post the complete program perhaps i
> do have an error somewhere there.
>
> package com.example.rabbitMQ;
>
> import org.apache.hadoop.conf.Configuration;
>
> import com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator;
> import com.datatorrent.api.annotation.ApplicationAnnotation;
> import com.datatorrent.api.StreamingApplication;
> import com.datatorrent.api.DAG;
> import com.datatorrent.lib.io.jms.JMSStringInputOperator;
> import com.datatorrent.lib.io.ConsoleOutputOperator;
>
>
> @ApplicationAnnotation(name="RabbitMQ2HDFS")
> public class RabbitMQApplication implements StreamingApplication
> {
>
>   @Override
>   public void populateDAG(DAG dag, Configuration conf)
>   {
>     RabbitMQInputOperator in = dag.addOperator("rabbitInput",new
> RabbitMQInputOperator());
>     in.setHost("localhost");
>     in.setExchange("apex");
>     in.setExchangeType("fanout");
>     in.setRoutingKey("rktest");
>     ConsoleOutputOperator console = dag.addOperator("console", new
> ConsoleOutputOperator());
>     dag.addStream("rand_console",in.outputPort, console.input);
> }
> }
>
> I'm really at an end here. There are no errors in any log file but the
> stream is also not connecting for some reason I can't understand. This
> is the client library I'm using.
> <dependency>
>     <groupId>com.rabbitmq</groupId>
>     <artifactId>amqp-client</artifactId>
>     <version>4.1.0</version>
>   </dependency>
>  
> Cheers
> Manfred.
>
> Am 26.06.2017 um 17:34 schrieb vikram patil:
>> If you have used any routing_key , please specify that using
>> in.setRoutingKey() . I dont see that one in your code.
>>
>> On Mon, Jun 26, 2017 at 9:00 PM, <a...@x5h.eu <mailto:a...@x5h.eu>>
>> wrote:
>>
>>     I get no exception in the apex.log and yes the queue is durable.
>>
>>                                         vhost: /
>>                                          name: task
>>                                   auto_delete: False
>>      backing_queue_status.avg_ack_egress_rate: 0.0
>>     backing_queue_status.avg_ack_ingress_rate: 0.0
>>          backing_queue_status.avg_egress_rate: 0.0
>>         backing_queue_status.avg_ingress_rate: 0.5866956420847993
>>                    backing_queue_status.delta: ["delta", "undefined",
>>     0, 0, "undefined"]
>>                      backing_queue_status.len: 31
>>                     backing_queue_status.mode: default
>>              backing_queue_status.next_seq_id: 31
>>                       backing_queue_status.q1: 0
>>                       backing_queue_status.q2: 0
>>                       backing_queue_status.q3: 0
>>                       backing_queue_status.q4: 31
>>         backing_queue_status.target_ram_count: infinity
>>                          consumer_utilisation: None
>>                                     consumers: 0
>>                                       durable: True
>>                                     exclusive: False
>>
>>     The goal here is to connect to the RabbitMQ and fetch messages
>>     and write them to the console. I send the messages via a script
>>     or directly via the rabbitmqadmin console. Any Ideas why the
>>     program does not read from the rabbitmq?
>>
>>     Cheers Manfred.
>>
>>
>>
>>     Am 26.06.2017 um 17:14 schrieb vikram patil:
>>>     Hi Manfred,
>>>
>>>     Are you getting any exception in the logs ?  Check if your queue
>>>     is durable.  
>>>
>>>     Thanks & Regards,
>>>     Vikram
>>>
>>>     On Mon, Jun 26, 2017 at 8:37 PM, <a...@x5h.eu
>>>     <mailto:a...@x5h.eu>> wrote:
>>>
>>>         I have a problem getting the connection working with RabbitMQ:
>>>
>>>         I host the RabbitMQ on the same server the apex application
>>>         is running.
>>>
>>>         +--------------------+---------+
>>>         |        name        |  type   |
>>>         +--------------------+---------+
>>>         | apex               | fanout  |
>>>         +--------------------+---------+
>>>
>>>         +------+----------+
>>>         | name | messages |
>>>         +------+----------+
>>>         | task | 31       |
>>>         +------+----------+
>>>
>>>         In the program for test issues I declare it this way:
>>>
>>>          @Override
>>>           public void populateDAG(DAG dag, Configuration conf)
>>>           {
>>>             RabbitMQInputOperator in =
>>>         dag.addOperator("rabbitInput",new RabbitMQInputOperator());
>>>             in.setHost("localhost");
>>>             in.setExchange("apex");
>>>             in.setExchangeType("fanout");
>>>             in.setQueueName("task");
>>>             ConsoleOutputOperator console =
>>>         dag.addOperator("console", new ConsoleOutputOperator());
>>>             dag.addStream("rand_console",in.outputPort, console.input);
>>>         }
>>>
>>>         But a look at the operators shows that it does not fetch any
>>>         messages:
>>>
>>>          {
>>>             "id": "1",
>>>             "name": "rabbitInput",
>>>             "className":
>>>         "com.datatorrent.contrib.rabbitmq.RabbitMQInputOperator",
>>>             "container": null,
>>>             "host": null,
>>>             "totalTuplesProcessed": "0",
>>>             "totalTuplesEmitted": "0",
>>>             "tuplesProcessedPSMA": "0",
>>>             "tuplesEmittedPSMA": "0",
>>>             "cpuPercentageMA": "0.0",
>>>             "latencyMA": "0",
>>>             "status": "PENDING_DEPLOY",
>>>             "lastHeartbeat": "0",
>>>             "failureCount": "0",
>>>             "recoveryWindowId": "0",
>>>             "currentWindowId": "0",
>>>             "ports": [],
>>>             "unifierClass": null,
>>>             "logicalName": "rabbitInput",
>>>             "recordingId": null,
>>>             "counters": null,
>>>             "metrics": null,
>>>             "checkpointStartTime": "0",
>>>             "checkpointTime": "0",
>>>             "checkpointTimeMA": "0"
>>>           },
>>>
>>>         What am I doing wrong here? Since i can configure the
>>>         RAbbitMQ side is there a preferred way of configuration for
>>>         apex?
>>>
>>>         Cheers
>>>
>>>         Manfred.
>>>
>>>
>>>
>>
>>
>

Reply via email to