Repository: spark
Updated Branches:
  refs/heads/master 95dccc633 -> 137f47865


[SPARK-9551][SQL] add a cheap version of copy for UnsafeRow to reuse a copy buffer

Author: Wenchen Fan <cloud0...@outlook.com>

Closes #7885 from cloud-fan/cheap-copy and squashes the following commits:

0900ca1 [Wenchen Fan] replace == with ===
73f4ada [Wenchen Fan] add tests
07b865a [Wenchen Fan] add a cheap version of copy


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/137f4786
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/137f4786
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/137f4786

Branch: refs/heads/master
Commit: 137f47865df6e98ab70ae5ba30dc4d441fb41166
Parents: 95dccc6
Author: Wenchen Fan <cloud0...@outlook.com>
Authored: Mon Aug 3 04:21:15 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Mon Aug 3 04:21:15 2015 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/UnsafeRow.java     | 32 +++++++++++++++++
 .../org/apache/spark/sql/UnsafeRowSuite.scala   | 38 ++++++++++++++++++++
 2 files changed, 70 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/137f4786/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index c5d42d7..f4230cf 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -464,6 +464,38 @@ public final class UnsafeRow extends MutableRow {
   }
 
   /**
+   * Creates an empty UnsafeRow from a byte array with specified numBytes and numFields.
+   * The returned row is invalid until we call copyFrom on it.
+   */
+  public static UnsafeRow createFromByteArray(int numBytes, int numFields) {
+    final UnsafeRow row = new UnsafeRow();
+    row.pointTo(new byte[numBytes], numFields, numBytes);
+    return row;
+  }
+
+  /**
+   * Copies the input UnsafeRow to this UnsafeRow, and resizes the underlying byte[] when the
+   * input row is larger than this row.
+   */
+  public void copyFrom(UnsafeRow row) {
+    // copyFrom is only available for UnsafeRow created from byte array.
+    assert (baseObject instanceof byte[]) && baseOffset == PlatformDependent.BYTE_ARRAY_OFFSET;
+    if (row.sizeInBytes > this.sizeInBytes) {
+      // resize the underlying byte[] if it's not large enough.
+      this.baseObject = new byte[row.sizeInBytes];
+    }
+    PlatformDependent.copyMemory(
+      row.baseObject,
+      row.baseOffset,
+      this.baseObject,
+      this.baseOffset,
+      row.sizeInBytes
+    );
+    // update the sizeInBytes.
+    this.sizeInBytes = row.sizeInBytes;
+  }
+
+  /**
    * Write this UnsafeRow's underlying bytes to the given OutputStream.
    *
    * @param out the stream to write to.
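
For context, here is a minimal usage sketch of the new API, assuming a hypothetical caller
iterating over 2-field rows (the iterator, field count, and 64-byte initial sizing are
illustrative assumptions; only createFromByteArray and copyFrom come from this commit).
The idea is that one byte[]-backed row is allocated up front and reused for every copy,
growing only when an incoming row is larger than the current one.

  import org.apache.spark.sql.catalyst.expressions.UnsafeRow

  // Hypothetical consumer, not part of this commit.
  def processWithReusedBuffer(rows: Iterator[UnsafeRow]): Unit = {
    // One reusable row backed by a fresh byte[64]; it is invalid until copyFrom is called.
    val reused = UnsafeRow.createFromByteArray(64, 2)
    rows.foreach { row =>
      // Copies the row's bytes into the existing buffer; a new byte[] is allocated
      // only when row.sizeInBytes exceeds the current sizeInBytes of `reused`.
      reused.copyFrom(row)
      // ... read fields from `reused` here; the next copyFrom overwrites them ...
    }
  }

The new test in UnsafeRowSuite below exercises the same pattern, including the resize path
and the buffer-reuse check via getBaseObject.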

http://git-wip-us.apache.org/repos/asf/spark/blob/137f4786/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
index e72a1bc..c5faaa6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
@@ -82,4 +82,42 @@ class UnsafeRowSuite extends SparkFunSuite {
       assert(unsafeRow.get(0, dataType) === null)
     }
   }
+
+  test("createFromByteArray and copyFrom") {
+    val row = InternalRow(1, UTF8String.fromString("abc"))
+    val converter = UnsafeProjection.create(Array[DataType](IntegerType, StringType))
+    val unsafeRow = converter.apply(row)
+
+    val emptyRow = UnsafeRow.createFromByteArray(64, 2)
+    val buffer = emptyRow.getBaseObject
+
+    emptyRow.copyFrom(unsafeRow)
+    assert(emptyRow.getSizeInBytes() === unsafeRow.getSizeInBytes)
+    assert(emptyRow.getInt(0) === unsafeRow.getInt(0))
+    assert(emptyRow.getUTF8String(1) === unsafeRow.getUTF8String(1))
+    // make sure we reuse the buffer.
+    assert(emptyRow.getBaseObject === buffer)
+
+    // make sure we really copied the input row.
+    unsafeRow.setInt(0, 2)
+    assert(emptyRow.getInt(0) === 1)
+
+    val longString = UTF8String.fromString((1 to 100).map(_ => "abc").reduce(_ + _))
+    val row2 = InternalRow(3, longString)
+    val unsafeRow2 = converter.apply(row2)
+
+    // make sure we can resize.
+    emptyRow.copyFrom(unsafeRow2)
+    assert(emptyRow.getSizeInBytes() === unsafeRow2.getSizeInBytes)
+    assert(emptyRow.getInt(0) === 3)
+    assert(emptyRow.getUTF8String(1) === longString)
+    // make sure we really resized.
+    assert(emptyRow.getBaseObject != buffer)
+
+    // make sure we can still handle small rows after resize.
+    emptyRow.copyFrom(unsafeRow)
+    assert(emptyRow.getSizeInBytes() === unsafeRow.getSizeInBytes)
+    assert(emptyRow.getInt(0) === unsafeRow.getInt(0))
+    assert(emptyRow.getUTF8String(1) === unsafeRow.getUTF8String(1))
+  }
 }


