Hi, If you want help I'd suggest copying the full code, you just shared the config part.
On the other hand, if you are doing a project *now* I'd also suggest using Structured Streaming <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>, I'm sure you would get better support than using DStreams. Regards. On Tue, 15 Jun 2021 at 08:39, Muhammed Favas < favas.muham...@expeedsoftware.com> wrote: > Hi All, > > > > Can some one help me how to resolve this?. It is very important to achieve > my project objective > > > > *Regards,* > > *Favas * > > > > *From:* Muhammed Favas > *Sent:* Thursday, June 10, 2021 15:26 PM > *To:* user@spark.apache.org > *Subject:* Apply window function on data consumed from Kafka topic > > > > Hi, > > > > I have a requirement to create a spark streaming job that > get data from kafka broker and need to apply window function on the data > coming into the spark context. > > This is how I connected to kafka from spark > > > > val kafkaParams = *Map*[String, Object]( > "bootstrap.servers" -> "<my-srvername>", > "key.deserializer" -> * classOf*[StringDeserializer], > "value.deserializer" -> * classOf*[StringDeserializer], > "group.id" -> "7b37787c-20e7-4614-98ba-6f4212e07bf0", > "auto.offset.reset" -> "earliest", > "enable.auto.commit" -> (true: java.lang.Boolean) > ) > > val topics = *Array*("7b37787c-20e7-4614-98ba-6f4212e07bf0") > val inputMsg = KafkaUtils.*createDirectStream*[String,String]( > ssc, > *PreferConsistent*, > *Subscribe*[String, String](topics, kafkaParams) > ) > > > > The variable “inputMsg” is of type > “InputDStream[ConsumerRecord[String,String]]” > > When I used window function “inputMsg.window(Minute(1))”, it throws an > error like below > > > > ERROR DirectKafkaInputDStream: Kafka ConsumerRecord is not serializable. > Use .map to extract fields before calling .persist or .window > > > > Can some one help me on how to use widow function on spark streaming using > kafka? > > > > *Regards,* > > *Favas * > > >