[ 
https://issues.apache.org/jira/browse/HUDI-1079?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Adrian Tanase updated HUDI-1079:
--------------------------------
    Description: 
I am trying to trigger upserts on a table that has an array field with records 
of just one field.
 Here is the code to reproduce:
{code:scala}
  val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate();

  // https://sparkbyexamples.com/spark/spark-dataframe-array-of-struct/

  val arrayStructData = Seq(
    Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
    Row("Michael",List(Row("Java","XY",200),Row("Scala","XB",500))),
    Row("Robert",List(Row("Java","XZ",400),Row("Scala","XC",250))),
    Row("Washington",null)
  )

  val arrayStructSchema = new StructType()
      .add("name",StringType)
      .add("booksIntersted",ArrayType(
        new StructType()
          .add("bookName",StringType)
//          .add("author",StringType)
//          .add("pages",IntegerType)
      ))

    val df = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)
{code}
Running insert following by upsert will fail:
{code:scala}
  df.write
      .format("hudi")
      .options(getQuickstartWriteConfigs)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "name")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
      .option(HoodieWriteConfig.TABLE_NAME, tableName)
      .mode(Overwrite)
      .save(basePath)

  df.write
      .format("hudi")
      .options(getQuickstartWriteConfigs)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "name")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
      .option(HoodieWriteConfig.TABLE_NAME, tableName)
      .mode(Append)
      .save(basePath)
{code}
If I create the books record with all the fields (at least 2), it works as 
expected.

The relevant part of the exception is this:
{noformat}
Caused by: java.lang.ClassCastException: required binary bookName (UTF8) is not 
a groupCaused by: java.lang.ClassCastException: required binary bookName (UTF8) 
is not a group at org.apache.parquet.schema.Type.asGroupType(Type.java:207) at 
org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
 at 
org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:232)
 at 
org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:78)
 at 
org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:536)
 at 
org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:486)
 at 
org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
 at 
org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
 at 
org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95) 
at 
org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
 at 
org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
 at 
org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
 at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156) 
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135) at 
org.apache.hudi.client.utils.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
 at 
org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
 at 
org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ... 4 
more{noformat}
Another way to test is by changing the generated data in the tips to just the 
amount, by dropping the currency on the tips_history field, tests will start 
failing:
 
[https://github.com/apache/hudi/compare/release-0.5.3...aditanase:avro-arrays-upsert?expand=1]

I have narrowed this down to this block in the parquet-avro integration: 
[https://github.com/apache/parquet-mr/blob/master/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java#L846-L875]

Which always returns false after trying to decide whether reader and writer 
schemas are compatible. Going through that code path makes me thing it's 
related to the fields being optional, as the inferred schema seems to be (null, 
string) with default null instead of (string, null) with no default.

At this point I'm lost, tried to figure something out based on this 
[https://github.com/apache/hudi/pull/1406/files] but I'm not sure where to 
start.

  was:
I am trying to trigger upserts on a table that has an array field with records 
of just one field.
 Here is the code to reproduce:
{code:scala}
  val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SparkByExamples.com")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate();

  // https://sparkbyexamples.com/spark/spark-dataframe-array-of-struct/

  val arrayStructData = Seq(
    Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
    Row("Michael",List(Row("Java","XY",200),Row("Scala","XB",500))),
    Row("Robert",List(Row("Java","XZ",400),Row("Scala","XC",250))),
    Row("Washington",null)
  )

  val arrayStructSchema = new StructType()
      .add("name",StringType)
      .add("booksIntersted",ArrayType(
        new StructType()
          .add("bookName",StringType)
//          .add("author",StringType)
//          .add("pages",IntegerType)
      ))

    val df = 
spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)
{code}
Running insert following by upsert will fail:
{code:scala}
  df.write
      .format("hudi")
      .options(getQuickstartWriteConfigs)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "name")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
      .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
      .option(HoodieWriteConfig.TABLE_NAME, tableName)
      .mode(Overwrite)
      .save(basePath)

  df.write
      .format("hudi")
      .options(getQuickstartWriteConfigs)
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "name")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
      .option(HoodieWriteConfig.TABLE_NAME, tableName)
      .mode(Append)
      .save(basePath)
{code}
If I create the books record with all the fields (at least 2), it works as 
expected.

Another way to test is by changing the generated data in the tips to just the 
amount, by dropping the currency on the tips_history field, tests will start 
failing:
 
[https://github.com/apache/hudi/compare/release-0.5.3...aditanase:avro-arrays-upsert?expand=1]

I have narrowed this down to this block in the parquet-avro integration: 
[https://github.com/apache/parquet-mr/blob/master/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java#L846-L875]

Which always returns false after trying to decide whether reader and writer 
schemas are compatible. Going through that code path makes me thing it's 
related to the fields being optional, as the inferred schema seems to be (null, 
string) with default null instead of (string, null) with no default.

At this point I'm lost, tried to figure something out based on this 
[https://github.com/apache/hudi/pull/1406/files] but I'm not sure where to 
start.


> Cannot upsert on schema with Array of Record with single field
> --------------------------------------------------------------
>
>                 Key: HUDI-1079
>                 URL: https://issues.apache.org/jira/browse/HUDI-1079
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Spark Integration
>    Affects Versions: 0.5.3
>         Environment: spark 2.4.4, local 
>            Reporter: Adrian Tanase
>            Priority: Major
>
> I am trying to trigger upserts on a table that has an array field with 
> records of just one field.
>  Here is the code to reproduce:
> {code:scala}
>   val spark = SparkSession.builder()
>       .master("local[1]")
>       .appName("SparkByExamples.com")
>       .config("spark.serializer", 
> "org.apache.spark.serializer.KryoSerializer")
>       .getOrCreate();
>   // https://sparkbyexamples.com/spark/spark-dataframe-array-of-struct/
>   val arrayStructData = Seq(
>     Row("James",List(Row("Java","XX",120),Row("Scala","XA",300))),
>     Row("Michael",List(Row("Java","XY",200),Row("Scala","XB",500))),
>     Row("Robert",List(Row("Java","XZ",400),Row("Scala","XC",250))),
>     Row("Washington",null)
>   )
>   val arrayStructSchema = new StructType()
>       .add("name",StringType)
>       .add("booksIntersted",ArrayType(
>         new StructType()
>           .add("bookName",StringType)
> //          .add("author",StringType)
> //          .add("pages",IntegerType)
>       ))
>     val df = 
> spark.createDataFrame(spark.sparkContext.parallelize(arrayStructData),arrayStructSchema)
> {code}
> Running insert following by upsert will fail:
> {code:scala}
>   df.write
>       .format("hudi")
>       .options(getQuickstartWriteConfigs)
>       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "name")
>       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
>       .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
>       .option(HoodieWriteConfig.TABLE_NAME, tableName)
>       .mode(Overwrite)
>       .save(basePath)
>   df.write
>       .format("hudi")
>       .options(getQuickstartWriteConfigs)
>       .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "name")
>       .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "name")
>       .option(HoodieWriteConfig.TABLE_NAME, tableName)
>       .mode(Append)
>       .save(basePath)
> {code}
> If I create the books record with all the fields (at least 2), it works as 
> expected.
> The relevant part of the exception is this:
> {noformat}
> Caused by: java.lang.ClassCastException: required binary bookName (UTF8) is 
> not a groupCaused by: java.lang.ClassCastException: required binary bookName 
> (UTF8) is not a group at 
> org.apache.parquet.schema.Type.asGroupType(Type.java:207) at 
> org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
>  at 
> org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:232)
>  at 
> org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:78)
>  at 
> org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:536)
>  at 
> org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:486)
>  at 
> org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
>  at 
> org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
>  at 
> org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
>  at 
> org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
>  at 
> org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
>  at 
> org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
>  at 
> org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156) at 
> org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135) at 
> org.apache.hudi.client.utils.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:49)
>  at 
> org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:45)
>  at 
> org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:92)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ... 4 
> more{noformat}
> Another way to test is by changing the generated data in the tips to just the 
> amount, by dropping the currency on the tips_history field, tests will start 
> failing:
>  
> [https://github.com/apache/hudi/compare/release-0.5.3...aditanase:avro-arrays-upsert?expand=1]
> I have narrowed this down to this block in the parquet-avro integration: 
> [https://github.com/apache/parquet-mr/blob/master/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java#L846-L875]
> Which always returns false after trying to decide whether reader and writer 
> schemas are compatible. Going through that code path makes me thing it's 
> related to the fields being optional, as the inferred schema seems to be 
> (null, string) with default null instead of (string, null) with no default.
> At this point I'm lost, tried to figure something out based on this 
> [https://github.com/apache/hudi/pull/1406/files] but I'm not sure where to 
> start.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to