spark git commit: [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 8f1ca6957 -> 7f7b63bb6


[SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream

## What changes were proposed in this pull request?

Corrects the offsetInBytes calculation in UnsafeRow.writeToStream. Known failures 
include writes to some DataSources that have their own SparkPlan implementations 
and cause an EXCHANGE during writes.
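
A minimal sketch of the offset arithmetic (not part of the change; the 100-byte 
shift and the HotSpot value of `Platform.BYTE_ARRAY_OFFSET` are illustrative 
assumptions):

```scala
import org.apache.spark.unsafe.Platform

// Assume the row's data begins 100 bytes into its backing byte[], so its
// baseOffset is BYTE_ARRAY_OFFSET + 100 (BYTE_ARRAY_OFFSET is JVM-dependent,
// e.g. 16 on HotSpot).
val baseOffset: Long = Platform.BYTE_ARRAY_OFFSET + 100L

// Old computation: negative (-100), so OutputStream.write rejects it. For a
// zero in-array offset the two formulas happen to agree, which is why the
// bug stayed hidden.
val oldOffsetInByteArray = (Platform.BYTE_ARRAY_OFFSET - baseOffset).toInt  // -100

// Fixed computation: the row's position relative to the start of the array.
val newOffsetInByteArray = (baseOffset - Platform.BYTE_ARRAY_OFFSET).toInt  // 100
```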

## How was this patch tested?

Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over a byte array 
with a non-zero offset.
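
A condensed sketch of the pattern the extended test uses (the real change is in 
the UnsafeRowSuite diff below; `row` stands in for the suite's byte-array-backed 
UnsafeRow, is assumed to start at index 0 of its backing array, and the 100-byte 
shift is arbitrary):

```scala
import java.io.ByteArrayOutputStream
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.unsafe.Platform

def writeWithNonZeroOffset(row: UnsafeRow): Array[Byte] = {
  val numBytes = row.getSizeInBytes
  // Copy the row's bytes into a larger buffer so the data starts at index 100.
  val shifted = new Array[Byte](numBytes + 100)
  System.arraycopy(row.getBaseObject.asInstanceOf[Array[Byte]], 0, shifted, 100, numBytes)
  // Point a fresh UnsafeRow at the shifted buffer, i.e. a non-zero in-array offset.
  val shiftedRow = new UnsafeRow(row.numFields())
  shiftedRow.pointTo(shifted, Platform.BYTE_ARRAY_OFFSET + 100, numBytes)
  // With the fix, the bytes written here match those written from the original row.
  val out = new ByteArrayOutputStream()
  shiftedRow.writeToStream(out, null)
  out.toByteArray
}
```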

Author: Sumedh Wale 

Closes #18535 from sumwale/SPARK-21312.

(cherry picked from commit 14a3bb3a008c302aac908d7deaf0942a98c63be7)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f7b63bb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f7b63bb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f7b63bb

Branch: refs/heads/branch-2.1
Commit: 7f7b63bb634c3b89db80cee99848ee94f9dca6ba
Parents: 8f1ca69
Author: Sumedh Wale 
Authored: Thu Jul 6 14:47:22 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jul 6 14:48:12 2017 +0800

--
 .../spark/sql/catalyst/expressions/UnsafeRow.java  |  2 +-
 .../scala/org/apache/spark/sql/UnsafeRowSuite.scala| 13 +
 2 files changed, 14 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7f7b63bb/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
--
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index d205547..b8e9388 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
    */
   public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException {
     if (baseObject instanceof byte[]) {
-      int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset);
+      int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET);
       out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes);
     } else {
       int dataRemaining = sizeInBytes;

http://git-wip-us.apache.org/repos/asf/spark/blob/7f7b63bb/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
index a32763d..a5f904c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
@@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite {
         MemoryAllocator.UNSAFE.free(offheapRowPage)
       }
     }
+    val (bytesFromArrayBackedRowWithOffset, field0StringFromArrayBackedRowWithOffset) = {
+      val baos = new ByteArrayOutputStream()
+      val numBytes = arrayBackedUnsafeRow.getSizeInBytes
+      val bytesWithOffset = new Array[Byte](numBytes + 100)
+      System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 0,
+        bytesWithOffset, 100, numBytes)
+      val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields())
+      arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 100, numBytes)
+      arrayBackedRow.writeToStream(baos, null)
+      (baos.toByteArray, arrayBackedRow.getString(0))
+    }
 
     assert(bytesFromArrayBackedRow === bytesFromOffheapRow)
     assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow)
+    assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset)
+    assert(field0StringFromArrayBackedRow === field0StringFromArrayBackedRowWithOffset)
   }
 
   test("calling getDouble() and getFloat() on null columns") {





spark git commit: [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 770fd2a23 -> 6e1081cbe


[SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream

## What changes were proposed in this pull request?

Corrects the offsetInBytes calculation in UnsafeRow.writeToStream. Known failures 
include writes to some DataSources that have their own SparkPlan implementations 
and cause an EXCHANGE during writes.

## How was this patch tested?

Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over a byte array 
with a non-zero offset.

Author: Sumedh Wale 

Closes #18535 from sumwale/SPARK-21312.

(cherry picked from commit 14a3bb3a008c302aac908d7deaf0942a98c63be7)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e1081cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e1081cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e1081cb

Branch: refs/heads/branch-2.2
Commit: 6e1081cbeac58826526b6ff7f2938a556b31ca9e
Parents: 770fd2a
Author: Sumedh Wale 
Authored: Thu Jul 6 14:47:22 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jul 6 14:47:43 2017 +0800

--
 .../spark/sql/catalyst/expressions/UnsafeRow.java  |  2 +-
 .../scala/org/apache/spark/sql/UnsafeRowSuite.scala| 13 +
 2 files changed, 14 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6e1081cb/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
--
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 86de909..56994fa 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
    */
   public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException {
     if (baseObject instanceof byte[]) {
-      int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset);
+      int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET);
       out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes);
     } else {
       int dataRemaining = sizeInBytes;

http://git-wip-us.apache.org/repos/asf/spark/blob/6e1081cb/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
index a32763d..a5f904c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
@@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite {
         MemoryAllocator.UNSAFE.free(offheapRowPage)
       }
     }
+    val (bytesFromArrayBackedRowWithOffset, field0StringFromArrayBackedRowWithOffset) = {
+      val baos = new ByteArrayOutputStream()
+      val numBytes = arrayBackedUnsafeRow.getSizeInBytes
+      val bytesWithOffset = new Array[Byte](numBytes + 100)
+      System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 0,
+        bytesWithOffset, 100, numBytes)
+      val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields())
+      arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 100, numBytes)
+      arrayBackedRow.writeToStream(baos, null)
+      (baos.toByteArray, arrayBackedRow.getString(0))
+    }
 
     assert(bytesFromArrayBackedRow === bytesFromOffheapRow)
     assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow)
+    assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset)
+    assert(field0StringFromArrayBackedRow === field0StringFromArrayBackedRowWithOffset)
   }
 
   test("calling getDouble() and getFloat() on null columns") {





spark git commit: [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream

2017-07-06 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 75b168fd3 -> 14a3bb3a0


[SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream

## What changes were proposed in this pull request?

Corrects the offsetInBytes calculation in UnsafeRow.writeToStream. Known failures 
include writes to some DataSources that have their own SparkPlan implementations 
and cause an EXCHANGE during writes.

## How was this patch tested?

Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over a byte array 
with a non-zero offset.

Author: Sumedh Wale 

Closes #18535 from sumwale/SPARK-21312.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14a3bb3a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14a3bb3a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14a3bb3a

Branch: refs/heads/master
Commit: 14a3bb3a008c302aac908d7deaf0942a98c63be7
Parents: 75b168f
Author: Sumedh Wale 
Authored: Thu Jul 6 14:47:22 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jul 6 14:47:22 2017 +0800

--
 .../spark/sql/catalyst/expressions/UnsafeRow.java  |  2 +-
 .../scala/org/apache/spark/sql/UnsafeRowSuite.scala| 13 +
 2 files changed, 14 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/14a3bb3a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
--
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 86de909..56994fa 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo
    */
   public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException {
     if (baseObject instanceof byte[]) {
-      int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset);
+      int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET);
       out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes);
     } else {
       int dataRemaining = sizeInBytes;

http://git-wip-us.apache.org/repos/asf/spark/blob/14a3bb3a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
index a32763d..a5f904c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
@@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite {
         MemoryAllocator.UNSAFE.free(offheapRowPage)
       }
     }
+    val (bytesFromArrayBackedRowWithOffset, field0StringFromArrayBackedRowWithOffset) = {
+      val baos = new ByteArrayOutputStream()
+      val numBytes = arrayBackedUnsafeRow.getSizeInBytes
+      val bytesWithOffset = new Array[Byte](numBytes + 100)
+      System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 0,
+        bytesWithOffset, 100, numBytes)
+      val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields())
+      arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 100, numBytes)
+      arrayBackedRow.writeToStream(baos, null)
+      (baos.toByteArray, arrayBackedRow.getString(0))
+    }
 
     assert(bytesFromArrayBackedRow === bytesFromOffheapRow)
     assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow)
+    assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset)
+    assert(field0StringFromArrayBackedRow === field0StringFromArrayBackedRowWithOffset)
   }
 
   test("calling getDouble() and getFloat() on null columns") {

