Hi Daniel, Several things: 1) Your error seems to suggest you're using a different version of Spark and a different version of the sql-kafka connector. Could you make sure they are on the same Spark version? 2) With Structured Streaming, you may remove everything related to a StreamingContext.
val sparkConf = new SparkConf().setMaster(master).setAppName(appName) sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true") val ssc = new StreamingContext(sparkConf, batchDuration) ssc.checkpoint(checkpointDir) ssc.remember(Minutes(1)) These lines are not doing anything for Structured Streaming. Best, Burak On Thu, Nov 2, 2017 at 11:36 AM, Daniel de Oliveira Mantovani < daniel.oliveira.mantov...@gmail.com> wrote: > Hello, I'm trying to run the following code, > > var newContextCreated = false // Flag to detect whether new context was > created or not > val kafkaBrokers = "localhost:9092" // comma separated list of broker:host > > private val batchDuration: Duration = Seconds(3) > private val master: String = "local[2]" > private val appName: String = this.getClass().getSimpleName() > private val checkpointDir: String = "/tmp/spark-streaming-amqp-tests" > > // Create a Spark configuration > > val sparkConf = new SparkConf().setMaster(master).setAppName(appName) > sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true") > > val ssc = new StreamingContext(sparkConf, batchDuration) > ssc.checkpoint(checkpointDir) > ssc.remember(Minutes(1)) // To make sure data is not deleted by the time we > query it interactively > > val spark = SparkSession > .builder > .config(sparkConf) > .getOrCreate() > > val lines = spark > .readStream > .format("kafka") > .option("kafka.bootstrap.servers", "localhost:9092") > .option("subscribe", "evil_queue") > .load() > > lines.printSchema() > > import spark.implicits._ > val noAggDF = lines.select("key") > > noAggDF > .writeStream > .format("console") > .start() > > > But I'm having the error: > > http://paste.scsys.co.uk/565658 > > > How do I get my messages using kafka as format from Structured Streaming ? > > > Thank you > > > -- > > -- > Daniel de Oliveira Mantovani > Perl Evangelist/Data Hacker > +1 786 459 1341 <(786)%20459-1341> >