Hi all,

I am trying to read messages from Kafka, deserialize the values with Avro, and then convert the resulting JSON content to a DataFrame. For a Kafka message value such as {"a": "1", "b": "2"}, I would like to get a DataFrame like this:

+---+---+
|a  |b  |
+---+---+
|1  |2  |
+---+---+
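As a sketch of the target shape only (this is not the code from this post), Spark's built-in from_json function can turn a JSON string column into separate columns when it is given an explicit schema. The object name JsonToColumnsSketch, the input column name json, and the all-string valueSchema are assumptions made for illustration:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object JsonToColumnsSketch {
  // Assumed schema: both fields are strings, as in the example message value.
  val valueSchema: StructType = StructType(Seq(
    StructField("a", StringType),
    StructField("b", StringType)
  ))

  // Parses the string column "json" into a struct, then expands the struct's fields into top-level columns.
  def toColumns(jsonDf: DataFrame): DataFrame =
    jsonDf
      .select(from_json(col("json"), valueSchema).as("data"))
      .select("data.*")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("json-to-columns").getOrCreate()
    import spark.implicits._

    // One JSON string per row, standing in for a deserialized Kafka message value.
    val jsonDf = Seq("""{"a": "1", "b": "2"}""").toDF("json")
    toColumns(jsonDf).show()

    spark.stop()
  }
}
```

Because the schema is declared up front, a field that is missing from a message comes back as null instead of failing the query.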
I have written the following code, which reads messages from a Kafka topic and deserializes them with the Confluent Avro deserializer. It then converts each Avro record to JSON and, finally, parses each JSON message into a Map object. However, the output of this code is just a single column, "value", containing byte arrays (see the output below). How should I change this code so that each field of the JSON object becomes its own column in the DataFrame?

Thank you,
Ali

Code:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.generic.GenericData
import org.apache.spark.sql.{Encoders, SparkSession}
import scala.util.parsing.json.JSON

// SCHEMA_REGISTRY_URL, KAFKA_BROKERS and TOPIC_ID are constants defined elsewhere (not shown).
private val cachedSchemaRegistryClient = new CachedSchemaRegistryClient(SCHEMA_REGISTRY_URL, 20)
private val kafkaAvroDeserializer = new KafkaAvroDeserializer(cachedSchemaRegistryClient)

// Kryo encoder: each Map is serialized into one binary column
implicit val mapEncoder = Encoders.kryo[Map[String, Any]]

def main(args: Array[String]): Unit = {
  val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName("Spark Consumption")
    .getOrCreate()

  val kafkaDf = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BROKERS)
    .option("subscribe", TOPIC_ID)
    // .option("checkpointLocation", "s3://checkpoint")
    .load()

  val messageDf = kafkaDf
    .map(record => {
      // Column 1 of the Kafka source schema is "value" (the message payload)
      val avroMsg = record.get(1).asInstanceOf[Array[Byte]]
      val msgObj = kafkaAvroDeserializer.deserialize("topic", avroMsg).asInstanceOf[GenericData.Record]
      // GenericData.Record.toString renders the record as JSON text
      val result = JSON.parseFull(msgObj.toString)
      result match {
        // Matches if the string is valid JSON and represents a Map of Strings to Any
        case Some(map: Map[String, Any]) => map
        // Invalid JSON, or JSON that is not an object
        case _ => null
      }
    })
    .writeStream
    .format("console") // <-- use ConsoleSink
    .option("truncate", "false")
    .option("numRows", 10)
    .start()
    .awaitTermination()
}

Output:

+-------------------------------------------------------------------------------------------------------------------------+
|value |
+-------------------------------------------------------------------------------------------------------------------------+
|[374 69 6E 61 74 69 669 66 64 3A 20 53 65 61 72 63 68 20 45 6E 67 69 6E 65 73 20 26 20 50 6F 72 74 ]|
|[37 01 20 40 01 03 01 64 65 76 69 63 65 5F 6E 61 6D E5 00 40 01 03 01 66 6C 6F 77 5F 73 74 61 72 74 ]|
|...
+-------------------------------------------------------------------------------------------------------------------------+
only showing top 10 rows

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
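The single binary "value" column in the output above is consistent with how Encoders.kryo behaves: it stores each whole object as one opaque blob. Spark cannot derive a column-per-field encoder for Map[String, Any], because Any has no Spark SQL type. The sketch below contrasts the two encoders on static data. Message and EncoderComparisonSketch are hypothetical names, and the all-string fields are an assumption:

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

// Hypothetical case class with one field per key of the example message.
case class Message(a: String, b: String)

object EncoderComparisonSketch {
  private val sample: Seq[Map[String, Any]] = Seq(Map("a" -> "1", "b" -> "2"))

  // Encoders.kryo serializes each whole Map into a single binary column named "value".
  def kryoColumns(spark: SparkSession): Array[String] =
    spark.createDataset(sample)(Encoders.kryo[Map[String, Any]]).columns

  // A case-class (product) encoder maps each field to its own typed column.
  def caseClassColumns(spark: SparkSession): Array[String] = {
    import spark.implicits._
    sample.map(m => Message(m("a").toString, m("b").toString)).toDS().columns
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("encoder-comparison").getOrCreate()
    println(kryoColumns(spark).mkString(", "))      // value
    println(caseClassColumns(spark).mkString(", ")) // a, b
    spark.stop()
  }
}
```

Applied to the streaming code, one option along these lines would be to map each Avro record to a case class such as Message instead of a Map. Another would be to emit the record's JSON text with Encoders.STRING and parse it with from_json against an explicit schema. Both are suggestions to adapt to the real message schema.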