Hi All, we are currently using direct streams to read data from a Kafka topic, as follows:

    KafkaUtils.createDirectStream(ssc=self.streaming_context,
                                  topics=topics,
                                  kafkaParams=kafka_params,
                                  valueDecoder=message_decoder,
                                  messageHandler=message_handler)

We would like to switch to the Structured Streaming approach, such as:

    self.spark_session \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafkaServers) \
        .option("subscribe", self.topic_id) \
        .option("auto.offset.reset", self.msgoffset) \
        .load()

I was wondering how I can apply the existing message_decoder and message_handler functions to the message stream?

Thank you,
Ali