In my Spark Streaming application I am reading data from a Kafka topic.
Whenever I encounter a certain message on that topic (for example:
"poison") I want to stop the streaming. Currently I am achieving this with
the following code, where jsc is an instance of JavaStreamingContext and
directStream is an instance of JavaPairInputDStream:
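
For context, the stream is created along these lines (a minimal sketch using
the spark-streaming-kafka-0-8 direct API; the broker address and topic name
below are placeholders, not my real configuration):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092"); // placeholder broker

Set<String> topics = Collections.singleton("my-topic");    // placeholder topic

JavaPairInputDStream<String, String> directStream =
        KafkaUtils.createDirectStream(
                jsc,
                String.class, String.class,               // key / value types
                StringDecoder.class, StringDecoder.class, // key / value decoders
                kafkaParams,
                topics);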

// sc is the underlying SparkContext (e.g. jsc.sparkContext().sc())
LongAccumulator poisonNotifier = sc.longAccumulator("poisonNotifier");

directStream.foreachRDD(rdd -> {
    RDD<Row> rows = rdd.values().map(value -> {
        if (value.equals("poison")) {    // poison pill detected
            poisonNotifier.add(1);
        } else {
            ...                          // normal processing builds 'row'
        }
        return row;
    }).rdd();
    // 'rows' is consumed further down the pipeline (elided)
});

jsc.start();

// Background thread that busy-waits on the accumulator and stops the
// streaming context once a poison message has been seen.
ExecutorService poisonMonitor = Executors.newSingleThreadExecutor();
poisonMonitor.execute(() -> {
    while (true) {
        if (poisonNotifier.value() > 0) {
            jsc.stop(false, true);   // keep the SparkContext, stop gracefully
            break;
        }
    }
});

try {
    jsc.awaitTermination();
} catch (InterruptedException e) {
    e.printStackTrace();
}
poisonMonitor.shutdown();

Although this approach works, it doesn't sound like the right approach
to me. Is there a better (cleaner) way to achieve the same thing?
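
One variation I have been considering (an untested sketch, assuming
awaitTerminationOrTimeout is available, i.e. Spark 1.3+) is to drop the
monitor thread and poll the accumulator from the driver's wait loop instead:

jsc.start();

// Poll roughly once per second; awaitTerminationOrTimeout returns true
// once the context has actually terminated.
boolean stopped = false;
while (!stopped) {
    stopped = jsc.awaitTerminationOrTimeout(1000);
    if (!stopped && poisonNotifier.value() > 0) {
        jsc.stop(false, true);   // keep the SparkContext, stop gracefully
    }
}

But I am not sure this is any more idiomatic, hence the question.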


