Re: writing/reading multiple Parquet files: Failed to merge incompatible data types StringType and StructType

2015-07-24 Thread Cheng Lian
I don’t think this is a bug either. For an empty JSON array [], there’s
simply no way to infer its actual element type, and in this case Spark
SQL just fills in the “safest” type, which is StringType, because
basically any data type can be cast to StringType.
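
A minimal sketch of that behavior, using the Spark 1.x SQLContext API
(the details field name is borrowed from the thread below; the two
one-record datasets are illustrative):

import org.apache.spark.sql.SQLContext

// The same field is inferred as array<string> when the array is empty
// and as array<struct<...>> when it is populated.
val sqlContext = new SQLContext(sc)

val empty = sqlContext.read.json(
  sc.parallelize(Seq("""{"details": []}""")))
empty.printSchema()
// root
//  |-- details: array (nullable = true)
//  |    |-- element: string (containsNull = true)

val filled = sqlContext.read.json(
  sc.parallelize(Seq("""{"details": [{"key": "k", "value": "v"}]}""")))
filled.printSchema()
// root
//  |-- details: array (nullable = true)
//  |    |-- element: struct (containsNull = true)
//  |    |    |-- key: string (nullable = true)
//  |    |    |-- value: string (nullable = true)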


In general, schema inference/evolution is a hard problem, especially
when schemaless data formats like JSON are used in the data pipeline,
because type information gets lost along the way. Spark SQL tries to
minimize the effort, but it can’t do all the work for you if the type
information of your data is intrinsically incomplete, or if the schema
is evolving in an incompatible way (required columns becoming optional,
or existing columns changing data types).


Cheng



Re: writing/reading multiple Parquet files: Failed to merge incompatible data types StringType and StructType

2015-07-23 Thread Akhil Das
Currently, the only way for you would be to create a proper schema for the
data. This is not a bug, but you could open a JIRA for the feature (since
this would help others solve similar use cases), and it could be
implemented and included in a future version.
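
A minimal sketch of that suggestion, assuming the Spark 1.x
DataFrameReader API; the details field and its key/value struct are
taken from the schema reported in the question below:

import org.apache.spark.sql.types._

// Define the schema up front so inference never runs; empty arrays then
// keep the declared struct element type instead of falling back to string.
val detailsType = ArrayType(
  StructType(Seq(
    StructField("key", StringType, nullable = true),
    StructField("value", StringType, nullable = true))),
  containsNull = true)

val eventSchema = StructType(Seq(
  StructField("details", detailsType, nullable = true)))

// read.json accepts a user-specified schema and skips inference entirely.
val jsonDf = sqlContext.read.schema(eventSchema).json(dataRdd)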

Thanks
Best Regards


writing/reading multiple Parquet files: Failed to merge incompatible data types StringType and StructType

2015-07-21 Thread Krzysztof Zarzycki
Hi everyone,
I have a pretty challenging problem with reading/writing multiple Parquet
files with streaming, but let me introduce my data flow first:

I have a lot of JSON events streaming to my platform. All of them have the
same structure, but the fields are mostly optional. Some of the fields are
arrays with structs inside.
These arrays can be empty, but sometimes they contain data (structs).

Now I'm using Spark SQL & Streaming to:
0. Stream data from Kafka

val stream = KafkaUtils.createDirectStream ...
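
(For context, a hedged sketch of what the elided call might look like
with the Spark 1.x Kafka direct API; ssc, kafkaParams, and topics are
assumed to be defined elsewhere:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Assumed setup: ssc is a StreamingContext, kafkaParams holds
// "metadata.broker.list", and topics is a Set[String] of topic names.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)
)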

1. Read the JSON data into a DataFrame:

stream.foreachRDD( rdd => {

val dataRdd: RDD[String] = myTransform(rdd)

val jsonDf = sql.read.json(dataRdd)

2. Write jsonDf to Parquet files:

if (firstRun) {

  jsonDf.write.parquet("parquet-events")

  firstRun = false

} else { // the table has to exist to be able to append data.

  jsonDf.write.mode(SaveMode.Append).parquet("parquet-events")

}

})

All the writing goes fine. It produces multiple files, one for each batch
of data.

But the problem is on reading the data:

scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)

scala> val events = sqlContext.read.parquet("parquet-events")

org.apache.spark.SparkException: Failed to merge incompatible schemas
StructType...

Caused by: org.apache.spark.SparkException: Failed to merge
incompatible data types StringType and
StructType(StructField(key,StringType,true),
StructField(value,StringType,true))


Indeed, the printed schemas contain mismatched types for a few fields, e.g.:

StructField(details,ArrayType(StringType,true),true)

vs

StructField(details,ArrayType(StructType(StructField(key,StringType,true),
StructField(value,StringType,true)),true),true)


It seems that the bad thing happened in read.json: it inferred my array
fields differently: when an array is empty, as containing Strings; when
filled with data, as containing structs.

The code of json/InferSchema indeed suggests that:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/json/InferSchema.scala#L127
where the canonicalizeType method replaces NullType with StringType in my
empty arrays.

This is a serious problem for someone trying to stream data from JSON
to Parquet tables. Does anyone have ideas on how to handle this problem?
My ideas are (some non-exclusive):

1. Have the schema perfectly defined for my data. This is a last resort,
as I wanted to create a schema-less solution.

2. Write my own schema inference that removes empty arrays from the
schema, then pass the schema directly to the read method (see the sketch
after this list). I could even use & modify the InferSchema class from
the Spark source, but it is private, unfortunately... so I would need to
copy-paste it.

3. Submit a bug to Spark about it. Do you also think it is a bug?
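
A hypothetical sketch of idea 2, assuming the Spark 1.x types API;
stripFallbackArrays is an illustrative helper, not anything from Spark.
Note the trade-off: it also drops genuine string arrays, which are
indistinguishable from the empty-array fallback:

import org.apache.spark.sql.types._

// Drop array fields whose element type is the StringType fallback that
// inference produces for empty arrays; recurse into nested structs.
def stripFallbackArrays(schema: StructType): StructType =
  StructType(schema.fields.flatMap { f =>
    f.dataType match {
      case ArrayType(StringType, _) => None // possibly just an empty array
      case st: StructType => Some(f.copy(dataType = stripFallbackArrays(st)))
      case _ => Some(f)
    }
  })

// Usage: infer on one batch, strip the ambiguous fields, then re-read
// with the cleaned schema passed explicitly.
val inferred = sqlContext.read.json(dataRdd).schema
val cleaned = stripFallbackArrays(inferred)
val jsonDf = sqlContext.read.schema(cleaned).json(dataRdd)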

It's currently a blocker for me; any help will be appreciated!

Cheers,

Krzysztof