Currently, the only option is to define a proper schema for the data
yourself. This is not a bug, but you could open a JIRA feature request
(it would help others with similar use cases), and it could be
implemented in a future version.
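
As a rough sketch of what I mean by a proper schema (assuming Spark 1.4+,
where sqlContext.read.schema(...).json(...) accepts an RDD[String]): only
the "details" field and its key/value struct come from your error message.
The "id" field and the names EventSchema, eventSchema and readEvents are
placeholders I made up for illustration.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types._

object EventSchema {
  // Element type of the "details" array, as printed in the error message:
  // StructType(StructField(key,StringType,true), StructField(value,StringType,true))
  val detailType: StructType = StructType(Seq(
    StructField("key", StringType, nullable = true),
    StructField("value", StringType, nullable = true)))

  // Hypothetical event schema: "id" stands in for the other event fields.
  val eventSchema: StructType = StructType(Seq(
    StructField("id", StringType, nullable = true),
    StructField("details", ArrayType(detailType, containsNull = true), nullable = true)))

  // Read JSON with the fixed schema instead of letting Spark infer one per
  // batch, so an empty "details" array keeps its struct element type.
  def readEvents(sqlContext: SQLContext, json: RDD[String]): DataFrame =
    sqlContext.read.schema(eventSchema).json(json)
}
```

Inside foreachRDD you would then call EventSchema.readEvents(sqlContext,
dataRdd) in place of the inferred read, so every batch is written with the
same Parquet schema.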

Best Regards

On Tue, Jul 21, 2015 at 4:41 PM, Krzysztof Zarzycki wrote:

> Hi everyone,
> I have a pretty challenging problem with reading/writing multiple Parquet
> files with streaming, but let me first introduce my data flow:
> I have a lot of json events streaming to my platform. All of them have the
> same structure, but fields are mostly optional. Some of the fields are
> arrays with structs inside.
> These arrays can be empty, but sometimes they contain the data (structs).
> Now I'm using Spark SQL & Streaming to:
> 0. Stream data from Kafka
> val stream = KafkaUtils.createDirectStream ...
> 1. read json data to json dataframe:
> stream.foreachRDD( rdd => {
> val dataRdd : RDD[String] = myTransform(rdd)
> val jsonDf = sqlContext.read.json(dataRdd)
> 2. write jsonDf to Parquet files:
> if (firstRun) {
>   jsonDf.write.parquet("parquet-events")
>   firstRun = false
> } else { // the table has to exist to be able to append data.
>   jsonDf.write.mode(SaveMode.Append).parquet("parquet-events")
> }
> })
> All the writing goes fine. It produces multiple files, one for each batch
> of data.
> But the problem appears when reading the data:
> scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> scala> val events = sqlContext.read.parquet("parquet-events")
> org.apache.spark.SparkException: Failed to merge incompatible schemas 
> StructType...
> Caused by: org.apache.spark.SparkException: Failed to merge incompatible data 
> types StringType and StructType(StructField(key,StringType,true), 
> StructField(value,StringType,true))
> Indeed, the printed schemas contain mismatched types for a few fields, e.g.:
> StructField(details,ArrayType(StringType,true),true)
> vs
> StructField(details,ArrayType(StructType(StructField(key,StringType,true),
> StructField(value,StringType,true)),true),true)
> It seems that something went wrong in read.json: it inferred my array fields
> differently depending on their content. An empty array is typed as containing
> Strings; an array filled with data is typed as containing structs.
> The code of json/InferSchema indeed suggests that: the
> canonicalizeType method replaces NullType with StringType in my empty arrays.
> This is a serious problem for anyone trying to stream data from JSON to
> Parquet tables. Does anyone have ideas on how to handle this problem? My ideas
> are (not mutually exclusive):
> 1. Have the schema fully defined for my data. This is a last resort, as I
> wanted to create a schema-less solution.
> 2. Write my own schema inference that removes empty arrays from the schema,
> then pass the schema directly to the read method. I could even use and modify
> the InferSchema class from the Spark source, but unfortunately it is private,
> so I would need to copy and paste it.
> 3. Submit a bug to Spark about it. Do you also think it is a bug?
> It's a blocker for me currently, any help will be appreciated!
> Cheers,
> Krzysztof
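
If you do want to try your option 2 without copying InferSchema, one
possible starting point is to let Spark infer a schema per batch and then
reconcile it with a running schema you keep yourself. The sketch below is
only that: SchemaReconcile and reconcile are names I made up, and it
assumes that an array typed as String on one side and as a struct on the
other was an empty array that inference defaulted to String.

```scala
import org.apache.spark.sql.types._

object SchemaReconcile {
  // Merge two inferred types. Assumption: when an array's element type is
  // StringType on one side and a struct on the other, the String side came
  // from an empty array, so the struct element type wins.
  def reconcile(a: DataType, b: DataType): DataType = (a, b) match {
    case (ArrayType(StringType, _), arr @ ArrayType(_: StructType, _)) => arr
    case (arr @ ArrayType(_: StructType, _), ArrayType(StringType, _)) => arr
    case (ArrayType(ea, na), ArrayType(eb, nb)) =>
      ArrayType(reconcile(ea, eb), na || nb)
    case (StructType(fa), StructType(fb)) =>
      val right = fb.map(f => f.name -> f).toMap
      val leftNames = fa.map(_.name).toSet
      // Fields present on both sides are reconciled recursively.
      val merged = fa.map { f =>
        right.get(f.name) match {
          case Some(g) =>
            StructField(f.name, reconcile(f.dataType, g.dataType), f.nullable || g.nullable)
          case None => f
        }
      }
      // Fields seen only on the right side are appended.
      StructType(merged ++ fb.filterNot(f => leftNames(f.name)))
    case (x, y) if x == y => x
    case (x, y) =>
      throw new IllegalArgumentException(s"Cannot reconcile $x and $y")
  }
}
```

The reconciled StructType could then be passed to
sqlContext.read.schema(...) when re-reading the batch before writing. Note
that Parquet files already written with the String placeholder would still
conflict on read, so this only helps once the running schema has seen a
non-empty array.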
