Currently, the only way to handle this is to define a proper schema for the
data. This is not a bug, but you could open a JIRA requesting the feature
(it would help others solve similar use cases), and it could be implemented
and included in a future version.
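
For example, a minimal sketch of an explicit schema (the key/value field
names come from the details example in your mail; the rest is hypothetical
and would need to match your full event structure):

import org.apache.spark.sql.types._

// Declare the array's element type explicitly, so an empty array is no
// longer inferred as ArrayType(StringType).
val detailsElement = StructType(Seq(
  StructField("key", StringType, nullable = true),
  StructField("value", StringType, nullable = true)))

val eventSchema = StructType(Seq(
  StructField("details", ArrayType(detailsElement), nullable = true)
  // ... the remaining fields of your events
))

// Pass the schema instead of letting read.json infer it for every batch:
val jsonDf = sqlContext.read.schema(eventSchema).json(dataRdd)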

Thanks
Best Regards

On Tue, Jul 21, 2015 at 4:41 PM, Krzysztof Zarzycki <k.zarzy...@gmail.com>
wrote:

> Hi everyone,
> I have a pretty challenging problem with reading/writing multiple Parquet
> files with streaming. Let me first introduce my data flow:
>
> I have a lot of JSON events streaming into my platform. All of them have the
> same structure, but the fields are mostly optional. Some of the fields are
> arrays with structs inside.
> These arrays can be empty, but sometimes they contain data (structs).
>
> Now I'm using Spark SQL & Streaming to:
> 0. Stream data from Kafka
>
> val stream = KafkaUtils.createDirectStream ...
>
> 1. Read the json data into a DataFrame:
>
> stream.foreachRDD( rdd => {
>
> val dataRdd : RDD[String] = myTransform(rdd)
>
> val jsonDf = sql.read.json(dataRdd)
>
> 2. Write jsonDf to Parquet files:
>
> if (firstRun) {
>
>   jsonDf.write.parquet("parquet-events")
>
>   firstRun = false
>
> } else { // the table has to exist to be able to append data.
>
>   jsonDf.write.mode(SaveMode.Append).parquet("parquet-events")
>
> }
>
> })
>
> All the writing goes fine. It produces multiple files, each for one batch
> of data.
>
> But the problem appears when reading the data back:
>
> scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
>
> scala> val events = sqlContext.read.parquet("parquet-events")
>
> org.apache.spark.SparkException: Failed to merge incompatible schemas 
> StructType...
>
> Caused by: org.apache.spark.SparkException: Failed to merge incompatible data 
> types StringType and StructType(StructField(key,StringType,true), 
> StructField(value,StringType,true))
>
>
> Indeed, the printed schemas contain mismatched types for a few fields, e.g.:
>
> StructField(details,ArrayType(StringType,true),true)
>
> vs
>
> StructField(details,ArrayType(StructType(StructField(key,StringType,true),
> StructField(value,StringType,true)),true),true)
>
>
> It seems that the bad thing happened in read.json: it recognized my array
> fields differently depending on the content: as containing Strings when an
> array is empty, and as containing structs when it is filled with data.
>
> The code of json/InferSchema indeed suggests that:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala#L127
> where the canonicalizeType method replaces NullType with StringType in my
> empty arrays.
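>
> A quick way to reproduce the mismatch in the shell (a small sketch; the
> details field is from my events above):
>
> val empty = sqlContext.read.json(sc.parallelize(Seq("""{"details": []}""")))
> empty.printSchema()   // details: array<string> -- NullType became StringType
>
> val filled = sqlContext.read.json(
>   sc.parallelize(Seq("""{"details": [{"key": "k", "value": "v"}]}""")))
> filled.printSchema()  // details: array<struct<key:string,value:string>>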
>
> This is a serious problem for anyone trying to stream data from JSON into
> Parquet tables. Does anyone have ideas on how to handle this problem? My
> ideas are (some non-exclusive):
>
> 1. Define the schema for my data up front. This is a last resort, as I
> wanted to create a schema-less solution.
>
> 2. Write my own schema inference that removes empty arrays from the schema,
> then pass the schema directly to the read method (a sketch of this follows
> the list below). I could even reuse & modify the InferSchema class from the
> Spark source, but unfortunately it is private... so I would need to
> copy-paste it.
>
> 3. Submit a bug to Spark about it. Do you also think it is a bug?
>
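> A rough sketch of the simplest variant of idea 2 -- inferring the schema
> once from a sample where the arrays are populated, instead of pruning empty
> arrays (untested; sampleRdd is a hypothetical RDD[String] of such events):
>
> // Infer the schema from a representative, non-empty sample...
> val referenceSchema = sqlContext.read.json(sampleRdd).schema
>
> // ...then force every streaming batch to that schema instead of
> // re-inferring it:
> val jsonDf = sqlContext.read.schema(referenceSchema).json(dataRdd)
>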
> It's currently a blocker for me; any help would be appreciated!
>
> Cheers,
>
> Krzysztof
>
