I have created Jira for this improvement . https://issues.apache.org/jira/browse/APEXMALHAR-2509
-Vikram On Mon, Jun 12, 2017 at 1:47 PM, Mohit Jotwani <mo...@datatorrent.com> wrote: > +1 - The queue_name, host, port should be sufficient to start reading the > messages. > > Regards, > Mohit > > > > On Fri, Jun 9, 2017 at 5:47 PM, Shubham Pathak <shub...@datatorrent.com> > wrote: > > > +1 for the improvements > > This way the it will become easier for a new user to try out the operator > > as well. > > > > Thanks, > > Shubham Pathak > > > > On Fri, Jun 9, 2017 at 2:01 PM, vikram patil <patilvik...@gmail.com> > > wrote: > > > > > Hello All, > > > > > > While helping one of the Apex User, I found that current > > > AbstractRabbitMQInputOperator can be made simpler to use . After > > > investigation, I would like to suggest some improvements as below. If > > there > > > are more improvements needed, please suggest and I will incorporate > those > > > suggestions to create Jira ticket. > > > > > > In current code for AbstractRabbitMQInputOperator exchange, > exchangeType > > > are made NotNull which doesn't allow app to be launched without > > specifying > > > these values. For an input Operator, we are actually trying to create > > > queues and exchanges with specified values. But it leads to conflict in > > > some scenarios when default exchange is used for the queue as well when > > > queue type is transient. To consume from rabbitmq, operator need to use > > > only QueueName, host and port of rabbitmq. Similar to > KafkaInputOperator > > > we can let the operator fail if QueueName is not specified and let > > > developer correct an application or specified it from configuration. > > > > > > *Suggested Improvements:* > > > > > > 1) Drop requirements to specify exchange and its type . > > > 2) We should not be attempting to create queue in Input Operator. For > > > consumer only queue name is sufficient to start consuming data from > > queue. > > > 3) Currently queue name is optional, we should make it mandatory > instead > > of > > > creating queue. > > > > > > I tried out following scenarios to test existing operator . > > > > > > *Scenarios with default exchange:* > > > *1) Queue is already created as non-durable with default exchange* > > > *Setup:* > > > rabbitMQInputOperator.setQueueName("test_2"); > > > rabbitMQInputOperator.setExchangeType("fanout"); > > > rabbitMQInputOperator.setExchange(""); > > > rabbitMQInputOperator.setHost("localhost"); > > > rabbitMQInputOperator.setPort(5672); > > > > > > *Exception:* > > > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; > > > protocol method: #method<channel.close>(reply-code=403, > > > reply-text=ACCESS_REFUSED - operation not permitted on the default > > > exchange, class-id=40, method-id=10) > > > at com.rabbitmq.utility.ValueOrException.getValue( > > > ValueOrException.java:66) > > > at > > > com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue( > > > BlockingValueOrException.java:36) > > > at > > > com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation. > > > getReply(AMQChannel.java:398) > > > at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244) > > > at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc( > > AMQChannel.java:128 > > > > > > *Reason:* > > > Default exchange is specified in the code as “” . Though we are trying > to > > > consume from specified and already created queue, operator crashes as > > > exchangeDeclare() call fails . > > > > > > *2)Queue is not created before launching an app * > > > > > > *Setup:* > > > rabbitMQInputOperator.setQueueName("test"); > > > rabbitMQInputOperator.setExchangeType("fanout"); > > > rabbitMQInputOperator.setExchange(""); > > > rabbitMQInputOperator.setHost("localhost"); > > > rabbitMQInputOperator.setPort(5672); > > > > > > *Exception:* > > > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; > > > protocol method: #method<channel.close>(reply-code=403, > > > reply-text=ACCESS_REFUSED - operation not permitted on the default > > > exchange, class-id=40, method-id=10) > > > at com.rabbitmq.utility.ValueOrException.getValue( > > > ValueOrException.java:66) > > > at > > > com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue( > > > BlockingValueOrException.java:36) > > > at > > > com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation. > > > getReply(AMQChannel.java:398) > > > at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:244) > > > at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc( > > AMQChannel.java:128 > > > > > > *Reason:* > > > Current operator failed to create default exchange . > > > > > > *Scnenarios with Custom Exchanges:* > > > > > > *1)Queue “test_2” created as non-durable with exchange “new_exchange” > in > > > rabbitmq* > > > *Setup:* > > > rabbitMQInputOperator.setQueueName("test_2"); > > > rabbitMQInputOperator.setExchangeType("fanout"); > > > rabbitMQInputOperator.setExchange("new_exchange"); > > > rabbitMQInputOperator.setHost("localhost"); > > > rabbitMQInputOperator.setPort(5672); > > > > > > *Exception:* > > > Exception Caused: due to mismatch in param while declaring queue as > > > “durable” > > > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; > > > protocol method: #method<channel.close>(reply-code=406, > > > reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for > exchange > > > 'new_exchange' in vhost '/': received 'false' but current is 'true', > > > class-id=40, method-id=10) > > > *Reason:* > > > Existing queue was transient ( non-durable ) while code tries to create > > > durable queue with the same name. > > > > > > *2) When Queue and exchange are not created ( declared ) in rabbitmq* > > > *Setup:* > > > No queue and exchange are present in rabbitmq > > > App with following configuration ends up creating durable queue > “test_2” > > > in new exchange “new_exchange” as “fanout” type and routing key as “”. > > > > > > rabbitMQInputOperator.setQueueName("test_2"); > > > rabbitMQInputOperator.setExchangeType("fanout"); > > > rabbitMQInputOperator.setExchange("new_exchange"); > > > rabbitMQInputOperator.setHost("localhost"); > > > rabbitMQInputOperator.setPort(5672); > > > > > > *Result:* > > > Now external entities can pushed data to newly created exchange and > queue > > > *3) Queue and exchanges are already created rabbitmq with queue as > > durable > > > and exchange and exchange Type as specified specified. * > > > > > > *Setup:* > > > > > > rabbitMQInputOperator.setQueueName("test"); > > > rabbitMQInputOperator.setExchangeType("fanout"); > > > rabbitMQInputOperator.setExchange("new_exchange"); > > > rabbitMQInputOperator.setRoutingKey("test"); > > > rabbitMQInputOperator.setHost("localhost"); > > > rabbitMQInputOperator.setPort(5672); > > > *Result:* > > > Worked with no issues. But it demands that it durable queue has to be > > > created with proper exchange and routing key > > > > > > *4) Queuename not specified in an app but exchange and exchangeType > > > specified for Operator * > > > > > > *Setup:* > > > rabbitMQInputOperator.setExchangeType("fanout"); > > > rabbitMQInputOperator.setExchange("new_exchange"); > > > rabbitMQInputOperator.setHost("localhost"); > > > rabbitMQInputOperator.setRoutingKey("test"); > > > rabbitMQInputOperator.setPort(5672); > > > > > > *Result:* > > > Operator ended up creating queue with unique names such as > > > “amq.gen-dHDsywLO-8eV8qZM4Q4T_w” which gets auto deleted when last > > > consumer stops consuming from it. > > > > > > > > > Thanks & Regards, > > > Vikram > > > > > > > > > -- > > Regards, > > ___________________________________________________ > > *Mohit Jotwani* > > Product Manager > > E: mo...@datatorrent.com | M: +91 97699 62740 > > www.datatorrent.com | apex.apache.org >