[ https://issues.apache.org/jira/browse/SPARK-33314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-33314.
----------------------------------
    Fix Version/s: 3.1.0
       Resolution: Fixed

Issue resolved by pull request 30221
[https://github.com/apache/spark/pull/30221]

> Avro reader drops rows
> ----------------------
>
>                 Key: SPARK-33314
>                 URL: https://issues.apache.org/jira/browse/SPARK-33314
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Bruce Robbins
>            Assignee: Bruce Robbins
>            Priority: Blocker
>              Labels: correctness
>             Fix For: 3.1.0
>
>
> Under certain circumstances, the V1 Avro reader drops rows. For example:
> {noformat}
> scala> val df = spark.range(0, 25).toDF("index")
> df: org.apache.spark.sql.DataFrame = [index: bigint]
>
> scala> df.write.mode("overwrite").format("avro").save("index_avro")
>
> scala> val loaded = spark.read.format("avro").load("index_avro")
> loaded: org.apache.spark.sql.DataFrame = [index: bigint]
>
> scala> loaded.collect.size
> res1: Int = 25
>
> scala> loaded.orderBy("index").collect.size
> res2: Int = 17  <== expected 25
>
> scala> loaded.orderBy("index").write.mode("overwrite").format("parquet").save("index_as_parquet")
>
> scala> spark.read.parquet("index_as_parquet").count
> res4: Long = 17
> {noformat}
> SPARK-32346 slightly refactored AvroFileFormat and
> AvroPartitionReaderFactory to use a new iterator-like trait called
> AvroUtils#RowReader. RowReader#hasNextRow consumes a raw input record and
> stores the deserialized row for the next call to RowReader#nextRow.
> Unfortunately, hasNextRow is sometimes called twice before nextRow is
> called, resulting in a lost row (see
> [BypassMergeSortShuffleWriter#write|https://github.com/apache/spark/blob/69c27f49acf2fe6fbc8335bde2aac4afd4188678/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java#L132],
> which calls records.hasNext once before calling it again
> [here|https://github.com/apache/spark/blob/69c27f49acf2fe6fbc8335bde2aac4afd4188678/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java#L155]).
> RowReader consumes the Avro record in hasNextRow, rather than in nextRow,
> because AvroDeserializer#deserialize can filter out the record.
>
> Two possible fixes:
> 1) Keep state in RowReader so that multiple calls to RowReader#hasNextRow
> with no intervening call to RowReader#nextRow consume at most one Avro
> record. This requires no changes to any code that extends RowReader, just
> to RowReader itself.
> 2) Move record consumption to RowReader#nextRow (so that RowReader#nextRow
> could potentially return None) and wrap any iterator that extends RowReader
> in a new iterator created by flatMap, which filters out the Nones and
> extracts rows from the Somes. This requires changes to AvroFileFormat and
> AvroPartitionReaderFactory as well as to RowReader.
>
> The first option seems the simplest and most straightforward, and requires
> changes only to AvroUtils#RowReader, not to AvroFileFormat or
> AvroPartitionReaderFactory, so I propose it.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
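Fix (1) described above can be sketched as follows. This is a minimal, hypothetical illustration, not the actual AvroUtils#RowReader code: `SimpleRowReader`, its `Int` record type, and the stand-in `deserialize` are invented for the example. The key idea is that hasNextRow only pulls a new record when no deserialized row is already buffered, so repeated calls with no intervening nextRow are harmless.

```scala
// Hypothetical sketch of fix (1): make hasNextRow idempotent by buffering
// the deserialized row until nextRow consumes it.
class SimpleRowReader(underlying: Iterator[Int]) {
  private var currentRow: Option[Int] = None

  // Safe to call multiple times in a row: it pulls from `underlying`
  // only when no row is currently buffered.
  def hasNextRow: Boolean = {
    while (currentRow.isEmpty && underlying.hasNext) {
      val record = underlying.next()
      currentRow = deserialize(record) // may filter the record out
    }
    currentRow.isDefined
  }

  def nextRow: Int = {
    if (!hasNextRow) throw new NoSuchElementException("end of stream")
    val row = currentRow.get
    currentRow = None // clear the buffer so the next call advances
    row
  }

  // Stand-in for AvroDeserializer#deserialize, which can drop records;
  // here, negative records are filtered out.
  private def deserialize(record: Int): Option[Int] =
    if (record >= 0) Some(record) else None
}
```

With this buffering, a caller such as BypassMergeSortShuffleWriter#write that invokes hasNext twice before next no longer skips a record, because the second hasNextRow call finds the buffered row and returns without consuming input.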