Re: writing/reading multiple Parquet files: Failed to merge incompatible data types StringType and StructType
I don’t think this is a bug either. For an empty JSON array [], there is simply no way to infer the actual element type, so Spark SQL falls back to the “safest” type, StringType, because essentially any data type can be cast to StringType.

In general, schema inference and evolution are hard problems, especially when schemaless data formats like JSON are used in the data pipeline, because type information gets lost along the way. Spark SQL tries to minimize the effort required, but it can’t do all the work for you if the type information of your data is intrinsically incomplete, or if the schema evolves in an incompatible way (required columns becoming optional, or existing columns changing data type).

Cheng

On 7/24/15 12:23 AM, Akhil Das wrote:
> Currently, the only way for you would be to create proper schema for the data. This is not a bug, but you could open a jira (since this would help others to solve their similar use-cases) for feature and in future version it could be implemented and included.
> [...]
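The inference behaviour described in this reply can be reproduced with two tiny batches. The following is only a sketch, assuming the Spark 1.4-era API used elsewhere in this thread (SQLContext, read.json on an RDD[String]) and a local master; the object name, the helper detailsType, and the sample JSON lines are made up for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.DataType

object EmptyArrayInference {
  // Hypothetical helper: infers a schema from the given JSON lines and
  // returns the inferred type of the `details` column.
  def detailsType(sqlContext: SQLContext, jsonLines: Seq[String]): DataType = {
    val rdd = sqlContext.sparkContext.parallelize(jsonLines)
    sqlContext.read.json(rdd).schema("details").dataType
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("empty-array-inference").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    try {
      // A batch where the array happens to be empty: there is no element to look at,
      // so the element type falls back to StringType, as reported in the thread.
      val emptyBatch = Seq("""{"details": []}""")
      // A batch where the array holds structs: the element type is inferred as a struct.
      val filledBatch = Seq("""{"details": [{"key": "k", "value": "v"}]}""")

      println(detailsType(sqlContext, emptyBatch))
      println(detailsType(sqlContext, filledBatch))
    } finally {
      sc.stop()
    }
  }
}
```

Writing each of these batches to the same Parquet directory would produce files whose schemas disagree on details, which is the situation the merge error in this thread complains about.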
Re: writing/reading multiple Parquet files: Failed to merge incompatible data types StringType and StructType
Currently, the only way forward is to define a proper schema for the data. This is not a bug, but you could open a JIRA for it as a feature request (it would help others with similar use cases), and it could be implemented and included in a future version.

Thanks
Best Regards

On Tue, Jul 21, 2015 at 4:41 PM, Krzysztof Zarzycki k.zarzy...@gmail.com wrote:
> Hi everyone, I have pretty challenging problem with reading/writing multiple parquet files with streaming [...]
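As a rough illustration of the "create a proper schema" advice, the sketch below passes an explicit schema to the reader so that every batch gets the same type for details, whether or not the array is empty. It assumes the Spark 1.4-era DataFrameReader API (read.schema(...).json(...)); the schema covers only the single details field quoted in this thread, and the object and method names are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

object ExplicitSchemaRead {
  // Assumed schema: only the `details` field from the thread.
  // A real event schema would list every field of the events.
  val eventSchema: StructType = StructType(Seq(
    StructField(
      "details",
      ArrayType(
        StructType(Seq(
          StructField("key", StringType, nullable = true),
          StructField("value", StringType, nullable = true))),
        containsNull = true),
      nullable = true)))

  // Reads JSON lines with the fixed schema instead of inferring one per batch.
  def readEvents(sqlContext: SQLContext, jsonLines: Seq[String]): DataFrame = {
    val rdd = sqlContext.sparkContext.parallelize(jsonLines)
    sqlContext.read.schema(eventSchema).json(rdd)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("explicit-schema-read").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    try {
      // Both batches now share one schema, so files written from them
      // no longer disagree on the element type of `details`.
      readEvents(sqlContext, Seq("""{"details": []}""")).printSchema()
      readEvents(sqlContext, Seq("""{"details": [{"key": "k", "value": "v"}]}""")).printSchema()
    } finally {
      sc.stop()
    }
  }
}
```

The trade-off is the one raised in the original question: the pipeline is no longer schema-less, and the schema has to be kept in step with the events.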
writing/reading multiple Parquet files: Failed to merge incompatible data types StringType and StructType
Hi everyone,

I have a pretty challenging problem with reading/writing multiple Parquet files with streaming, but let me first introduce my data flow.

I have a lot of JSON events streaming to my platform. All of them have the same structure, but the fields are mostly optional. Some of the fields are arrays with structs inside. These arrays can be empty, but sometimes they contain data (structs).

Now I'm using Spark SQL and Spark Streaming to:

0. Stream data from Kafka:

    val stream = KafkaUtils.createDirectStream ...

1. Read the JSON data into a DataFrame:

    stream.foreachRDD( rdd => {
      val dataRdd : RDD[String] = myTransform(rdd)
      val jsonDf = sql.read.json(dataRdd)

2. Write jsonDf to Parquet files:

      if (firstRun) {
        jsonDf.write.parquet("parquet-events")
        firstRun = false
      } else {
        // the table has to exist to be able to append data.
        jsonDf.write.mode(SaveMode.Append).parquet("parquet-events")
      }
    })

All the writing goes fine. It produces multiple files, one for each batch of data. But the problem is with reading the data:

    scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    scala> val events = sqlContext.read.parquet("parquet-events")
    org.apache.spark.SparkException: Failed to merge incompatible schemas StructType...
    Caused by: org.apache.spark.SparkException: Failed to merge incompatible data types StringType and StructType(StructField(key,StringType,true), StructField(value,StringType,true))

Indeed, the printed schemas contain mismatched types for a few fields, e.g.:

    StructField(details,ArrayType(StringType,true),true)

vs

    StructField(details,ArrayType(StructType(StructField(key,StringType,true), StructField(value,StringType,true)),true),true)

It seems that something went wrong in read.json: it inferred my array fields differently, as containing strings when the array is empty and as containing structs when it is filled with data.

The code of json/InferSchema indeed suggests that:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala#L127
where the canonicalizeType method replaces NullType with StringType in my empty arrays.

This is a serious problem for someone trying to stream data from JSON to Parquet tables. Does anyone have ideas on how to handle this problem? My ideas are (not mutually exclusive):

1. Have the schema perfectly defined for my data. This is a last resort, as I wanted to create a schema-less solution.
2. Write my own schema inference that removes empty arrays from the schema, then pass the schema directly to the read method. I could even use or modify the InferSchema class from the Spark source, but it is private, unfortunately, so I would need to copy and paste it.
3. Submit a bug to Spark about it. Do you also think it is a bug?

It's a blocker for me currently, so any help will be appreciated!

Cheers,
Krzysztof
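For spotting which fields disagree between two batches before they are appended to the same Parquet directory, a small diagnostic along these lines might help. This is a sketch only, not part of Spark: the object SchemaConflicts and its conflicts method are hypothetical names, it relies solely on the public org.apache.spark.sql.types classes, and it reports any difference in leaf types without attempting to decide whether Spark could still merge them.

```scala
import org.apache.spark.sql.types.{ArrayType, DataType, StringType, StructField, StructType}

object SchemaConflicts {
  // Walks two types in parallel and returns (path, leftType, rightType) for every
  // position that exists on both sides but has a different type. Fields that exist
  // on only one side are ignored, since those are the optional-field case.
  def conflicts(left: DataType, right: DataType, path: String = ""): Seq[(String, DataType, DataType)] =
    (left, right) match {
      case (l: StructType, r: StructType) =>
        val rightByName = r.fields.map(f => f.name -> f).toMap
        l.fields.toSeq.flatMap { lf =>
          rightByName.get(lf.name).toSeq.flatMap { rf =>
            val childPath = if (path.isEmpty) lf.name else s"$path.${lf.name}"
            conflicts(lf.dataType, rf.dataType, childPath)
          }
        }
      case (ArrayType(leftElem, _), ArrayType(rightElem, _)) =>
        // "[]" marks the element type of an array in the reported path.
        conflicts(leftElem, rightElem, s"$path[]")
      case (l, r) if l == r => Seq.empty
      case (l, r) => Seq((path, l, r))
    }

  def main(args: Array[String]): Unit = {
    // The two variants of `details` quoted in the message above.
    val fromEmptyArrays = StructType(Seq(
      StructField("details", ArrayType(StringType, true), true)))
    val fromFilledArrays = StructType(Seq(
      StructField("details", ArrayType(StructType(Seq(
        StructField("key", StringType, true),
        StructField("value", StringType, true))), true), true)))

    conflicts(fromEmptyArrays, fromFilledArrays).foreach { case (p, l, r) =>
      println(s"$p: $l vs $r")
    }
  }
}
```

In a streaming job, a check like this could be run on each batch's inferred schema against a reference schema, so that a mismatching batch is detected at write time instead of surfacing later as a merge failure on read.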