Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1005ee396 -> 8ac919809


[SPARK-12589][SQL] Fix UnsafeRowParquetRecordReader to properly set the row length.

The reader was previously not setting the row length, so the length was wrong
whenever there were variable-length columns. This problem does not usually
manifest, since the value in each column is correct and projecting the row
fixes the issue.
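
As context for the diff below, a minimal sketch of the failure mode. This is
illustrative only, assuming the Spark 1.6-era internals that appear in this
patch (UnsafeRow.pointTo, the new setTotalSize, BufferHolder, UnsafeRowWriter);
the class name RowLengthDemo is hypothetical:

    import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
    import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
    import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
    import org.apache.spark.unsafe.types.UTF8String;

    public class RowLengthDemo {
      public static void main(String[] args) {
        // Build a single-column row containing a variable-length string.
        BufferHolder holder = new BufferHolder();
        UnsafeRowWriter writer = new UnsafeRowWriter();
        writer.initialize(holder, 1);
        writer.write(0, UTF8String.fromString("helloabcde"));

        UnsafeRow row = new UnsafeRow();
        // Mimic the reader before this fix: count only the fixed-length
        // region (8-byte null bitset + one 8-byte offset-and-size word)
        // and ignore the string bytes that follow it.
        row.pointTo(holder.buffer, 1, 16);

        // Per-column reads still work, because the offset-and-size word
        // locates the string bytes independently of sizeInBytes ...
        System.out.println(row.getUTF8String(0));       // helloabcde

        // ... but copy() duplicates exactly sizeInBytes bytes, so the
        // copy silently loses the string region.
        UnsafeRow truncated = row.copy();
        System.out.println(truncated.getSizeInBytes()); // 16: string bytes gone

        // The fix below: take the real length from the writer's holder.
        row.setTotalSize(holder.totalSize());
        System.out.println(row.getSizeInBytes());       // 32: fixed region plus
                                                        // padded string bytes
      }
    }

copy() and hashCode() both operate on exactly sizeInBytes bytes, which is why
the regression test below compares hashCode() across the two reader code paths.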

Author: Nong Li <n...@databricks.com>

Closes #10576 from nongli/spark-12589.

(cherry picked from commit 34de24abb518e95c4312b77aa107d061ce02c835)
Signed-off-by: Yin Huai <yh...@databricks.com>

Conflicts:
        sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ac91980
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ac91980
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ac91980

Branch: refs/heads/branch-1.6
Commit: 8ac9198096d1cef9fbc062df8b8bd94fb9e96829
Parents: 1005ee3
Author: Nong Li <n...@databricks.com>
Authored: Mon Jan 4 14:58:24 2016 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Jan 4 15:08:58 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/UnsafeRow.java     |  3 +++
 .../parquet/UnsafeRowParquetRecordReader.java   |  9 ++++++++
 .../datasources/parquet/ParquetIOSuite.scala    | 24 ++++++++++++++++++++
 3 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8ac91980/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index b6979d0..af687ea 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -214,6 +214,9 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS
     pointTo(buf, numFields, sizeInBytes);
   }
 
+  public void setTotalSize(int sizeInBytes) {
+    this.sizeInBytes = sizeInBytes;
+  }
 
   public void setNotNullAt(int i) {
     assertIndexIsValid(i);

http://git-wip-us.apache.org/repos/asf/spark/blob/8ac91980/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index dade488..b0d0774 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -264,6 +264,15 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
       numBatched = num;
       batchIdx = 0;
     }
+
+    // Update the total row lengths if the schema contained variable length. We did not maintain
+    // this as we populated the columns.
+    if (containsVarLenFields) {
+      for (int i = 0; i < numBatched; ++i) {
+        rows[i].setTotalSize(rowWriters[i].holder().totalSize());
+      }
+    }
+
     return true;
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8ac91980/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 159c3c6..4c3d3f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -40,6 +40,7 @@ import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -626,6 +627,29 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       readResourceParquetFile("dec-in-fixed-len.parquet"),
      sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
   }
+
+  test("SPARK-12589 copy() on rows returned from reader works for strings") {
+    withTempPath { dir =>
+      val data = (1, "abc") ::(2, "helloabcde") :: Nil
+      data.toDF().write.parquet(dir.getCanonicalPath)
+      var hash1: Int = 0
+      var hash2: Int = 0
+      (false :: true :: Nil).foreach { v =>
+        withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> v.toString) {
+          val df = sqlContext.read.parquet(dir.getCanonicalPath)
+          val rows = df.queryExecution.toRdd.map(_.copy()).collect()
+          val unsafeRows = rows.map(_.asInstanceOf[UnsafeRow])
+          if (!v) {
+            hash1 = unsafeRows(0).hashCode()
+            hash2 = unsafeRows(1).hashCode()
+          } else {
+            assert(hash1 == unsafeRows(0).hashCode())
+            assert(hash2 == unsafeRows(1).hashCode())
+          }
+        }
+      }
+    }
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)

