I have written a unit test that uses multithreading to start and stop a Spark Streaming job and a Kafka producer. All the dependencies are declared in the Maven pom.xml file.
When I run the test, once all the Kafka messages have been read and the threads have been stopped, I continue to get the exception below:

    2017-08-19 17:08:16,783 INFO [Executor task launch worker-0- SendThread(127.0.0.1:64040)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 127.0.0.1/127.0.0.1:64040. Will not attempt to authenticate using SASL (unknown error)
    2017-08-19 17:08:17,786 WARN [Executor task launch worker-0- SendThread(127.0.0.1:64040)] zookeeper.ClientCnxn (ClientCnxn.java:run(1162)) - Session 0x15dfb08227f0001 for server null, unexpected error, closing socket connection and attempting reconnect
    java.net.ConnectException: Connection refused: no further information
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
        at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

The code is as follows:

    @Test
    public void someKafkaTest() {
        try {
            // Thread controlling the Spark streaming job
            Thread sparkStreamerThread = new Thread(
                    new SparkStreamingJSonJob(new String[] { zookeeperConnect, "my-consumer-group", "test", "1" }),
                    "spark-streaming");
            sparkStreamerThread.start();
            // Give the streaming job 10 seconds to start
            Thread.sleep(10000);

            // Thread to start the producer
            Thread producerThread = new Thread(new KafkaJSonProducer(), "producer");
            producerThread.start();

            // Current kafkaTest thread sleeps for 60 seconds
            Thread.sleep(60000);
            producerThread.stop();

            int sparkAccVal = SparkStreamingJSonJob.getAccumulator().intValue();
            System.out.println("Spark Throughput value : " + sparkAccVal/60);

            while (sparkStreamerThread.isAlive())
                sparkStreamerThread.stop();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail();
        }
    }

I suspect that the streaming job continues to run even after its thread has been stopped. Any help would be appreciated.
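For context on the suspicion above: Thread.stop() is deprecated and only affects the thread it is called on, so helper threads that the job started (such as the ZooKeeper client's SendThread seen in the log) may well keep running and retrying their connection. A common alternative is a cooperative shutdown, where the job is asked to stop and the caller then joins the thread. The following is only a minimal sketch of that pattern using the JDK alone; StoppableJob, requestStop, and CooperativeShutdownDemo are hypothetical names, and StoppableJob is merely a stand-in for SparkStreamingJSonJob, whose internals are not shown here.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a long-running job such as SparkStreamingJSonJob.
class StoppableJob implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicLong iterations = new AtomicLong();

    @Override
    public void run() {
        try {
            while (running.get()) {
                iterations.incrementAndGet(); // placeholder for real work
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status and exit
        } finally {
            // A real job would release its resources here (assumption: for example,
            // stopping its streaming context and closing its client connections).
        }
    }

    // Asks the loop in run() to finish after its current iteration.
    void requestStop() { running.set(false); }

    long iterations() { return iterations.get(); }
}

class CooperativeShutdownDemo {
    // Runs the job for runMillis, asks it to stop, then waits for the thread to end.
    // Returns true if the job did some work and its thread terminated.
    static boolean runAndStop(long runMillis) {
        StoppableJob job = new StoppableJob();
        Thread worker = new Thread(job, "stoppable-job");
        worker.start();
        try {
            Thread.sleep(runMillis);
            job.requestStop();   // instead of worker.stop()
            worker.join(5000);   // wait up to 5 seconds for a clean exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            worker.interrupt();
        }
        return !worker.isAlive() && job.iterations() > 0;
    }

    public static void main(String[] args) {
        System.out.println("stopped cleanly: " + runAndStop(200));
    }
}
```

In the test above, the equivalent change would presumably be for SparkStreamingJSonJob to expose a stop method that shuts down its own streaming context (Spark's JavaStreamingContext has a stop(boolean stopSparkContext, boolean stopGracefully) method), with the test calling that method and then join() on the thread instead of looping on Thread.stop().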
--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkStreaming-connection-exception-tp29103.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.