Repository: spark Updated Branches: refs/heads/master d084a2de3 -> 34de24abb
[SPARK-12589][SQL] Fix UnsafeRowParquetRecordReader to properly set the row length. The reader was previously not setting the row length meaning it was wrong if there were variable length columns. This problem does not manifest usually, since the value in the column is correct and projecting the row fixes the issue. Author: Nong Li <n...@databricks.com> Closes #10576 from nongli/spark-12589. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34de24ab Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34de24ab Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34de24ab Branch: refs/heads/master Commit: 34de24abb518e95c4312b77aa107d061ce02c835 Parents: d084a2d Author: Nong Li <n...@databricks.com> Authored: Mon Jan 4 14:58:24 2016 -0800 Committer: Yin Huai <yh...@databricks.com> Committed: Mon Jan 4 14:58:24 2016 -0800 ---------------------------------------------------------------------- .../sql/catalyst/expressions/UnsafeRow.java | 4 ++++ .../parquet/UnsafeRowParquetRecordReader.java | 9 ++++++++ .../datasources/parquet/ParquetIOSuite.scala | 24 ++++++++++++++++++++ 3 files changed, 37 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/34de24ab/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index 7492b88..1a35193 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -177,6 +177,10 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS pointTo(buf, Platform.BYTE_ARRAY_OFFSET, sizeInBytes); } + public void setTotalSize(int sizeInBytes) { + this.sizeInBytes = sizeInBytes; + } + public void setNotNullAt(int i) { assertIndexIsValid(i); BitSetMethods.unset(baseObject, baseOffset, i); http://git-wip-us.apache.org/repos/asf/spark/blob/34de24ab/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java index a6758bd..198bfb6 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java @@ -256,6 +256,15 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas numBatched = num; batchIdx = 0; } + + // Update the total row lengths if the schema contained variable length. We did not maintain + // this as we populated the columns. + if (containsVarLenFields) { + for (int i = 0; i < numBatched; ++i) { + rows[i].setTotalSize(rowWriters[i].holder().totalSize()); + } + } + return true; } http://git-wip-us.apache.org/repos/asf/spark/blob/34de24ab/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 0c5d488..b0581e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -38,6 +38,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser} import org.apache.spark.SparkException import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.ScalaReflection +import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -618,6 +619,29 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { readResourceParquetFile("dec-in-fixed-len.parquet"), sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec)) } + + test("SPARK-12589 copy() on rows returned from reader works for strings") { + withTempPath { dir => + val data = (1, "abc") ::(2, "helloabcde") :: Nil + data.toDF().write.parquet(dir.getCanonicalPath) + var hash1: Int = 0 + var hash2: Int = 0 + (false :: true :: Nil).foreach { v => + withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> v.toString) { + val df = sqlContext.read.parquet(dir.getCanonicalPath) + val rows = df.queryExecution.toRdd.map(_.copy()).collect() + val unsafeRows = rows.map(_.asInstanceOf[UnsafeRow]) + if (!v) { + hash1 = unsafeRows(0).hashCode() + hash2 = unsafeRows(1).hashCode() + } else { + assert(hash1 == unsafeRows(0).hashCode()) + assert(hash2 == unsafeRows(1).hashCode()) + } + } + } + } + } } class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org