Hello, see if this works, from the documentation:
// Subscribe to 1 topic, with headersval df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("subscribe", "topic1") .option("includeHeaders", "true") .load()df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers") .as[(String, String, Map)] On Thu, 3 Dec 2020 at 18:22, <eugen.wintersber...@gmail.com> wrote: > Hi folks, > I am trying to read the message headers from a Kafka structured stream > which should be stored in a column named ``headers``. > I try something like this: > > val stream = sparkSession.readStream.format("kafka")......load() > > stream.map(row => { > > ... > > val headers = row.getAs[HeaderT]("headers") > > .... > > }) > > > My question is: what would be *HeaderT*? > > Thanks in advance > > Eugen >