In my Spark Streaming application I am reading data from certain Kafka topic. While reading from topic whenever I encounter certain message (for example: "poison") I want to stop the streaming. Currently I am achieving this using following code: jsc is instance of JavaStreamingContext and directStream is instance of JavaPairInputDStream.
/ LongAccumulator poisonNotifier = sc.longAccumulator("poisonNotifier"); directStream.foreachRDD(rdd -> { RDD<Row> rows = rdd.values().map(value -> { if (value.equals("poison") { poisonNotifier.add(1); } else { ... } return row; }).rdd(); }); jsc.start(); ExecutorService poisonMonitor = Executors.newSingleThreadExecutor(); poisonMonitor.execute(() -> { while (true) { if (poisonNotifier.value() > 0) { jsc.stop(false, true); break; } } }); try { jsc.awaitTermination(); } catch (InterruptedException e) { e.printStackTrace(); } poisonMonitor.shutdown();/ Although this approach is working, this doesn't sounds like right approach to me. Is there any other better(cleaner) way to achieve the same? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Stopping-spark-steaming-context-on-encountering-certain-type-of-message-on-Kafka-tp27822p27823.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org