Repository: spark Updated Branches: refs/heads/master 95dccc633 -> 137f47865
[SPARK-9551][SQL] add a cheap version of copy for UnsafeRow to reuse a copy buffer Author: Wenchen Fan <cloud0...@outlook.com> Closes #7885 from cloud-fan/cheap-copy and squashes the following commits: 0900ca1 [Wenchen Fan] replace == with === 73f4ada [Wenchen Fan] add tests 07b865a [Wenchen Fan] add a cheap version of copy Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/137f4786 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/137f4786 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/137f4786 Branch: refs/heads/master Commit: 137f47865df6e98ab70ae5ba30dc4d441fb41166 Parents: 95dccc6 Author: Wenchen Fan <cloud0...@outlook.com> Authored: Mon Aug 3 04:21:15 2015 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Mon Aug 3 04:21:15 2015 -0700 ---------------------------------------------------------------------- .../sql/catalyst/expressions/UnsafeRow.java | 32 +++++++++++++++++ .../org/apache/spark/sql/UnsafeRowSuite.scala | 38 ++++++++++++++++++++ 2 files changed, 70 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/137f4786/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java index c5d42d7..f4230cf 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java @@ -464,6 +464,38 @@ public final class UnsafeRow extends MutableRow { } /** + * Creates an empty UnsafeRow from a byte array with specified numBytes and numFields. + * The returned row is invalid until we call copyFrom on it. 
+ */ + public static UnsafeRow createFromByteArray(int numBytes, int numFields) { + final UnsafeRow row = new UnsafeRow(); + row.pointTo(new byte[numBytes], numFields, numBytes); + return row; + } + + /** + * Copies the input UnsafeRow to this UnsafeRow, and resize the underlying byte[] when the + * input row is larger than this row. + */ + public void copyFrom(UnsafeRow row) { + // copyFrom is only available for UnsafeRow created from byte array. + assert (baseObject instanceof byte[]) && baseOffset == PlatformDependent.BYTE_ARRAY_OFFSET; + if (row.sizeInBytes > this.sizeInBytes) { + // resize the underlying byte[] if it's not large enough. + this.baseObject = new byte[row.sizeInBytes]; + } + PlatformDependent.copyMemory( + row.baseObject, + row.baseOffset, + this.baseObject, + this.baseOffset, + row.sizeInBytes + ); + // update the sizeInBytes. + this.sizeInBytes = row.sizeInBytes; + } + + /** * Write this UnsafeRow's underlying bytes to the given OutputStream. * * @param out the stream to write to. 
http://git-wip-us.apache.org/repos/asf/spark/blob/137f4786/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
index e72a1bc..c5faaa6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
@@ -82,4 +82,42 @@ class UnsafeRowSuite extends SparkFunSuite {
     assert(unsafeRow.get(0, dataType) === null)
   }
 }

  // Exercises UnsafeRow.createFromByteArray / copyFrom: buffer reuse, deep-copy
  // semantics, resizing for oversized rows, and reuse of the grown buffer.
  test("createFromByteArray and copyFrom") {
    val row = InternalRow(1, UTF8String.fromString("abc"))
    val converter = UnsafeProjection.create(Array[DataType](IntegerType, StringType))
    val unsafeRow = converter.apply(row)

    val emptyRow = UnsafeRow.createFromByteArray(64, 2)
    val buffer = emptyRow.getBaseObject

    emptyRow.copyFrom(unsafeRow)
    assert(emptyRow.getSizeInBytes() === unsafeRow.getSizeInBytes)
    assert(emptyRow.getInt(0) === unsafeRow.getInt(0))
    assert(emptyRow.getUTF8String(1) === unsafeRow.getUTF8String(1))
    // make sure we reuse the buffer.
    assert(emptyRow.getBaseObject === buffer)

    // make sure we really copied the input row.
    unsafeRow.setInt(0, 2)
    assert(emptyRow.getInt(0) === 1)

    val longString = UTF8String.fromString((1 to 100).map(_ => "abc").reduce(_ + _))
    val row2 = InternalRow(3, longString)
    val unsafeRow2 = converter.apply(row2)

    // make sure we can resize.
    emptyRow.copyFrom(unsafeRow2)
    assert(emptyRow.getSizeInBytes() === unsafeRow2.getSizeInBytes)
    assert(emptyRow.getInt(0) === 3)
    assert(emptyRow.getUTF8String(1) === longString)
    // make sure we really resized.
    assert(emptyRow.getBaseObject != buffer)

    // make sure we can still handle small rows after resize.
    emptyRow.copyFrom(unsafeRow)
    assert(emptyRow.getSizeInBytes() === unsafeRow.getSizeInBytes)
    assert(emptyRow.getInt(0) === unsafeRow.getInt(0))
    assert(emptyRow.getUTF8String(1) === unsafeRow.getUTF8String(1))
  }
}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org