spark git commit: [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream
Repository: spark Updated Branches: refs/heads/branch-2.1 8f1ca6957 -> 7f7b63bb6 [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream ## What changes were proposed in this pull request? Corrects offsetInBytes calculation in UnsafeRow.writeToStream. Known failures include writes to some DataSources that have own SparkPlan implementations and cause EXCHANGE in writes. ## How was this patch tested? Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over byte array having non-zero offset. Author: Sumedh WaleCloses #18535 from sumwale/SPARK-21312. (cherry picked from commit 14a3bb3a008c302aac908d7deaf0942a98c63be7) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7f7b63bb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7f7b63bb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7f7b63bb Branch: refs/heads/branch-2.1 Commit: 7f7b63bb634c3b89db80cee99848ee94f9dca6ba Parents: 8f1ca69 Author: Sumedh Wale Authored: Thu Jul 6 14:47:22 2017 +0800 Committer: Wenchen Fan Committed: Thu Jul 6 14:48:12 2017 +0800 -- .../spark/sql/catalyst/expressions/UnsafeRow.java | 2 +- .../scala/org/apache/spark/sql/UnsafeRowSuite.scala| 13 + 2 files changed, 14 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7f7b63bb/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java -- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index d205547..b8e9388 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo */ public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException { if (baseObject instanceof byte[]) { - int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset); + int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET); out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes); } else { int dataRemaining = sizeInBytes; http://git-wip-us.apache.org/repos/asf/spark/blob/7f7b63bb/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala index a32763d..a5f904c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala @@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite { MemoryAllocator.UNSAFE.free(offheapRowPage) } } +val (bytesFromArrayBackedRowWithOffset, field0StringFromArrayBackedRowWithOffset) = { + val baos = new ByteArrayOutputStream() + val numBytes = arrayBackedUnsafeRow.getSizeInBytes + val bytesWithOffset = new Array[Byte](numBytes + 100) + System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 0, +bytesWithOffset, 100, numBytes) + val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields()) + arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 100, numBytes) + arrayBackedRow.writeToStream(baos, null) + (baos.toByteArray, arrayBackedRow.getString(0)) +} assert(bytesFromArrayBackedRow === bytesFromOffheapRow) assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow) +assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset) +assert(field0StringFromArrayBackedRow === field0StringFromArrayBackedRowWithOffset) } test("calling getDouble() and getFloat() on null columns") { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream
Repository: spark Updated Branches: refs/heads/branch-2.2 770fd2a23 -> 6e1081cbe [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream ## What changes were proposed in this pull request? Corrects offsetInBytes calculation in UnsafeRow.writeToStream. Known failures include writes to some DataSources that have own SparkPlan implementations and cause EXCHANGE in writes. ## How was this patch tested? Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over byte array having non-zero offset. Author: Sumedh WaleCloses #18535 from sumwale/SPARK-21312. (cherry picked from commit 14a3bb3a008c302aac908d7deaf0942a98c63be7) Signed-off-by: Wenchen Fan Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e1081cb Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e1081cb Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e1081cb Branch: refs/heads/branch-2.2 Commit: 6e1081cbeac58826526b6ff7f2938a556b31ca9e Parents: 770fd2a Author: Sumedh Wale Authored: Thu Jul 6 14:47:22 2017 +0800 Committer: Wenchen Fan Committed: Thu Jul 6 14:47:43 2017 +0800 -- .../spark/sql/catalyst/expressions/UnsafeRow.java | 2 +- .../scala/org/apache/spark/sql/UnsafeRowSuite.scala| 13 + 2 files changed, 14 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/6e1081cb/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java -- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index 86de909..56994fa 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo */ public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException { if (baseObject instanceof byte[]) { - int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset); + int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET); out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes); } else { int dataRemaining = sizeInBytes; http://git-wip-us.apache.org/repos/asf/spark/blob/6e1081cb/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala index a32763d..a5f904c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala @@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite { MemoryAllocator.UNSAFE.free(offheapRowPage) } } +val (bytesFromArrayBackedRowWithOffset, field0StringFromArrayBackedRowWithOffset) = { + val baos = new ByteArrayOutputStream() + val numBytes = arrayBackedUnsafeRow.getSizeInBytes + val bytesWithOffset = new Array[Byte](numBytes + 100) + System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 0, +bytesWithOffset, 100, numBytes) + val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields()) + arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 100, numBytes) + arrayBackedRow.writeToStream(baos, null) + (baos.toByteArray, arrayBackedRow.getString(0)) +} assert(bytesFromArrayBackedRow === bytesFromOffheapRow) assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow) +assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset) +assert(field0StringFromArrayBackedRow === field0StringFromArrayBackedRowWithOffset) } test("calling getDouble() and getFloat() on null columns") { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream
Repository: spark Updated Branches: refs/heads/master 75b168fd3 -> 14a3bb3a0 [SPARK-21312][SQL] correct offsetInBytes in UnsafeRow.writeToStream ## What changes were proposed in this pull request? Corrects offsetInBytes calculation in UnsafeRow.writeToStream. Known failures include writes to some DataSources that have own SparkPlan implementations and cause EXCHANGE in writes. ## How was this patch tested? Extended UnsafeRowSuite.writeToStream to include an UnsafeRow over byte array having non-zero offset. Author: Sumedh WaleCloses #18535 from sumwale/SPARK-21312. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14a3bb3a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14a3bb3a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14a3bb3a Branch: refs/heads/master Commit: 14a3bb3a008c302aac908d7deaf0942a98c63be7 Parents: 75b168f Author: Sumedh Wale Authored: Thu Jul 6 14:47:22 2017 +0800 Committer: Wenchen Fan Committed: Thu Jul 6 14:47:22 2017 +0800 -- .../spark/sql/catalyst/expressions/UnsafeRow.java | 2 +- .../scala/org/apache/spark/sql/UnsafeRowSuite.scala| 13 + 2 files changed, 14 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/14a3bb3a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java -- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index 86de909..56994fa 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -550,7 +550,7 @@ public final class UnsafeRow extends InternalRow implements Externalizable, Kryo */ public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException { if (baseObject instanceof byte[]) { - int offsetInByteArray = (int) (Platform.BYTE_ARRAY_OFFSET - baseOffset); + int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET); out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes); } else { int dataRemaining = sizeInBytes; http://git-wip-us.apache.org/repos/asf/spark/blob/14a3bb3a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala index a32763d..a5f904c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala @@ -101,9 +101,22 @@ class UnsafeRowSuite extends SparkFunSuite { MemoryAllocator.UNSAFE.free(offheapRowPage) } } +val (bytesFromArrayBackedRowWithOffset, field0StringFromArrayBackedRowWithOffset) = { + val baos = new ByteArrayOutputStream() + val numBytes = arrayBackedUnsafeRow.getSizeInBytes + val bytesWithOffset = new Array[Byte](numBytes + 100) + System.arraycopy(arrayBackedUnsafeRow.getBaseObject.asInstanceOf[Array[Byte]], 0, +bytesWithOffset, 100, numBytes) + val arrayBackedRow = new UnsafeRow(arrayBackedUnsafeRow.numFields()) + arrayBackedRow.pointTo(bytesWithOffset, Platform.BYTE_ARRAY_OFFSET + 100, numBytes) + arrayBackedRow.writeToStream(baos, null) + (baos.toByteArray, arrayBackedRow.getString(0)) +} assert(bytesFromArrayBackedRow === bytesFromOffheapRow) assert(field0StringFromArrayBackedRow === field0StringFromOffheapRow) +assert(bytesFromArrayBackedRow === bytesFromArrayBackedRowWithOffset) +assert(field0StringFromArrayBackedRow === field0StringFromArrayBackedRowWithOffset) } test("calling getDouble() and getFloat() on null columns") { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org