This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new e5bef51  [SPARK-31703][SQL] Parquet RLE float/double are read 
incorrectly on big endian platforms
e5bef51 is described below

commit e5bef51826dc2ff4020879e35ae7eb9019aa7fcd
Author: Tin Hang To <ti...@us.ibm.com>
AuthorDate: Thu Aug 13 03:48:33 2020 +0000

    [SPARK-31703][SQL] Parquet RLE float/double are read incorrectly on big 
endian platforms
    
    ### What changes were proposed in this pull request?
    (back-porting from 
https://github.com/apache/spark/commit/9a3811dbf5f1234c1587337a3d74823d1d163b53)
    
    This PR fixes the issue introduced during SPARK-26985.
    
    SPARK-26985 changes the `putDoubles()` and `putFloats()` methods to respect 
the platform's endian-ness.  However, that causes the RLE paths in 
VectorizedRleValuesReader.java to read the RLE entries in parquet as BIG_ENDIAN 
on big endian platforms (i.e., as is), even though parquet data is always in 
little endian format.
    
    The comments in `WriteableColumnVector.java` say those methods are used for 
"ieee formatted doubles in platform native endian" (or floats), but since the 
data in parquet is always in little endian format, use of those methods appears 
to be inappropriate.
    
    To demonstrate the problem with spark-shell:
    
    ```scala
    import org.apache.spark._
    import org.apache.spark.sql._
    import org.apache.spark.sql.types._
    
    var data = Seq(
      (1.0, 0.1),
      (2.0, 0.2),
      (0.3, 3.0),
      (4.0, 4.0),
      (5.0, 5.0))
    
    var df = 
spark.createDataFrame(data).write.mode(SaveMode.Overwrite).parquet("/tmp/data.parquet2")
    var df2 = spark.read.parquet("/tmp/data.parquet2")
    df2.show()
    ```
    
    result:
    
    ```scala
    +--------------------+--------------------+
    |                  _1|                  _2|
    +--------------------+--------------------+
    |           3.16E-322|-1.54234871366845...|
    |         2.0553E-320|         2.0553E-320|
    |          2.561E-320|          2.561E-320|
    |4.66726145843124E-62|         1.0435E-320|
    |        3.03865E-319|-1.54234871366757...|
    +--------------------+--------------------+
    ```
    
    Also tests in ParquetIOSuite that involve float/double data would fail, 
e.g.,
    
    - basic data types (without binary)
    - read raw Parquet file
    
    /examples/src/main/python/mllib/isotonic_regression_example.py would fail 
as well.
    
    Purposed code change is to add `putDoublesLittleEndian()` and 
`putFloatsLittleEndian()` methods for parquet to invoke, just like the existing 
`putIntsLittleEndian()` and `putLongsLittleEndian()`.  On little endian 
platforms they would call `putDoubles()` and `putFloats()`, on big endian they 
would read the entries as little endian like pre-SPARK-26985.
    
    No new unit-test is introduced as the existing ones are actually sufficient.
    
    ### Why are the changes needed?
    RLE float/double data in parquet files will not be read back correctly on 
big endian platforms.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    All unit tests (mvn test) were ran and OK.
    
    Closes #29419 from tinhto-000/SPARK-31703-2.4.
    
    Lead-authored-by: Tin Hang To <ti...@us.ibm.com>
    Co-authored-by: angerszhu <angers....@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../parquet/VectorizedPlainValuesReader.java       |   4 +-
 .../execution/vectorized/OffHeapColumnVector.java  |  26 +++++
 .../execution/vectorized/OnHeapColumnVector.java   |  24 +++++
 .../execution/vectorized/WritableColumnVector.java |  12 +++
 .../execution/vectorized/ColumnarBatchSuite.scala  | 116 ++++++++++++++++++---
 5 files changed, 166 insertions(+), 16 deletions(-)

diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index c62dc3d..fd228ee 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -103,7 +103,7 @@ public class VectorizedPlainValuesReader extends 
ValuesReader implements Vectori
 
     if (buffer.hasArray()) {
       int offset = buffer.arrayOffset() + buffer.position();
-      c.putFloats(rowId, total, buffer.array(), offset);
+      c.putFloatsLittleEndian(rowId, total, buffer.array(), offset);
     } else {
       for (int i = 0; i < total; i += 1) {
         c.putFloat(rowId + i, buffer.getFloat());
@@ -118,7 +118,7 @@ public class VectorizedPlainValuesReader extends 
ValuesReader implements Vectori
 
     if (buffer.hasArray()) {
       int offset = buffer.arrayOffset() + buffer.position();
-      c.putDoubles(rowId, total, buffer.array(), offset);
+      c.putDoublesLittleEndian(rowId, total, buffer.array(), offset);
     } else {
       for (int i = 0; i < total; i += 1) {
         c.putDouble(rowId + i, buffer.getDouble());
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 3b919c7..982a26c 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -426,6 +426,19 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   }
 
   @Override
+  public void putFloatsLittleEndian(int rowId, int count, byte[] src, int 
srcIndex) {
+    if (!bigEndianPlatform) {
+      putFloats(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      long offset = data + 4L * rowId;
+      for (int i = 0; i < count; ++i, offset += 4) {
+        Platform.putFloat(null, offset, bb.getFloat(srcIndex + (4 * i)));
+      }
+    }
+  }
+
+  @Override
   public float getFloat(int rowId) {
     if (dictionary == null) {
       return Platform.getFloat(null, data + rowId * 4L);
@@ -481,6 +494,19 @@ public final class OffHeapColumnVector extends 
WritableColumnVector {
   }
 
   @Override
+  public void putDoublesLittleEndian(int rowId, int count, byte[] src, int 
srcIndex) {
+    if (!bigEndianPlatform) {
+      putDoubles(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      long offset = data + 8L * rowId;
+      for (int i = 0; i < count; ++i, offset += 8) {
+        Platform.putDouble(null, offset, bb.getDouble(srcIndex + (8 * i)));
+      }
+    }
+  }
+
+  @Override
   public double getDouble(int rowId) {
     if (dictionary == null) {
       return Platform.getDouble(null, data + rowId * 8L);
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 1bf3126..625b78b 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -404,6 +404,18 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   }
 
   @Override
+  public void putFloatsLittleEndian(int rowId, int count, byte[] src, int 
srcIndex) {
+    if (!bigEndianPlatform) {
+      putFloats(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      for (int i = 0; i < count; ++i) {
+        floatData[i + rowId] = bb.getFloat(srcIndex + (4 * i));
+      }
+    }
+  }
+
+  @Override
   public float getFloat(int rowId) {
     if (dictionary == null) {
       return floatData[rowId];
@@ -453,6 +465,18 @@ public final class OnHeapColumnVector extends 
WritableColumnVector {
   }
 
   @Override
+  public void putDoublesLittleEndian(int rowId, int count, byte[] src, int 
srcIndex) {
+    if (!bigEndianPlatform) {
+      putDoubles(rowId, count, src, srcIndex);
+    } else {
+      ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.LITTLE_ENDIAN);
+      for (int i = 0; i < count; ++i) {
+        doubleData[i + rowId] = bb.getDouble(srcIndex + (8 * i));
+      }
+    }
+  }
+
+  @Override
   public double getDouble(int rowId) {
     if (dictionary == null) {
       return doubleData[rowId];
diff --git 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
index b0e119d..05e8152 100644
--- 
a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
+++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -311,6 +311,12 @@ public abstract class WritableColumnVector extends 
ColumnVector {
   public abstract void putFloats(int rowId, int count, byte[] src, int 
srcIndex);
 
   /**
+   * Sets values from [src[srcIndex], src[srcIndex + count * 4]) to [rowId, 
rowId + count)
+   * The data in src must be ieee formatted floats in little endian.
+   */
+  public abstract void putFloatsLittleEndian(int rowId, int count, byte[] src, 
int srcIndex);
+
+  /**
    * Sets `value` to the value at rowId.
    */
   public abstract void putDouble(int rowId, double value);
@@ -332,6 +338,12 @@ public abstract class WritableColumnVector extends 
ColumnVector {
   public abstract void putDoubles(int rowId, int count, byte[] src, int 
srcIndex);
 
   /**
+   * Sets values from [src[srcIndex], src[srcIndex + count * 8]) to [rowId, 
rowId + count)
+   * The data in src must be ieee formatted doubles in little endian.
+   */
+  public abstract void putDoublesLittleEndian(int rowId, int count, byte[] 
src, int srcIndex);
+
+  /**
    * Puts a byte array that already exists in this column.
    */
   public abstract void putArray(int rowId, int offset, int length);
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index f57f07b..086d88b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -444,13 +444,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
       Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234f)
       Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, 1.123f)
 
-      if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
-        // Ensure array contains Little Endian floats
-        val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
-        Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getFloat(0))
-        Platform.putFloat(buffer, Platform.BYTE_ARRAY_OFFSET + 4, 
bb.getFloat(4))
-      }
-
       column.putFloats(idx, 1, buffer, 4)
       column.putFloats(idx + 1, 1, buffer, 0)
       reference += 1.123f
@@ -488,6 +481,57 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
   }
 
+  testVector("[SPARK-31703] Float API - Little Endian", 1024, FloatType) {
+    column =>
+      val seed = System.currentTimeMillis()
+      val random = new Random(seed)
+      val reference = mutable.ArrayBuffer.empty[Float]
+
+      var idx = 0
+
+      val littleEndian = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
+      littleEndian.putFloat(0, 1.357f)
+      littleEndian.putFloat(4, 2.468f)
+      val arr = new Array[Byte](littleEndian.remaining)
+      littleEndian.get(arr)
+
+      column.putFloatsLittleEndian(idx, 1, arr, 4)
+      column.putFloatsLittleEndian(idx + 1, 1, arr, 0)
+      reference += 2.468f
+      reference += 1.357f
+      idx += 2
+
+      column.putFloatsLittleEndian(idx, 2, arr, 0)
+      reference += 1.357f
+      reference += 2.468f
+      idx += 2
+
+      while (idx < column.capacity) {
+        val single = random.nextBoolean()
+        if (single) {
+          val v = random.nextFloat()
+          column.putFloat(idx, v)
+          reference += v
+          idx += 1
+        } else {
+          val n = math.min(random.nextInt(column.capacity / 20), 
column.capacity - idx)
+          val v = random.nextFloat()
+          column.putFloats(idx, n, v)
+          var i = 0
+          while (i < n) {
+            reference += v
+            i += 1
+          }
+          idx += n
+        }
+      }
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1 == column.getFloat(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
+      }
+  }
+
   testVector("Double APIs", 1024, DoubleType) {
     column =>
       val seed = System.currentTimeMillis()
@@ -528,13 +572,6 @@ class ColumnarBatchSuite extends SparkFunSuite {
       Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234)
       Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123)
 
-      if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
-        // Ensure array contains Little Endian doubles
-        val bb = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN)
-        Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, bb.getDouble(0))
-        Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 
bb.getDouble(8))
-      }
-
       column.putDoubles(idx, 1, buffer, 8)
       column.putDoubles(idx + 1, 1, buffer, 0)
       reference += 1.123
@@ -572,6 +609,57 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
   }
 
+  testVector("[SPARK-31703] Double API - Little Endian", 1024, DoubleType) {
+    column =>
+      val seed = System.currentTimeMillis()
+      val random = new Random(seed)
+      val reference = mutable.ArrayBuffer.empty[Double]
+
+      var idx = 0
+
+      val littleEndian = ByteBuffer.allocate(16).order(ByteOrder.LITTLE_ENDIAN)
+      littleEndian.putDouble(0, 1.357)
+      littleEndian.putDouble(8, 2.468)
+      val arr = new Array[Byte](littleEndian.remaining)
+      littleEndian.get(arr)
+
+      column.putDoublesLittleEndian(idx, 1, arr, 8)
+      column.putDoublesLittleEndian(idx + 1, 1, arr, 0)
+      reference += 2.468
+      reference += 1.357
+      idx += 2
+
+      column.putDoublesLittleEndian(idx, 2, arr, 0)
+      reference += 1.357
+      reference += 2.468
+      idx += 2
+
+      while (idx < column.capacity) {
+        val single = random.nextBoolean()
+        if (single) {
+          val v = random.nextDouble()
+          column.putDouble(idx, v)
+          reference += v
+          idx += 1
+        } else {
+          val n = math.min(random.nextInt(column.capacity / 20), 
column.capacity - idx)
+          val v = random.nextDouble()
+          column.putDoubles(idx, n, v)
+          var i = 0
+          while (i < n) {
+            reference += v
+            i += 1
+          }
+          idx += n
+        }
+      }
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1 == column.getDouble(v._2),
+          "Seed = " + seed + " VectorType=" + column.getClass.getSimpleName)
+      }
+  }
+
   testVector("String APIs", 7, StringType) {
     column =>
       val reference = mutable.ArrayBuffer.empty[String]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to