I am trying to convert Avro records containing a field of type bytes into a JSON string, using Structured Streaming in Spark 2.1. Please see my code below.
object AvroConvert {

  case class KafkaMessage(payload: String)

  val schemaString =
    """{
      |  "type" : "record",
      |  "name" : "HdfsEvent",
      |  "namespace" : "com.abc.def.domain.hdfs",
      |  "fields" : [ {
      |    "name" : "payload",
      |    "type" : {
      |      "type" : "bytes",
      |      "java-class" : "[B"
      |    }
      |  } ]
      |}""".stripMargin
  val messageSchema = new Schema.Parser().parse(schemaString)
  val reader = new GenericDatumReader[GenericRecord](messageSchema)
  // Binary decoder factory
  val decoderFactory = DecoderFactory.get()
  // Register implicit encoder for the map operation
  implicit val encoder: Encoder[GenericRecord] = org.apache.spark.sql.Encoders.kryo[GenericRecord]

  def main(args: Array[String]): Unit = {
    val KafkaBroker = "**.**.**.**:9092"
    val InTopic = "avro"

    // Get Spark session
    val session = SparkSession
      .builder
      .master("local[*]")
      .appName("myapp")
      .getOrCreate()

    // Load streaming data
    import session.implicits._

    val data = session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KafkaBroker)
      .option("subscribe", InTopic)
      .load()
      .select($"value".as[Array[Byte]])
      .map { d =>
        val rec = reader.read(null, decoderFactory.binaryDecoder(d, null))
        // An Avro "bytes" field deserializes to a java.nio.ByteBuffer, not a Byte
        val payload = new String(rec.get("payload").asInstanceOf[java.nio.ByteBuffer].array(), "UTF-8")
        KafkaMessage(payload)
      }

    val query = data.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

I am getting the below error.

org.apache.avro.AvroRuntimeException: Malformed data.
Length is negative: -40
  at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
  at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
  at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
  at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
  at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
  at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
  at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)
  at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:99)
  at com.expedia.ecpr.risk.data.spark.streaming.MyMain$$anonfun$1.apply(StreamingDecisionJson.scala:98)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

I read suggestions to use a DataFileReader instead of binaryDecoder, as below, but I was not successful using this in Scala.

DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(schema);
DataFileStream<GenericRecord> dataFileReader = new DataFileStream<GenericRecord>(inputStream, datumReader);

Once the bytes-typed "payload" is converted to JSON, I plan to write it back to another Kafka topic. Any help on this is much appreciated. Thank you!

Revin
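For completeness, here is my best guess at a Scala equivalent of that Java snippet (a sketch only: `messageBytes` is a placeholder for one Kafka value, and it assumes the producer actually wrote Avro container-file framing, in which case the reader schema comes from the embedded file header rather than being passed in):

```scala
import java.io.ByteArrayInputStream
import org.apache.avro.file.DataFileStream
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import scala.collection.JavaConverters._

// Read all records from one message's bytes, assuming they carry
// Avro container-file framing (the leading "Obj\u0001" magic).
def readContainer(messageBytes: Array[Byte]): List[GenericRecord] = {
  val datumReader = new GenericDatumReader[GenericRecord]()
  val dataFileReader =
    new DataFileStream[GenericRecord](new ByteArrayInputStream(messageBytes), datumReader)
  try dataFileReader.iterator().asScala.toList
  finally dataFileReader.close()
}
```

If the messages do not start with the container-file magic, DataFileStream will fail too; in that case a "Length is negative" error suggests the raw bytes either do not match the reader schema or carry extra framing (for example a schema-registry header) that would need to be stripped before decoding.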