[ 
https://issues.apache.org/jira/browse/SPARK-33314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim updated SPARK-33314:
---------------------------------
    Priority: Blocker  (was: Major)

> Avro reader drops rows
> ----------------------
>
>                 Key: SPARK-33314
>                 URL: https://issues.apache.org/jira/browse/SPARK-33314
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Bruce Robbins
>            Priority: Blocker
>              Labels: correctness
>
> Under certain circumstances, the V1 Avro reader drops rows. For example:
> {noformat}
> scala> val df = spark.range(0, 25).toDF("index")
> df: org.apache.spark.sql.DataFrame = [index: bigint]
> scala> df.write.mode("overwrite").format("avro").save("index_avro")
> scala> val loaded = spark.read.format("avro").load("index_avro")
> loaded: org.apache.spark.sql.DataFrame = [index: bigint]
> scala> loaded.collect.size
> res1: Int = 25
> scala> loaded.orderBy("index").collect.size
> res2: Int = 17   <== expected 25
> scala> 
> loaded.orderBy("index").write.mode("overwrite").format("parquet").save("index_as_parquet")
> scala> spark.read.parquet("index_as_parquet").count
> res4: Long = 17
> scala>
> {noformat}
> SPARK-32346 slightly refactored the AvroFileFormat and 
> AvroPartitionReaderFactory to use a new iterator-like trait called 
> AvroUtils#RowReader. RowReader#hasNextRow consumes a raw input record and 
> stores the deserialized row for the next call to RowReader#nextRow. 
> Unfortunately, sometimes hasNextRow is called twice before nextRow is called, 
> resulting in a lost row (see 
> [BypassMergeSortShuffleWriter#write|https://github.com/apache/spark/blob/69c27f49acf2fe6fbc8335bde2aac4afd4188678/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java#L132],
>  which calls records.hasNext once before calling it again 
> [here|https://github.com/apache/spark/blob/69c27f49acf2fe6fbc8335bde2aac4afd4188678/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java#L155]).
> RowReader consumes the Avro record in hasNextRow, rather than nextRow, 
> because AvroDeserializer#deserialize potentially filters out the record.
> Two possible fixes that I thought of:
> 1) keep state in RowReader such that multiple calls to RowReader#hasNextRow 
> with no intervening call to RowReader#nextRow avoids consuming more than 1 
> Avro record. This requires no changes to any code that extends RowReader, 
> just RowReader itself.
>  2) Move record consumption to RowReader#nextRow (such that RowReader#nextRow 
> could potentially return None) and wrap any iterator that extends RowReader 
> with a new iterator created by flatMap. This last iterator will filter out 
> the Nones and extract rows from the Somes. This requires changes to 
> AvroFileFormat and AvroPartitionReaderFactory as well as RowReader.
> The first one seems simplest and most straightfoward, and doesn't require 
> changes to AvroFileFormat and AvroPartitionReaderFactory, only to 
> AvroUtils#RowReader. So I propose this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to