Hi all,

I create a DataFrame, convert it to Avro with to_avro, and write it to Kafka. Then I read it back out with from_avro. (Not using Schema Registry.) The problem is that the decoded rows come back misaligned: every other field is empty, and each real value lands one column to the right of where it belongs.
I expect:

+---------+--------+------+-------+
|firstName|lastName| color|   mood|
+---------+--------+------+-------+
|     Suzy|  Samson|  blue|grimmer|
|      Jim| Johnson|indigo|   grim|
+---------+--------+------+-------+

Instead I get:

+---------+--------+-----+-------+
|firstName|lastName|color|   mood|
+---------+--------+-----+-------+
|         |    Suzy|     | Samson|
|         |     Jim|     |Johnson|
+---------+--------+-----+-------+

Here's what I'm doing:

$ kt admin -createtopic persons-avro-spark9 -topicdetail <(jsonify =NumPartitions 1 =ReplicationFactor 1)

$ cat person.avsc
{
  "type": "record",
  "name": "Person",
  "namespace": "com.ippontech.kafkatutorials",
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" },
    { "name": "color", "type": "string" },
    { "name": "mood", "type": "string" }
  ]
}

$ spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.5,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.avro._
import java.nio.file.Files
import java.nio.file.Paths

val topic = "persons-avro-spark9"

// `from_avro` requires the Avro schema as a JSON string.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("person.avsc")))

val personDF = sc.parallelize(Seq(
  ("Jim", "Johnson", "indigo", "grim"),
  ("Suzy", "Samson", "blue", "grimmer")
)).toDF("firstName", "lastName", "color", "mood")

personDF.select(to_avro(struct(personDF.columns.map(column): _*)).alias("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", topic)
  .option("avroSchema", jsonFormatSchema)
  .save()

val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .load()
  .select(from_avro('value, jsonFormatSchema) as 'person)
  .select($"person.firstName", $"person.lastName", $"person.color", $"person.mood")
  .writeStream
  .format("console")
  .start()

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.avro._
import java.nio.file.Files
import java.nio.file.Paths
topic: String = persons-avro-spark9
jsonFormatSchema: String =
{
  "type": "record",
  "name": "Person",
  "namespace": "com.ippontech.kafkatutorials",
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" },
    { "name": "color", "type": "string" },
    { "name": "mood", "type": "string" }
  ]
}
personDF: org.apache.spark.sql.DataFrame = [firstName: string, lastName: string ... 2 more fields]
stream: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3990c36c

scala>
-------------------------------------------
Batch: 0
-------------------------------------------
+---------+--------+-----+-------+
|firstName|lastName|color|   mood|
+---------+--------+-----+-------+
|         |    Suzy|     | Samson|
|         |     Jim|     |Johnson|
+---------+--------+-----+-------+

See the raw bytes:

$ kt consume -topic persons-avro-spark9
{ "partition": 0, "offset": 0, "key": null, "value": "\u0000\u0008Suzy\u0000\u000cSamson\u0000\u0008blue\u0000\u000egrimmer", "timestamp": "2020-05-12T17:18:53.858-04:00" }
{ "partition": 0, "offset": 1, "key": null, "value": "\u0000\u0006Jim\u0000\u000eJohnson\u0000\u000cindigo\u0000\u0008grim", "timestamp": "2020-05-12T17:18:53.859-04:00" }

Thanks,
Alex
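P.S. A guess at what those bytes mean (I haven't confirmed this against the Spark source, so treat it as a sketch): in Avro's binary encoding a string is a zigzag-varint length followed by UTF-8 bytes, and a union value is prefixed by the varint index of the branch that was written. So \u0008Suzy is the 4-character string "Suzy", and the extra \u0000 in front of every value looks like a union branch index, i.e. the writer schema made each field a nullable union with "string" as branch 0, not the plain "string" of person.avsc. This snippet (runnable in the same spark-shell; the nullable writer schema is my assumption) decodes the first message cleanly under that schema:

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// Guessed writer schema: same record, but every field is a nullable union
// with the string branch at index 0 (this is the assumption, not person.avsc).
val guessedWriterSchema = new Schema.Parser().parse("""
{
  "type": "record", "name": "Person",
  "namespace": "com.ippontech.kafkatutorials",
  "fields": [
    { "name": "firstName", "type": ["string", "null"] },
    { "name": "lastName",  "type": ["string", "null"] },
    { "name": "color",     "type": ["string", "null"] },
    { "name": "mood",      "type": ["string", "null"] }
  ]
}""")

// Raw bytes of the first message, exactly as `kt consume` printed them.
// ISO-8859-1 maps each char in \u0000..\u00ff to the same single byte.
val bytes = "\u0000\u0008Suzy\u0000\u000cSamson\u0000\u0008blue\u0000\u000egrimmer"
  .getBytes("ISO-8859-1")

val reader = new GenericDatumReader[GenericRecord](guessedWriterSchema)
println(reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null)))
// -> {"firstName": "Suzy", "lastName": "Samson", "color": "blue", "mood": "grimmer"}

Reading those same bytes with the plain-string person.avsc would instead consume the first \u0000 as a zero-length firstName, then \u0008 + "Suzy" as lastName, the next \u0000 as an empty color, and \u000c + "Samson" as mood -- which is exactly the shifted output above.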