[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-197350986
  
**[Test build #53320 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53320/consoleFull)** for PR 11759 at commit [`2c2055a`](https://github.com/apache/spark/commit/2c2055a07619b97c431f71fbe7e0a42be2ef5d85).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-198220376
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53502/





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/11759#discussion_r56611882
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
-   * Runs this query returning the result as an array.
+   * Packing the UnsafeRows into byte array for faster serialization.
+   * The byte arrays are in the following format:
+   * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
+   *
+   * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
+   * compressed.
    */
-  def executeCollect(): Array[InternalRow] = {
-    // Packing the UnsafeRows into byte array for faster serialization.
-    // The byte arrays are in the following format:
-    // [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
-    //
-    // UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
-    // compressed.
-    val byteArrayRdd = execute().mapPartitionsInternal { iter =>
+  private def getByteArrayRdd(n: Int = -1): RDD[Array[Byte]] = {
+    execute().mapPartitionsInternal { iter =>
+      var count = 0
       val buffer = new Array[Byte](4 << 10)  // 4K
       val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
       val bos = new ByteArrayOutputStream()
       val out = new DataOutputStream(codec.compressedOutputStream(bos))
-      while (iter.hasNext) {
+      while (iter.hasNext && (n < 0 || count < n)) {
         val row = iter.next().asInstanceOf[UnsafeRow]
         out.writeInt(row.getSizeInBytes)
         row.writeToStream(out, buffer)
+        count += 1
       }
       out.writeInt(-1)
       out.flush()
       out.close()
       Iterator(bos.toByteArray)
     }
+  }
 
-    // Collect the byte arrays back to driver, then decode them as UnsafeRows.
+  /**
+   * Collect the byte arrays back to driver, then decode them as UnsafeRows.
+   */
+  private def collectRowFromBytes(bytes: Array[Byte]): Array[InternalRow] = {
     val nFields = schema.length
     val results = ArrayBuffer[InternalRow]()
 
+    val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
+    val bis = new ByteArrayInputStream(bytes)
+    val ins = new DataInputStream(codec.compressedInputStream(bis))
+    var sizeOfNextRow = ins.readInt()
+    while (sizeOfNextRow >= 0) {
+      val bs = new Array[Byte](sizeOfNextRow)
+      ins.readFully(bs)
+      val row = new UnsafeRow(nFields)
+      row.pointTo(bs, sizeOfNextRow)
+      results += row
+      sizeOfNextRow = ins.readInt()
+    }
+    results.toArray
+  }
+
+  /**
+   * Runs this query returning the result as an array.
+   */
+  def executeCollect(): Array[InternalRow] = {
+    val byteArrayRdd = getByteArrayRdd()
+
+    val results = ArrayBuffer[InternalRow]()
     byteArrayRdd.collect().foreach { bytes =>
-      val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
-      val bis = new ByteArrayInputStream(bytes)
-      val ins = new DataInputStream(codec.compressedInputStream(bis))
-      var sizeOfNextRow = ins.readInt()
-      while (sizeOfNextRow >= 0) {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        results += row
-        sizeOfNextRow = ins.readInt()
-      }
+      results ++= collectRowFromBytes(bytes)
--- End diff --

I think we should. Updated.
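For readers following the diff above: the `[size] [bytes of UnsafeRow] ... [-1]` framing used by `getByteArrayRdd` and `collectRowFromBytes` can be sketched outside Spark with the same plain `java.io` streams the PR uses. This is an illustration only, it omits the compression codec, and the class and method names (`RowFraming`, `encode`, `decode`) are made up for the example:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class RowFraming {
    // Encode rows as [size][bytes][size][bytes]... terminated by -1,
    // mirroring the writeInt/writeToStream loop in getByteArrayRdd().
    static byte[] encode(List<byte[]> rows) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        for (byte[] row : rows) {
            out.writeInt(row.length);   // length prefix
            out.write(row);             // row payload
        }
        out.writeInt(-1);               // end-of-stream sentinel
        out.flush();
        return bos.toByteArray();
    }

    // Decode until the -1 sentinel, mirroring collectRowFromBytes().
    static List<byte[]> decode(byte[] bytes) throws IOException {
        DataInputStream ins = new DataInputStream(new ByteArrayInputStream(bytes));
        List<byte[]> rows = new ArrayList<>();
        int size = ins.readInt();
        while (size >= 0) {
            byte[] row = new byte[size];
            ins.readFully(row);         // read exactly `size` bytes
            rows.add(row);
            size = ins.readInt();
        }
        return rows;
    }

    public static void main(String[] args) throws IOException {
        List<byte[]> rows = List.of("row-a".getBytes(), "row-bb".getBytes());
        List<byte[]> back = decode(encode(rows));
        System.out.println(back.size());              // 2
        System.out.println(new String(back.get(1)));  // row-bb
    }
}
```

The sentinel-terminated framing lets the reader stop without knowing the row count up front, which is why the writer in the PR can cap output at `n` rows without changing the format.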





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/11759#discussion_r56455618
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
-   * Runs this query returning the result as an array.
+   * Packing the UnsafeRows into byte array for faster serialization.
+   * The byte arrays are in the following format:
+   * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
+   *
+   * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
+   * compressed.
    */
-  def executeCollect(): Array[InternalRow] = {
-    // Packing the UnsafeRows into byte array for faster serialization.
-    // The byte arrays are in the following format:
-    // [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
-    //
-    // UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
-    // compressed.
-    val byteArrayRdd = execute().mapPartitionsInternal { iter =>
+  private def getByteArrayRdd(n: Int = -1): RDD[Array[Byte]] = {
+    execute().mapPartitionsInternal { iter =>
+      var count = 0
       val buffer = new Array[Byte](4 << 10)  // 4K
       val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
       val bos = new ByteArrayOutputStream()
       val out = new DataOutputStream(codec.compressedOutputStream(bos))
-      while (iter.hasNext) {
+      while (iter.hasNext && (n < 0 || count < n)) {
         val row = iter.next().asInstanceOf[UnsafeRow]
         out.writeInt(row.getSizeInBytes)
         row.writeToStream(out, buffer)
+        count += 1
       }
       out.writeInt(-1)
       out.flush()
       out.close()
       Iterator(bos.toByteArray)
     }
+  }
 
-    // Collect the byte arrays back to driver, then decode them as UnsafeRows.
+  /**
+   * Collect the byte arrays back to driver, then decode them as UnsafeRows.
+   */
+  private def collectRowFromBytes(bytes: Array[Byte]): Array[InternalRow] = {
     val nFields = schema.length
     val results = ArrayBuffer[InternalRow]()
 
+    val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
+    val bis = new ByteArrayInputStream(bytes)
+    val ins = new DataInputStream(codec.compressedInputStream(bis))
+    var sizeOfNextRow = ins.readInt()
+    while (sizeOfNextRow >= 0) {
+      val bs = new Array[Byte](sizeOfNextRow)
+      ins.readFully(bs)
+      val row = new UnsafeRow(nFields)
+      row.pointTo(bs, sizeOfNextRow)
+      results += row
+      sizeOfNextRow = ins.readInt()
+    }
+    results.toArray
+  }
+
+  /**
+   * Runs this query returning the result as an array.
+   */
+  def executeCollect(): Array[InternalRow] = {
+    val byteArrayRdd = getByteArrayRdd()
+
+    val results = ArrayBuffer[InternalRow]()
     byteArrayRdd.collect().foreach { bytes =>
-      val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
-      val bis = new ByteArrayInputStream(bytes)
-      val ins = new DataInputStream(codec.compressedInputStream(bis))
-      var sizeOfNextRow = ins.readInt()
-      while (sizeOfNextRow >= 0) {
-        val bs = new Array[Byte](sizeOfNextRow)
-        ins.readFully(bs)
-        val row = new UnsafeRow(nFields)
-        row.pointTo(bs, sizeOfNextRow)
-        results += row
-        sizeOfNextRow = ins.readInt()
-      }
+      results ++= collectRowFromBytes(bytes)
--- End diff --

Should we avoid this copy?
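One way to sidestep the copy being asked about here is to decode lazily instead of materializing a per-chunk array that then gets appended into the result buffer. This is an illustrative sketch only, not what the PR necessarily did; `LazyRowDecoder` and `decodeLazily` are made-up names, and compression is omitted:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;

public class LazyRowDecoder {
    // Walk the [size][bytes]...[-1] stream lazily, yielding one row at a
    // time instead of building an intermediate array for the whole chunk.
    static Iterator<byte[]> decodeLazily(byte[] bytes) {
        DataInputStream ins = new DataInputStream(new ByteArrayInputStream(bytes));
        return new Iterator<byte[]>() {
            private byte[] next = readNext();

            private byte[] readNext() {
                try {
                    int size = ins.readInt();
                    if (size < 0) {
                        return null;  // -1 sentinel: no more rows
                    }
                    byte[] row = new byte[size];
                    ins.readFully(row);
                    return row;
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            public boolean hasNext() {
                return next != null;
            }

            public byte[] next() {
                byte[] current = next;
                next = readNext();  // pre-read so hasNext() stays cheap
                return current;
            }
        };
    }
}
```

The caller can then append rows straight into its final buffer, so each decoded row is materialized exactly once.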





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread viirya
Github user viirya commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-198222176
  
cc @davies The comments are addressed and the tests have passed. Please see if this is ok now. Thanks!





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread viirya
Github user viirya commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-197665452
  
cc @davies @rxin 





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/11759#discussion_r56455468
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
-   * Runs this query returning the result as an array.
+   * Packing the UnsafeRows into byte array for faster serialization.
+   * The byte arrays are in the following format:
+   * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
--- End diff --

These are implementation details; they have nothing to do with the APIs. I'd like to keep these as comments.





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-198220270
  
**[Test build #53502 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53502/consoleFull)** for PR 11759 at commit [`ed9aa30`](https://github.com/apache/spark/commit/ed9aa301fac4af768eee36c5153e38faea9c658b).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-198219518
  
**[Test build #53501 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53501/consoleFull)** for PR 11759 at commit [`6752775`](https://github.com/apache/spark/commit/675277591aa61464bb8d28bb6621e376e51fe6be).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-197392779
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53320/





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-198219667
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53501/





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-197273468
  
**[Test build #53310 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53310/consoleFull)** for PR 11759 at commit [`d1306ad`](https://github.com/apache/spark/commit/d1306ade2efa407ce0785650d08aef4129aab2c3).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-197392401
  
**[Test build #53320 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53320/consoleFull)** for PR 11759 at commit [`2c2055a`](https://github.com/apache/spark/commit/2c2055a07619b97c431f71fbe7e0a42be2ef5d85).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-198203730
  
**[Test build #53502 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53502/consoleFull)** for PR 11759 at commit [`ed9aa30`](https://github.com/apache/spark/commit/ed9aa301fac4af768eee36c5153e38faea9c658b).





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-198219664
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/11759#discussion_r56455490
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
-   * Runs this query returning the result as an array.
+   * Packing the UnsafeRows into byte array for faster serialization.
+   * The byte arrays are in the following format:
+   * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
--- End diff --

nvm, you made it a function.





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-198220375
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread davies
Github user davies commented on a diff in the pull request:

https://github.com/apache/spark/pull/11759#discussion_r56455662
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
-   * Runs this query returning the result as an array.
+   * Packing the UnsafeRows into byte array for faster serialization.
+   * The byte arrays are in the following format:
+   * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
+   *
+   * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
+   * compressed.
    */
-  def executeCollect(): Array[InternalRow] = {
-    // Packing the UnsafeRows into byte array for faster serialization.
-    // The byte arrays are in the following format:
-    // [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
-    //
-    // UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
-    // compressed.
-    val byteArrayRdd = execute().mapPartitionsInternal { iter =>
+  private def getByteArrayRdd(n: Int = -1): RDD[Array[Byte]] = {
+    execute().mapPartitionsInternal { iter =>
+      var count = 0
       val buffer = new Array[Byte](4 << 10)  // 4K
       val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
       val bos = new ByteArrayOutputStream()
       val out = new DataOutputStream(codec.compressedOutputStream(bos))
-      while (iter.hasNext) {
+      while (iter.hasNext && (n < 0 || count < n)) {
         val row = iter.next().asInstanceOf[UnsafeRow]
         out.writeInt(row.getSizeInBytes)
         row.writeToStream(out, buffer)
+        count += 1
       }
       out.writeInt(-1)
       out.flush()
       out.close()
       Iterator(bos.toByteArray)
     }
+  }
 
-    // Collect the byte arrays back to driver, then decode them as UnsafeRows.
+  /**
+   * Collect the byte arrays back to driver, then decode them as UnsafeRows.
+   */
+  private def collectRowFromBytes(bytes: Array[Byte]): Array[InternalRow] = {
--- End diff --

This function has nothing to do with `collect`; how about `decodeUnsafeRow` or `deserializeUnsafeRows`?





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread davies
Github user davies commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-198225556
  
LGTM, merging this into master, thanks!





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-19 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-197273772
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53310/





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/11759





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-18 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/11759#discussion_r56611911
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 
   /**
-   * Runs this query returning the result as an array.
+   * Packing the UnsafeRows into byte array for faster serialization.
+   * The byte arrays are in the following format:
+   * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
+   *
+   * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
+   * compressed.
    */
-  def executeCollect(): Array[InternalRow] = {
-    // Packing the UnsafeRows into byte array for faster serialization.
-    // The byte arrays are in the following format:
-    // [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
-    //
-    // UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
-    // compressed.
-    val byteArrayRdd = execute().mapPartitionsInternal { iter =>
+  private def getByteArrayRdd(n: Int = -1): RDD[Array[Byte]] = {
+    execute().mapPartitionsInternal { iter =>
+      var count = 0
       val buffer = new Array[Byte](4 << 10)  // 4K
       val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
       val bos = new ByteArrayOutputStream()
       val out = new DataOutputStream(codec.compressedOutputStream(bos))
-      while (iter.hasNext) {
+      while (iter.hasNext && (n < 0 || count < n)) {
         val row = iter.next().asInstanceOf[UnsafeRow]
         out.writeInt(row.getSizeInBytes)
         row.writeToStream(out, buffer)
+        count += 1
       }
       out.writeInt(-1)
       out.flush()
       out.close()
       Iterator(bos.toByteArray)
     }
+  }
 
-    // Collect the byte arrays back to driver, then decode them as UnsafeRows.
+  /**
+   * Collect the byte arrays back to driver, then decode them as UnsafeRows.
+   */
+  private def collectRowFromBytes(bytes: Array[Byte]): Array[InternalRow] = {
--- End diff --

Done, renamed to `decodeUnsafeRow`.





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-197273768
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-18 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-198201967
  
**[Test build #53501 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53501/consoleFull)** for PR 11759 at commit [`6752775`](https://github.com/apache/spark/commit/675277591aa61464bb8d28bb6621e376e51fe6be).





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-18 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-197392772
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-197228487
  
**[Test build #53310 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53310/consoleFull)** for PR 11759 at commit [`d1306ad`](https://github.com/apache/spark/commit/d1306ade2efa407ce0785650d08aef4129aab2c3).





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-16 Thread viirya
Github user viirya commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-197227498
  
retest this please.





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-197225976
  
**[Test build #53309 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53309/consoleFull)** for PR 11759 at commit [`d1306ad`](https://github.com/apache/spark/commit/d1306ade2efa407ce0785650d08aef4129aab2c3).
 * This patch **fails to build**.
 * This patch merges cleanly.
 * This patch adds no public classes.





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-197225986
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53309/
Test FAILed.





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-16 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-197225982
  
Merged build finished. Test FAILed.





[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-16 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/11759#issuecomment-197224525
  
**[Test build #53309 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53309/consoleFull)** for PR 11759 at commit [`d1306ad`](https://github.com/apache/spark/commit/d1306ade2efa407ce0785650d08aef4129aab2c3).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...

2016-03-16 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/11759

[SPARK-13930][SQL] Apply fast serialization on collect limit operator

## What changes were proposed in this pull request?

JIRA: https://issues.apache.org/jira/browse/SPARK-13930

Fast serialization was recently introduced for collecting DataFrame/Dataset
results (#11664). The same technique can be applied to the collect limit
operator as well.

## How was this patch tested?

Add a benchmark for collect limit to `BenchmarkWholeStageCodegen`.

Without this patch:

    model name : Westmere E56xx/L56xx/X56xx (Nehalem-C)
    collect limit:               Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)    Relative
    ----------------------------------------------------------------------------------
    collect limit 1 million            3413 / 3768          0.3         3255.0        1.0X
    collect limit 2 millions          9728 / 10440          0.1         9277.3        0.4X

With this patch:

    model name : Westmere E56xx/L56xx/X56xx (Nehalem-C)
    collect limit:               Best/Avg Time(ms)    Rate(M/s)    Per Row(ns)    Relative
    ----------------------------------------------------------------------------------
    collect limit 1 million             833 / 1284          1.3          794.4        1.0X
    collect limit 2 millions           3348 / 4005          0.3         3193.3        0.2X
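The speedup comes from encoding collected rows as length-prefixed byte
strings and compressing the whole batch, rather than serializing a generic
object graph per row. The sketch below illustrates that idea only; it is a
hypothetical, language-agnostic model, not Spark's actual Scala
implementation, and the function names are invented for illustration.

```python
import struct
import zlib


def serialize_rows(rows):
    """Encode a batch of row payloads (bytes) into one compressed blob.

    Each row is length-prefixed with a 4-byte big-endian size so the
    batch can be decoded without any external schema, then the whole
    payload is compressed once, amortizing compression cost.
    """
    payload = b"".join(struct.pack(">I", len(r)) + r for r in rows)
    return zlib.compress(payload)


def deserialize_rows(blob):
    """Decompress a blob and split it back into the original rows."""
    payload = zlib.decompress(blob)
    rows, offset = [], 0
    while offset < len(payload):
        # Read the 4-byte length prefix, then slice out the row bytes.
        (n,) = struct.unpack_from(">I", payload, offset)
        offset += 4
        rows.append(payload[offset:offset + n])
        offset += n
    return rows


rows = [b"row-%d" % i for i in range(1000)]
blob = serialize_rows(rows)
assert deserialize_rows(blob) == rows
```

Batching and compressing once per partition, instead of serializing rows
individually, is what makes this approach attractive for `collect`-style
operators that move many small rows to the driver.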


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 execute-take

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/11759.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #11759


commit 16ac5fe6325c2cf7638faab854c09dc50de94f18
Author: Liang-Chi Hsieh 
Date:   2016-03-16T08:41:36Z

init import.

commit d1306ade2efa407ce0785650d08aef4129aab2c3
Author: Liang-Chi Hsieh 
Date:   2016-03-16T09:06:35Z

Use fast serialization on collect limit too.



