I have written a unit test that uses multiple threads to start and stop a
Spark Streaming job (SparkStreamingJSonJob) and a Kafka producer
(KafkaJSonProducer). All dependencies are declared in the Maven pom.xml.

When I run the test, once all the Kafka messages have been read and the
threads have been stopped, I keep getting the exception below:

2017-08-19 17:08:16,783 INFO  [Executor task launch worker-0-SendThread(127.0.0.1:64040)] zookeeper.ClientCnxn (ClientCnxn.java:logStartConnect(1032)) - Opening socket connection to server 127.0.0.1/127.0.0.1:64040. Will not attempt to authenticate using SASL (unknown error)
2017-08-19 17:08:17,786 WARN  [Executor task launch worker-0-SendThread(127.0.0.1:64040)] zookeeper.ClientCnxn (ClientCnxn.java:run(1162)) - Session 0x15dfb08227f0001 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)

The code is as follows:

    @Test
    public void someKafkaTest() {
        try {
            // Thread controlling the Spark Streaming job
            Thread sparkStreamerThread = new Thread(
                    new SparkStreamingJSonJob(new String[] { zookeeperConnect,
                            "my-consumer-group", "test", "1" }),
                    "spark-streaming");
            sparkStreamerThread.start();

            // give the streaming job time to come up
            Thread.sleep(10000);

            // Thread to start the producer
            Thread producerThread = new Thread(new KafkaJSonProducer(),
                    "producer");
            producerThread.start();

            // current test thread sleeps for 60 seconds while messages flow
            Thread.sleep(60000);

            producerThread.stop();

            int sparkAccVal = SparkStreamingJSonJob.getAccumulator().intValue();
            System.out.println("Spark Throughput value : " + sparkAccVal / 60);

            while (sparkStreamerThread.isAlive())
                sparkStreamerThread.stop();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail();
        }
    }
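
As an aside, I know Thread.stop() is deprecated. For the producer I could
presumably switch to a cooperative shutdown along these lines (the loop body
here is only a hypothetical stand-in for my KafkaJSonProducer):

    // Sketch: cooperative shutdown via interruption instead of Thread.stop().
    void runProducerFor(long millis) throws InterruptedException {
        Thread producerThread = new Thread(new Runnable() {
            @Override
            public void run() {
                // keep producing until asked to stop
                while (!Thread.currentThread().isInterrupted()) {
                    // producer.send(...) one message per iteration
                }
                // close the KafkaProducer here so its network threads exit
            }
        }, "producer");
        producerThread.start();
        Thread.sleep(millis);          // let it produce for the test window
        producerThread.interrupt();    // request a cooperative stop
        producerThread.join();         // wait until it has actually finished
    }

Even so, the ZooKeeper reconnect messages point at the streaming side rather
than the producer.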

I suspect the streaming job continues to run even after its thread is
stopped.
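
If that is the case, I presume the fix is to stop the StreamingContext
itself rather than kill the thread. A rough sketch of what I have in mind,
assuming SparkStreamingJSonJob could expose its JavaStreamingContext through
a hypothetical getStreamingContext() accessor (it has no such method today):

    // needs: import org.apache.spark.streaming.api.java.JavaStreamingContext;
    // Assumption: getStreamingContext() is a hypothetical accessor on
    // SparkStreamingJSonJob; my current code does not provide one.
    JavaStreamingContext jssc = SparkStreamingJSonJob.getStreamingContext();
    // Stop gracefully: finish in-flight batches, then stop the SparkContext,
    // which should also close the receiver's Kafka/ZooKeeper connections.
    jssc.stop(true, true);
    // With the context stopped, the spark-streaming thread should exit on
    // its own and join() should return promptly.
    sparkStreamerThread.join();

Is this the right approach, or is something else keeping the ZooKeeper
client alive? Please help me with this.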


