Currently, the only way would be to create a proper schema for the data. This is not a bug, but you could open a JIRA for the feature (since this would help others solve similar use cases), and it could be implemented and included in a future version.
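For example, an explicit schema could look roughly like the sketch below (the field names details/key/value are taken from your error message; the id field and everything else are just assumptions to illustrate, adjust to your actual data):

    import org.apache.spark.sql.types._

    // Hypothetical schema for the events described below; the struct
    // element type of "details" is pinned up front, so inference never runs.
    val eventSchema = StructType(Seq(
      StructField("id", StringType, nullable = true),
      StructField("details", ArrayType(
        StructType(Seq(
          StructField("key", StringType, nullable = true),
          StructField("value", StringType, nullable = true)
        )),
        containsNull = true), nullable = true)
    ))

    // With a schema supplied, empty arrays keep the declared struct
    // element type, so every batch writes Parquet files with a
    // compatible schema and the later read no longer fails to merge.
    val jsonDf = sql.read.schema(eventSchema).json(dataRdd)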
Thanks
Best Regards

On Tue, Jul 21, 2015 at 4:41 PM, Krzysztof Zarzycki <k.zarzy...@gmail.com> wrote:
> Hi everyone,
> I have a pretty challenging problem with reading/writing multiple Parquet
> files with streaming, but let me introduce my data flow:
>
> I have a lot of JSON events streaming to my platform. All of them have the
> same structure, but fields are mostly optional. Some of the fields are
> arrays with structs inside. These arrays can be empty, but sometimes they
> contain data (structs).
>
> Now I'm using Spark SQL & Streaming to:
>
> 0. Stream data from Kafka:
>
>     val stream = KafkaUtils.createDirectStream ...
>
> 1. Read the JSON data into a DataFrame:
>
>     stream.foreachRDD( rdd => {
>       val dataRdd: RDD[String] = myTransform(rdd)
>       val jsonDf = sql.read.json(dataRdd)
>
> 2. Write jsonDf to Parquet files:
>
>       if (firstRun) {
>         jsonDf.write.parquet("parquet-events")
>         firstRun = false
>       } else { // the table has to exist to be able to append data
>         jsonDf.write.mode(SaveMode.Append).parquet("parquet-events")
>       }
>     })
>
> All the writing goes fine. It produces multiple files, one per batch of
> data.
>
> But the problem appears when reading the data back:
>
>     scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>     scala> val events = sqlContext.read.parquet("parquet-events")
>     org.apache.spark.SparkException: Failed to merge incompatible schemas
>     StructType...
>     Caused by: org.apache.spark.SparkException: Failed to merge incompatible
>     data types StringType and StructType(StructField(key,StringType,true),
>     StructField(value,StringType,true))
>
> Indeed, the printed schemas contain mismatched types for a few fields, e.g.:
>
>     StructField(details,ArrayType(StringType,true),true)
>
> vs
>
>     StructField(details,ArrayType(StructType(StructField(key,StringType,true),
>     StructField(value,StringType,true)),true),true)
>
> It seems the bad thing happens in read.json: it infers the type of my array
> fields differently depending on the batch: when an array is empty, as
> containing Strings; when it is filled with data, as containing structs.
>
> The code of json/InferSchema indeed suggests that:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala#L127
> where the canonicalizeType method replaces NullType with StringType in my
> empty arrays.
>
> This is a serious problem for anyone trying to stream data from JSON to
> Parquet tables. Does anyone have ideas how to handle it? My ideas are
> (some non-exclusive):
>
> 1. Have the schema fully defined for my data. This is a last resort, as I
> wanted to build a schema-less solution.
>
> 2. Write my own schema inference that removes empty arrays from the schema,
> then pass the schema directly to the read method. I could even use & modify
> the InferSchema class from the Spark source, but unfortunately it is
> private... so I would need to copy-paste it.
>
> 3. Submit a bug to Spark about it. Do you also think it is a bug?
>
> It's a blocker for me currently; any help will be appreciated!
>
> Cheers,
> Krzysztof