The stack trace is clear enough: Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, method-id=10)
Regards Sab On 19-Nov-2015 2:32 pm, "D" <subharaj.ma...@gmail.com> wrote: > I am trying to write a simple "Hello World" kind of application using > spark streaming and RabbitMq, in which Apache Spark Streaming will read > message from RabbitMq via the RabbitMqReceiver > <https://github.com/Stratio/rabbitmq-receiver> and print it in the > console. But some how I am not able to print the string read from Rabbit Mq > into console. The spark streaming code is printing the message below:- > > Value Received BlockRDD[1] at ReceiverInputDStream at > RabbitMQInputDStream.scala:33 > Value Received BlockRDD[2] at ReceiverInputDStream at > RabbitMQInputDStream.scala:33 > > > The message is sent to the rabbitmq via the simple code below:- > > package helloWorld; > > import com.rabbitmq.client.Channel; > import com.rabbitmq.client.Connection; > import com.rabbitmq.client.ConnectionFactory; > > public class Send { > > private final static String QUEUE_NAME = "hello1"; > > public static void main(String[] argv) throws Exception { > ConnectionFactory factory = new ConnectionFactory(); > factory.setHost("localhost"); > Connection connection = factory.newConnection(); > Channel channel = connection.createChannel(); > > channel.queueDeclare(QUEUE_NAME, false, false, false, null); > String message = "Hello World! is a code. Hi Hello World!"; > channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); > System.out.println(" [x] Sent '" + message + "'"); > > channel.close(); > connection.close(); > } > } > > > I am trying to read messages via Apache Streaming as shown below:- > > package rabbitmq.example; > > import java.util.*; > > import org.apache.spark.SparkConf; > import org.apache.spark.api.java.JavaRDD; > import org.apache.spark.api.java.function.Function; > import org.apache.spark.streaming.Durations; > import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; > import org.apache.spark.streaming.api.java.JavaStreamingContext; > > import com.stratio.receiver.RabbitMQUtils; > > public class RabbitMqEx { > > public static void main(String[] args) { > System.out.println("Creating Spark Configuration"); > SparkConf conf = new SparkConf(); > conf.setAppName("RabbitMq Receiver Example"); > conf.setMaster("local[2]"); > > System.out.println("Retreiving Streaming Context from Spark > Conf"); > JavaStreamingContext streamCtx = new JavaStreamingContext(conf, > Durations.seconds(2)); > > Map<String, String>rabbitMqConParams = new HashMap<String, String>(); > rabbitMqConParams.put("host", "localhost"); > rabbitMqConParams.put("queueName", "hello1"); > System.out.println("Trying to connect to RabbitMq"); > JavaReceiverInputDStream<String> receiverStream = > RabbitMQUtils.createJavaStream(streamCtx, rabbitMqConParams); > receiverStream.foreachRDD(new Function<JavaRDD<String>, Void>() { > @Override > public Void call(JavaRDD<String> arg0) throws Exception { > System.out.println("Value Received " + arg0.toString()); > return null; > } > } ); > streamCtx.start(); > streamCtx.awaitTermination(); > } > } > > The output console only has message like the following:- > > Creating Spark Configuration > Retreiving Streaming Context from Spark Conf > Trying to connect to RabbitMq > Value Received BlockRDD[1] at ReceiverInputDStream at > RabbitMQInputDStream.scala:33 > Value Received BlockRDD[2] at ReceiverInputDStream at > RabbitMQInputDStream.scala:33 > > > In the logs I see the following:- > > 15/11/18 13:20:45 INFO SparkContext: Running Spark version 1.5.2 > 15/11/18 13:20:45 WARN NativeCodeLoader: Unable to load native-hadoop library > for your platform... using builtin-java classes where applicable > 15/11/18 13:20:45 WARN Utils: Your hostname, jabong1143 resolves to a > loopback address: 127.0.1.1; using 192.168.1.3 instead (on interface wlan0) > 15/11/18 13:20:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to > another address > 15/11/18 13:20:45 INFO SecurityManager: Changing view acls to: jabong > 15/11/18 13:20:45 INFO SecurityManager: Changing modify acls to: jabong > 15/11/18 13:20:45 INFO SecurityManager: SecurityManager: authentication > disabled; ui acls disabled; users with view permissions: Set(jabong); users > with modify permissions: Set(jabong) > 15/11/18 13:20:46 INFO Slf4jLogger: Slf4jLogger started > 15/11/18 13:20:46 INFO Remoting: Starting remoting > 15/11/18 13:20:46 INFO Remoting: Remoting started; listening on addresses > :[akka.tcp://sparkDriver@192.168.1.3:42978] > 15/11/18 13:20:46 INFO Utils: Successfully started service 'sparkDriver' on > port 42978. > 15/11/18 13:20:46 INFO SparkEnv: Registering MapOutputTracker > 15/11/18 13:20:46 INFO SparkEnv: Registering BlockManagerMaster > 15/11/18 13:20:46 INFO DiskBlockManager: Created local directory at > /tmp/blockmgr-9309b35f-a506-49dc-91ab-5c340cd3bdd1 > 15/11/18 13:20:46 INFO MemoryStore: MemoryStore started with capacity 947.7 MB > 15/11/18 13:20:46 INFO HttpFileServer: HTTP File server directory is > /tmp/spark-736f4b9c-764c-4b85-9b37-1cece102c95a/httpd-29196fa0-eb3f-4b7d-97ad-35c5325b09e5 > 15/11/18 13:20:46 INFO HttpServer: Starting HTTP Server > 15/11/18 13:20:46 INFO Utils: Successfully started service 'HTTP file server' > on port 37150. > 15/11/18 13:20:46 INFO SparkEnv: Registering OutputCommitCoordinator > 15/11/18 13:20:52 INFO Utils: Successfully started service 'SparkUI' on port > 4040. > 15/11/18 13:20:52 INFO SparkUI: Started SparkUI at http://192.168.1.3:4040 > 15/11/18 13:20:52 WARN MetricsSystem: Using default name DAGScheduler for > source because spark.app.id is not set. > 15/11/18 13:20:52 INFO Executor: Starting executor ID driver on host localhost > 15/11/18 13:20:52 INFO Utils: Successfully started service > 'org.apache.spark.network.netty.NettyBlockTransferService' on port 47306. > 15/11/18 13:20:52 INFO NettyBlockTransferService: Server created on 47306 > 15/11/18 13:20:52 INFO BlockManagerMaster: Trying to register BlockManager > 15/11/18 13:20:52 INFO BlockManagerMasterEndpoint: Registering block manager > localhost:47306 with 947.7 MB RAM, BlockManagerId(driver, localhost, 47306) > 15/11/18 13:20:52 INFO BlockManagerMaster: Registered BlockManager > Trying to connect to RabbitMq > 15/11/18 13:20:53 INFO ReceiverTracker: Starting 1 receivers > 15/11/18 13:20:53 INFO ReceiverTracker: ReceiverTracker started > 15/11/18 13:20:53 INFO ForEachDStream: metadataCleanupDelay = -1 > 15/11/18 13:20:53 INFO RabbitMQInputDStream: metadataCleanupDelay = -1 > 15/11/18 13:20:53 INFO RabbitMQInputDStream: Slide time = 2000 ms > 15/11/18 13:20:53 INFO RabbitMQInputDStream: Storage level = > StorageLevel(false, false, false, false, 1) > 15/11/18 13:20:53 INFO RabbitMQInputDStream: Checkpoint interval = null > 15/11/18 13:20:53 INFO RabbitMQInputDStream: Remember duration = 2000 ms > 15/11/18 13:20:53 INFO RabbitMQInputDStream: Initialized and validated > com.stratio.receiver.RabbitMQInputDStream@5d00adc2 > 15/11/18 13:20:53 INFO ForEachDStream: Slide time = 2000 ms > 15/11/18 13:20:53 INFO ForEachDStream: Storage level = StorageLevel(false, > false, false, false, 1) > 15/11/18 13:20:53 INFO ForEachDStream: Checkpoint interval = null > 15/11/18 13:20:53 INFO ForEachDStream: Remember duration = 2000 ms > 15/11/18 13:20:53 INFO ForEachDStream: Initialized and validated > org.apache.spark.streaming.dstream.ForEachDStream@4c132773 > 15/11/18 13:20:53 INFO RecurringTimer: Started timer for JobGenerator at time > 1447833054000 > 15/11/18 13:20:53 INFO JobGenerator: Started JobGenerator at 1447833054000 ms > 15/11/18 13:20:53 INFO JobScheduler: Started JobScheduler > 15/11/18 13:20:53 INFO StreamingContext: StreamingContext started > 15/11/18 13:20:53 INFO DAGScheduler: Got job 0 (start at RabbitMqEx.java:38) > with 1 output partitions > 15/11/18 13:20:53 INFO DAGScheduler: Final stage: ResultStage 0(start at > RabbitMqEx.java:38) > 15/11/18 13:20:53 INFO ReceiverTracker: Receiver 0 started > 15/11/18 13:20:53 INFO DAGScheduler: Parents of final stage: List() > 15/11/18 13:20:53 INFO DAGScheduler: Missing parents: List() > 15/11/18 13:20:53 INFO DAGScheduler: Submitting ResultStage 0 (Receiver 0 > ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:556), which has > no missing parents > 15/11/18 13:20:53 INFO MemoryStore: ensureFreeSpace(46496) called with > curMem=0, maxMem=993735475 > 15/11/18 13:20:53 INFO MemoryStore: Block broadcast_0 stored as values in > memory (estimated size 45.4 KB, free 947.7 MB) > 15/11/18 13:20:53 INFO MemoryStore: ensureFreeSpace(15206) called with > curMem=46496, maxMem=993735475 > 15/11/18 13:20:53 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes > in memory (estimated size 14.8 KB, free 947.6 MB) > 15/11/18 13:20:53 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory > on localhost:47306 (size: 14.8 KB, free: 947.7 MB) > 15/11/18 13:20:53 INFO SparkContext: Created broadcast 0 from broadcast at > DAGScheduler.scala:861 > 15/11/18 13:20:53 INFO DAGScheduler: Submitting 1 missing tasks from > ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at > ReceiverTracker.scala:556) > 15/11/18 13:20:53 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks > 15/11/18 13:20:53 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, > localhost, NODE_LOCAL, 2729 bytes) > 15/11/18 13:20:53 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) > 15/11/18 13:20:53 INFO RecurringTimer: Started timer for BlockGenerator at > time 1447833053800 > 15/11/18 13:20:53 INFO BlockGenerator: Started BlockGenerator > 15/11/18 13:20:53 INFO BlockGenerator: Started block pushing thread > 15/11/18 13:20:53 INFO ReceiverTracker: Registered receiver for stream 0 from > 192.168.1.3:42978 > 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Starting receiver > 15/11/18 13:20:53 INFO RabbitMQReceiver: Rabbit host addresses are :localhost > 15/11/18 13:20:53 INFO RabbitMQReceiver: Address localhost > 15/11/18 13:20:53 INFO RabbitMQReceiver: creating new connection and channel > 15/11/18 13:20:53 INFO RabbitMQReceiver: No virtual host configured > 15/11/18 13:20:53 INFO RabbitMQReceiver: created new connection and channel > 15/11/18 13:20:53 INFO RabbitMQReceiver: onStart, Connecting.. > 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Called receiver onStart > 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Waiting for receiver to be > stopped > 15/11/18 13:20:53 INFO RabbitMQReceiver: declaring direct queue > 15/11/18 13:20:53 ERROR RabbitMQReceiver: Got this unknown exception: > java.io.IOException > java.io.IOException > at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) > at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) > at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) > at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844) > at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61) > at > com.stratio.receiver.RabbitMQReceiver.getQueueName(RabbitMQInputDStream.scala:126) > at > com.stratio.receiver.RabbitMQReceiver.com$stratio$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:86) > at > com.stratio.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:69) > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; > protocol method: #method<channel.close>(reply-code=406, > reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue > 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, > method-id=10) > at > com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) > at > com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) > at > com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361) > at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226) > at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) > ... 5 more > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; > protocol method: #method<channel.close>(reply-code=406, > reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue > 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, > method-id=10) > at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484) > at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321) > at > com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) > at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) > at > com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554) > at java.lang.Thread.run(Thread.java:745) > 15/11/18 13:20:53 INFO RabbitMQReceiver: it has been stopped > 15/11/18 13:20:53 ERROR RabbitMQReceiver: error on close channel, ignoring > 15/11/18 13:20:53 WARN ReceiverSupervisorImpl: Restarting receiver with delay > 2000 ms: Trying to connect again > 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Stopping receiver with > message: Restarting receiver with delay 2000ms: Trying to connect again: > 15/11/18 13:20:53 INFO RabbitMQReceiver: onStop, doing nothing.. relaxing... > 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Called receiver onStop > 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Deregistering receiver 0 > 15/11/18 13:20:53 ERROR ReceiverTracker: Deregistered receiver for stream 0: > Restarting receiver with delay 2000ms: Trying to connect again > 15/11/18 13:20:53 INFO ReceiverSupervisorImpl: Stopped receiver 0 > 15/11/18 13:20:54 INFO JobScheduler: Added jobs for time 1447833054000 ms > 15/11/18 13:20:54 INFO JobScheduler: Starting job streaming job 1447833054000 > ms.0 from job set of time 1447833054000 ms > Value Received BlockRDD[1] at ReceiverInputDStream at > RabbitMQInputDStream.scala:33 > 15/11/18 13:20:54 INFO JobScheduler: Finished job streaming job 1447833054000 > ms.0 from job set of time 1447833054000 ms > 15/11/18 13:20:54 INFO JobScheduler: Total delay: 0.031 s for time > 1447833054000 ms (execution: 0.007 s) > 15/11/18 13:20:54 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer() > 15/11/18 13:20:54 INFO InputInfoTracker: remove old batch metadata: > 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Starting receiver again > 15/11/18 13:20:55 INFO ReceiverTracker: Registered receiver for stream 0 from > 192.168.1.3:42978 > 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Starting receiver > 15/11/18 13:20:55 INFO RabbitMQReceiver: Rabbit host addresses are :localhost > 15/11/18 13:20:55 INFO RabbitMQReceiver: Address localhost > 15/11/18 13:20:55 INFO RabbitMQReceiver: creating new connection and channel > 15/11/18 13:20:55 INFO RabbitMQReceiver: No virtual host configured > 15/11/18 13:20:55 INFO RabbitMQReceiver: created new connection and channel > 15/11/18 13:20:55 INFO RabbitMQReceiver: onStart, Connecting.. > 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Called receiver onStart > 15/11/18 13:20:55 INFO RabbitMQReceiver: declaring direct queue > 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Receiver started again > 15/11/18 13:20:55 ERROR RabbitMQReceiver: Got this unknown exception: > java.io.IOException > java.io.IOException > at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) > at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) > at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) > at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:844) > at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61) > at > com.stratio.receiver.RabbitMQReceiver.getQueueName(RabbitMQInputDStream.scala:126) > at > com.stratio.receiver.RabbitMQReceiver.com$stratio$receiver$RabbitMQReceiver$$receive(RabbitMQInputDStream.scala:86) > at > com.stratio.receiver.RabbitMQReceiver$$anon$1.run(RabbitMQInputDStream.scala:69) > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; > protocol method: #method<channel.close>(reply-code=406, > reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue > 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, > method-id=10) > at > com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) > at > com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) > at > com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361) > at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226) > at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) > ... 5 more > Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; > protocol method: #method<channel.close>(reply-code=406, > reply-text=PRECONDITION_FAILED - inequivalent arg 'durable' for queue > 'hello1' in vhost '/': received 'true' but current is 'false', class-id=50, > method-id=10) > at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484) > at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321) > at > com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) > at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) > at > com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554) > at java.lang.Thread.run(Thread.java:745) > 15/11/18 13:20:55 INFO RabbitMQReceiver: it has been stopped > 15/11/18 13:20:55 ERROR RabbitMQReceiver: error on close channel, ignoring > 15/11/18 13:20:55 WARN ReceiverSupervisorImpl: Restarting receiver with delay > 2000 ms: Trying to connect again > 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Stopping receiver with > message: Restarting receiver with delay 2000ms: Trying to connect again: > 15/11/18 13:20:55 INFO RabbitMQReceiver: onStop, doing nothing.. relaxing... > 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Called receiver onStop > 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Deregistering receiver 0 > 15/11/18 13:20:55 ERROR ReceiverTracker: Deregistered receiver for stream 0: > Restarting receiver with delay 2000ms: Trying to connect again > 15/11/18 13:20:55 INFO ReceiverSupervisorImpl: Stopped receiver 0 > 15/11/18 13:20:56 INFO JobScheduler: Added jobs for time 1447833056000 ms > 15/11/18 13:20:56 INFO JobScheduler: Starting job streaming job 1447833056000 > ms.0 from job set of time 1447833056000 ms > > > Doing list_queues list the following:- > > sudo rabbitmqctl list_queues > Listing queues ... > hello1 2 > > > I also printed the value of arg0.count. It is reporting 0. It seems spark > streaming is not able to read messages from rabbitmq. > > However I can read from the queue using a simple java receiver as > mentioned here > <https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/Recv.java> > . > > Environment > > - RabbitMq Version - 3.5.6 > - Spark 1.5.2 > - Java 8 (Update 66) > > Can some one let me know what is going wrong and how can I read message > from RabbitMq via Spark Streaming. > > Thanks, > D > > > > > > > >