Hi, I am not successful when using using spark 2.1 with Kafka 0.9, can anyone please share the code snippet to use it.
val sparkSession: SparkSession = runMode match { case "local" => SparkSession.builder.config(sparkConfig).getOrCreate case "yarn" => SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate } val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(20)) println("streamingContext --------->"+streamingContext) streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir)) println("topics --------->"+config.getString(Constants.Properties.KafkaBrokerList)) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsSet) with above code , job gets aborted. ------------------------------------ I used code snippet of 0.10 too but no luck. val streamingContext = new StreamingContext(sparkConfig, Seconds(20)) println("streamingContext --------->"+streamingContext) streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir)) println("topics --------->"+config.getString(Constants.Properties.KafkaBrokerList)) //val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsSet) val messages = KafkaUtils.createDirectStream[String, String]( streamingContext, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) any suggestions on how to use Spark2.1 with Kafka streaming ? Thanks, Asmath