Repository: spark
Updated Branches:
  refs/heads/master d084a2de3 -> 34de24abb


[SPARK-12589][SQL] Fix UnsafeRowParquetRecordReader to properly set the row length.

The reader was not setting the row length, so the recorded length was wrong
whenever the schema contained variable-length columns. The problem usually
does not manifest, since the values stored in the columns are correct and
projecting the row fixes the length. It surfaces when a row is copied without
being projected first, because copy() duplicates only the recorded number of
bytes.
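
For illustration, a minimal sketch of the failure mode (assumptions, not part
of the patch: it leans on the 1.6-era helpers UnsafeProjection.create and
InternalRow.apply, and on UnsafeRow's layout of an 8-byte null bitset plus
one 8-byte slot per field; setTotalSize is the accessor the patch adds):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.sql.types._
    import org.apache.spark.unsafe.types.UTF8String

    // Build an UnsafeRow with one fixed-length and one variable-length column.
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("s", StringType)))
    val row = UnsafeProjection.create(schema)(
      InternalRow(2, UTF8String.fromString("helloabcde")))

    // Simulate the reader's stale length: record only the fixed-length part
    // (8-byte bitset + 2 fields * 8 bytes), ignoring the string bytes.
    val fixedPart = 8 + 2 * 8
    val fullSize = row.getSizeInBytes
    row.setTotalSize(fixedPart)

    // Reads still succeed: the string's offset/length slot points at bytes
    // that are physically present in the backing buffer...
    assert(row.getUTF8String(1).toString == "helloabcde")

    // ...but copy() duplicates only sizeInBytes bytes, so the copy silently
    // drops the string payload (and hashes differently, per the new test).
    assert(row.copy().getSizeInBytes == fixedPart)

    row.setTotalSize(fullSize) // what the fix restores after each batch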

Author: Nong Li <n...@databricks.com>

Closes #10576 from nongli/spark-12589.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34de24ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34de24ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34de24ab

Branch: refs/heads/master
Commit: 34de24abb518e95c4312b77aa107d061ce02c835
Parents: d084a2d
Author: Nong Li <n...@databricks.com>
Authored: Mon Jan 4 14:58:24 2016 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Jan 4 14:58:24 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/UnsafeRow.java     |  4 ++++
 .../parquet/UnsafeRowParquetRecordReader.java   |  9 ++++++++
 .../datasources/parquet/ParquetIOSuite.scala    | 24 ++++++++++++++++++++
 3 files changed, 37 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/34de24ab/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 7492b88..1a35193 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -177,6 +177,10 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS
     pointTo(buf, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
   }
 
+  public void setTotalSize(int sizeInBytes) {
+    this.sizeInBytes = sizeInBytes;
+  }
+
   public void setNotNullAt(int i) {
     assertIndexIsValid(i);
     BitSetMethods.unset(baseObject, baseOffset, i);

http://git-wip-us.apache.org/repos/asf/spark/blob/34de24ab/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index a6758bd..198bfb6 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -256,6 +256,15 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
       numBatched = num;
       batchIdx = 0;
     }
+
+    // Update the total row lengths if the schema contained variable length. We did not maintain
+    // this as we populated the columns.
+    if (containsVarLenFields) {
+      for (int i = 0; i < numBatched; ++i) {
+        rows[i].setTotalSize(rowWriters[i].holder().totalSize());
+      }
+    }
+
     return true;
   }
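
A note on why holder().totalSize() is the right length: the BufferHolder
behind each UnsafeRowWriter advances a cursor past every byte written for the
row, fixed-length region and variable-length bytes alike. A rough sketch
(field and method shapes taken from the 1.6-era codegen BufferHolder; treat
them as assumptions):

    import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder

    val holder = new BufferHolder
    // A writer first reserves the fixed-length region, then appends any
    // variable-length bytes, advancing `cursor` each time. For a two-field
    // row with a 10-byte string: 24-byte fixed part + 16 string bytes
    // (rounded up to a multiple of 8).
    holder.grow(24 + 16)
    holder.cursor += 24 + 16

    // totalSize() is cursor minus the buffer start: every byte this row
    // occupies, which is exactly what setTotalSize() must record.
    assert(holder.totalSize() == 40)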
 

http://git-wip-us.apache.org/repos/asf/spark/blob/34de24ab/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 0c5d488..b0581e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -38,6 +38,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -618,6 +619,29 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       readResourceParquetFile("dec-in-fixed-len.parquet"),
      sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
   }
+
+  test("SPARK-12589 copy() on rows returned from reader works for strings") {
+    withTempPath { dir =>
+      val data = (1, "abc") :: (2, "helloabcde") :: Nil
+      data.toDF().write.parquet(dir.getCanonicalPath)
+      var hash1: Int = 0
+      var hash2: Int = 0
+      (false :: true :: Nil).foreach { v =>
+        withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> v.toString) {
+          val df = sqlContext.read.parquet(dir.getCanonicalPath)
+          val rows = df.queryExecution.toRdd.map(_.copy()).collect()
+          val unsafeRows = rows.map(_.asInstanceOf[UnsafeRow])
+          if (!v) {
+            hash1 = unsafeRows(0).hashCode()
+            hash2 = unsafeRows(1).hashCode()
+          } else {
+            assert(hash1 == unsafeRows(0).hashCode())
+            assert(hash2 == unsafeRows(1).hashCode())
+          }
+        }
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
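
Why the new test compares hashCodes: UnsafeRow.hashCode hashes exactly
sizeInBytes bytes of the backing buffer, so a copy made from a row with a
stale length hashes truncated data and disagrees between the two reader
paths. A sketch of that dependence (Murmur3_x86_32 with seed 42 is what the
1.6-era UnsafeRow.hashCode delegates to; the helper itself is hypothetical):

    import org.apache.spark.unsafe.Platform
    import org.apache.spark.unsafe.hash.Murmur3_x86_32

    // Hash the first sizeInBytes bytes of a row buffer, UnsafeRow-style.
    // sizeInBytes must be 8-byte aligned, as UnsafeRow sizes always are;
    // a shorter recorded length yields a different hash.
    def rowHash(buf: Array[Byte], sizeInBytes: Int): Int =
      Murmur3_x86_32.hashUnsafeWords(buf, Platform.BYTE_ARRAY_OFFSET, sizeInBytes, 42)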

