[ 
https://issues.apache.org/jira/browse/HUDI-1079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17155004#comment-17155004
 ] 

Udit Mehrotra commented on HUDI-1079:
-------------------------------------

Also if you just try the following in Spark:
{code:java}
val df = Seq(
 ("100", "2015-01-01", "event_name_900", "2015-01-01T13:51:39.340396Z", 
"type1", Array(("abcd"), ("xyz")))
).toDF("event_id", "event_date", "event_name", "event_ts", "event_type", 
"event_desc")

df.schema

res7: org.apache.spark.sql.types.StructType = 
StructType(StructField(event_id,StringType,true), 
StructField(event_date,StringType,true), 
StructField(event_name,StringType,true), StructField(event_ts,StringType,true), 
StructField(event_type,StringType,true), 
StructField(event_desc,ArrayType(StringType,true),true))
{code}
vs
{code:java}
val df = Seq(
 ("100", "2015-01-01", "event_name_900", "2015-01-01T13:51:39.340396Z", 
"type1", Array(("abcd", 10), ("xyz", 11)))
).toDF("event_id", "event_date", "event_name", "event_ts", "event_type", 
"event_desc")

df.schema

res8: org.apache.spark.sql.types.StructType = 
StructType(StructField(event_id,StringType,true), 
StructField(event_date,StringType,true), 
StructField(event_name,StringType,true), StructField(event_ts,StringType,true), 
StructField(event_type,StringType,true), 
StructField(event_desc,ArrayType(StructType(StructField(_1,StringType,true), 
StructField(_2,IntegerType,false)),true),true))
{code}
You will see that, by default, even Spark does not treat the element type as 
*StructType* when the *Array* elements have only one field. When there is more 
than one field, it interprets the element type as *StructType*. So I am 
wondering whether this should be considered an issue in the first place.
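
One caveat when reading the first snippet: in Scala, ("abcd") is a 
parenthesized String rather than a one-element tuple, so it may be worth 
comparing against an element type that really is a single-field product. Below 
is a minimal sketch, assuming a local Spark session with spark-sql on the 
classpath; the Book case class and the SingleFieldArraySchema object are 
hypothetical names introduced only for illustration. I would expect the second 
and third schemas to show a struct element type, but this should be verified 
on the Spark version in use.
{code:scala}
import org.apache.spark.sql.SparkSession

// Hypothetical single-field element type, used only in this sketch.
case class Book(bookName: String)

object SingleFieldArraySchema {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("single-field-array-schema")
      .getOrCreate()
    import spark.implicits._

    // ("abcd") is just a parenthesized String, so the elements are Strings.
    val plain = Seq(("100", Array(("abcd"), ("xyz"))))
      .toDF("event_id", "event_desc")

    // Tuple1 is a real one-element product with a single field named _1.
    val tupled = Seq(("100", Array(Tuple1("abcd"), Tuple1("xyz"))))
      .toDF("event_id", "event_desc")

    // A case class with exactly one field is also a one-element product.
    val named = Seq(("100", Array(Book("abcd"), Book("xyz"))))
      .toDF("event_id", "event_desc")

    // Print the three inferred schemas side by side for comparison.
    Seq(plain, tupled, named).foreach(_.printSchema())

    spark.stop()
  }
}
{code}
In spark-shell, the same three toDF calls can be pasted directly, since the 
session and its implicits are already in scope.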

> Cannot upsert on schema with Array of Record with single field
> --------------------------------------------------------------
>
>                 Key: HUDI-1079
>                 URL: https://issues.apache.org/jira/browse/HUDI-1079
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Spark Integration
>    Affects Versions: 0.5.3
>         Environment: spark 2.4.4, local 
>            Reporter: Adrian Tanase
>            Priority: Major
>             Fix For: 0.6.0
>
>
> I am trying to trigger upserts on a table that has an array field with 
> records of just one field.
>  Here is the code to reproduce:
> {code:scala}
>   val spark = SparkSession.builder()
>       .master("local[1]")
>       .appName("SparkByExamples.com")
>       .config("spark.serializer", 
> "org.apache.spark.serializer.KryoSerializer")
>       .getOrCreate();
>   // https://sparkbyexamples.com/spark/spark-dataframe-array-of-struct/
>   val arrayStructData = Seq(
>     Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
>     Row("Michael",List(Row("Java","XY",200),Row("Scala","XB",500))),
>     Row("Robert",List(Row("Java","XZ",400),Row("Scala","XC",250))),
>     Row("Washington",null)
>   )
>   val arrayStructSchema = new StructType()
>       .add("name",StringType)
>       .add("booksIntersted",ArrayType(
>         new StructType()
>           .add("bookName",StringType)
> //          .add("author",StringType)
> //          .add("pages",IntegerType)
>       ))
>     val df = 
> spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)
> {code}
> Running an insert followed by an upsert will fail:
> {code:scala}
>   df.write
>       .format("hudi")
>       .options(getQuickstartWriteConfigs)
>       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "name")
>       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
>       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
>       .option(HoodieWriteConfig.TABLE_NAME, tableName)
>       .mode(Overwrite)
>       .save(basePath)
>   df.write
>       .format("hudi")
>       .options(getQuickstartWriteConfigs)
>       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "name")
>       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
>       .option(HoodieWriteConfig.TABLE_NAME, tableName)
>       .mode(Append)
>       .save(basePath)
> {code}
> If I create the books record with all the fields (at least 2), it works as 
> expected.
> The relevant part of the exception is this:
> {noformat}
> Caused by: java.lang.ClassCastException: required binary bookName (UTF8) is not a group
>   at org.apache.parquet.schema.Type.asGroupType(Type.java:207)
>   at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
>   at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:232)
>   at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:78)
>   at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:536)
>   at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:486)
>   at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
>   at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
>   at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
>   at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
>   at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
>   at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
>   at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
>   at org.apache.hudi.client.utils.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
>   at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
>   at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   ... 4 more
> {noformat}
> Another way to test is to change the generated tips data to contain just the 
> amount, by dropping the currency from the tips_history field. The tests will 
> then start failing:
>  
> [https://github.com/apache/hudi/compare/release-0.5.3...aditanase:avro-arrays-upsert?expand=1]
> I have narrowed this down to this block in the parquet-avro integration: 
> [https://github.com/apache/parquet-mr/blob/master/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java#L846-L875]
> It always returns false when it tries to decide whether the reader and writer 
> schemas are compatible. Going through that code path makes me think it's 
> related to the fields being optional, as the inferred schema seems to be 
> (null, string) with default null instead of (string, null) with no default.
> At this point I'm lost. I tried to figure something out based on 
> [https://github.com/apache/hudi/pull/1406/files], but I'm not sure where to 
> start.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
