In my Spark Streaming application I am reading data from certain Kafka topic. While reading from topic whenever I encounter certain message (for example: "poison") I want to stop the streaming. Currently I am achieving this using following code: jsc is instance of JavaStreamingContext and directStream is instance of JavaPairInputDStream./LongAccumulator poisonNotifier = sc.longAccumulator("poisonNotifier");directStream.foreachRDD(rdd -> { RDD rows = rdd.values().map(value -> { if (value.equals("poison") { poisonNotifier.add(1); } else { ... } return row; }).rdd(); });jsc.start();ExecutorService poisonMonitor = Executors.newSingleThreadExecutor();poisonMonitor.execute(() -> { while (true) { if (poisonNotifier.value() > 0) { jsc.stop(false, true); break; } }});try { jsc.awaitTermination();} catch (InterruptedException e) { e.printStackTrace();}poisonMonitor.shutdown();/Although this approach is working, this doesn't sounds like right approach to me. Is there any other better(cleaner) way to achieve the same?
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-spark-steaming-context-on-encountering-certain-type-of-message-on-Kafka-tp27822.html Sent from the Apache Spark User List mailing list archive at Nabble.com.