[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-197350986 **[Test build #53320 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53320/consoleFull)** for PR 11759 at commit [`2c2055a`](https://github.com/apache/spark/commit/2c2055a07619b97c431f71fbe7e0a42be2ef5d85). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-198220376 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53502/ Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/11759#discussion_r56611882 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ } /** - * Runs this query returning the result as an array. + * Packing the UnsafeRows into byte array for faster serialization. + * The byte arrays are in the following format: + * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1] + * + * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also + * compressed. */ - def executeCollect(): Array[InternalRow] = { -// Packing the UnsafeRows into byte array for faster serialization. -// The byte arrays are in the following format: -// [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1] -// -// UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also -// compressed. -val byteArrayRdd = execute().mapPartitionsInternal { iter => + private def getByteArrayRdd(n: Int = -1): RDD[Array[Byte]] = { +execute().mapPartitionsInternal { iter => + var count = 0 val buffer = new Array[Byte](4 << 10) // 4K val codec = CompressionCodec.createCodec(SparkEnv.get.conf) val bos = new ByteArrayOutputStream() val out = new DataOutputStream(codec.compressedOutputStream(bos)) - while (iter.hasNext) { + while (iter.hasNext && (n < 0 || count < n)) { val row = iter.next().asInstanceOf[UnsafeRow] out.writeInt(row.getSizeInBytes) row.writeToStream(out, buffer) +count += 1 } out.writeInt(-1) out.flush() out.close() Iterator(bos.toByteArray) } + } -// Collect the byte arrays back to driver, then decode them as UnsafeRows. + /** + * Collect the byte arrays back to driver, then decode them as UnsafeRows. + */ + private def collectRowFromBytes(bytes: Array[Byte]): Array[InternalRow] = { val nFields = schema.length val results = ArrayBuffer[InternalRow]() +val codec = CompressionCodec.createCodec(SparkEnv.get.conf) +val bis = new ByteArrayInputStream(bytes) +val ins = new DataInputStream(codec.compressedInputStream(bis)) +var sizeOfNextRow = ins.readInt() +while (sizeOfNextRow >= 0) { + val bs = new Array[Byte](sizeOfNextRow) + ins.readFully(bs) + val row = new UnsafeRow(nFields) + row.pointTo(bs, sizeOfNextRow) + results += row + sizeOfNextRow = ins.readInt() +} +results.toArray + } + + /** + * Runs this query returning the result as an array. + */ + def executeCollect(): Array[InternalRow] = { +val byteArrayRdd = getByteArrayRdd() + +val results = ArrayBuffer[InternalRow]() byteArrayRdd.collect().foreach { bytes => - val codec = CompressionCodec.createCodec(SparkEnv.get.conf) - val bis = new ByteArrayInputStream(bytes) - val ins = new DataInputStream(codec.compressedInputStream(bis)) - var sizeOfNextRow = ins.readInt() - while (sizeOfNextRow >= 0) { -val bs = new Array[Byte](sizeOfNextRow) -ins.readFully(bs) -val row = new UnsafeRow(nFields) -row.pointTo(bs, sizeOfNextRow) -results += row -sizeOfNextRow = ins.readInt() - } + results ++= collectRowFromBytes(bytes) --- End diff -- I think we should. Updated. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/11759#discussion_r56455618 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ } /** - * Runs this query returning the result as an array. + * Packing the UnsafeRows into byte array for faster serialization. + * The byte arrays are in the following format: + * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1] + * + * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also + * compressed. */ - def executeCollect(): Array[InternalRow] = { -// Packing the UnsafeRows into byte array for faster serialization. -// The byte arrays are in the following format: -// [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1] -// -// UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also -// compressed. -val byteArrayRdd = execute().mapPartitionsInternal { iter => + private def getByteArrayRdd(n: Int = -1): RDD[Array[Byte]] = { +execute().mapPartitionsInternal { iter => + var count = 0 val buffer = new Array[Byte](4 << 10) // 4K val codec = CompressionCodec.createCodec(SparkEnv.get.conf) val bos = new ByteArrayOutputStream() val out = new DataOutputStream(codec.compressedOutputStream(bos)) - while (iter.hasNext) { + while (iter.hasNext && (n < 0 || count < n)) { val row = iter.next().asInstanceOf[UnsafeRow] out.writeInt(row.getSizeInBytes) row.writeToStream(out, buffer) +count += 1 } out.writeInt(-1) out.flush() out.close() Iterator(bos.toByteArray) } + } -// Collect the byte arrays back to driver, then decode them as UnsafeRows. + /** + * Collect the byte arrays back to driver, then decode them as UnsafeRows. + */ + private def collectRowFromBytes(bytes: Array[Byte]): Array[InternalRow] = { val nFields = schema.length val results = ArrayBuffer[InternalRow]() +val codec = CompressionCodec.createCodec(SparkEnv.get.conf) +val bis = new ByteArrayInputStream(bytes) +val ins = new DataInputStream(codec.compressedInputStream(bis)) +var sizeOfNextRow = ins.readInt() +while (sizeOfNextRow >= 0) { + val bs = new Array[Byte](sizeOfNextRow) + ins.readFully(bs) + val row = new UnsafeRow(nFields) + row.pointTo(bs, sizeOfNextRow) + results += row + sizeOfNextRow = ins.readInt() +} +results.toArray + } + + /** + * Runs this query returning the result as an array. + */ + def executeCollect(): Array[InternalRow] = { +val byteArrayRdd = getByteArrayRdd() + +val results = ArrayBuffer[InternalRow]() byteArrayRdd.collect().foreach { bytes => - val codec = CompressionCodec.createCodec(SparkEnv.get.conf) - val bis = new ByteArrayInputStream(bytes) - val ins = new DataInputStream(codec.compressedInputStream(bis)) - var sizeOfNextRow = ins.readInt() - while (sizeOfNextRow >= 0) { -val bs = new Array[Byte](sizeOfNextRow) -ins.readFully(bs) -val row = new UnsafeRow(nFields) -row.pointTo(bs, sizeOfNextRow) -results += row -sizeOfNextRow = ins.readInt() - } + results ++= collectRowFromBytes(bytes) --- End diff -- Should we avoid this copy ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user viirya commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-198222176 cc @davies The comments are addressed and tests are passed. Please see if this is ok now. Thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user viirya commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-197665452 cc @davies @rxin --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/11759#discussion_r56455468 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ } /** - * Runs this query returning the result as an array. + * Packing the UnsafeRows into byte array for faster serialization. + * The byte arrays are in the following format: + * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1] --- End diff -- This is the implementation details, it has nothing with the APIs, I'd like to keep these as comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-198220270 **[Test build #53502 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53502/consoleFull)** for PR 11759 at commit [`ed9aa30`](https://github.com/apache/spark/commit/ed9aa301fac4af768eee36c5153e38faea9c658b). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-198219518 **[Test build #53501 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53501/consoleFull)** for PR 11759 at commit [`6752775`](https://github.com/apache/spark/commit/675277591aa61464bb8d28bb6621e376e51fe6be). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-197392779 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53320/ Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-198219667 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53501/ Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-197273468 **[Test build #53310 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53310/consoleFull)** for PR 11759 at commit [`d1306ad`](https://github.com/apache/spark/commit/d1306ade2efa407ce0785650d08aef4129aab2c3). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-197392401 **[Test build #53320 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53320/consoleFull)** for PR 11759 at commit [`2c2055a`](https://github.com/apache/spark/commit/2c2055a07619b97c431f71fbe7e0a42be2ef5d85). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-198203730 **[Test build #53502 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53502/consoleFull)** for PR 11759 at commit [`ed9aa30`](https://github.com/apache/spark/commit/ed9aa301fac4af768eee36c5153e38faea9c658b). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-198219664 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/11759#discussion_r56455490 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ } /** - * Runs this query returning the result as an array. + * Packing the UnsafeRows into byte array for faster serialization. + * The byte arrays are in the following format: + * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1] --- End diff -- nvm, you make it as an function. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-198220375 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user davies commented on a diff in the pull request: https://github.com/apache/spark/pull/11759#discussion_r56455662 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ } /** - * Runs this query returning the result as an array. + * Packing the UnsafeRows into byte array for faster serialization. + * The byte arrays are in the following format: + * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1] + * + * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also + * compressed. */ - def executeCollect(): Array[InternalRow] = { -// Packing the UnsafeRows into byte array for faster serialization. -// The byte arrays are in the following format: -// [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1] -// -// UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also -// compressed. -val byteArrayRdd = execute().mapPartitionsInternal { iter => + private def getByteArrayRdd(n: Int = -1): RDD[Array[Byte]] = { +execute().mapPartitionsInternal { iter => + var count = 0 val buffer = new Array[Byte](4 << 10) // 4K val codec = CompressionCodec.createCodec(SparkEnv.get.conf) val bos = new ByteArrayOutputStream() val out = new DataOutputStream(codec.compressedOutputStream(bos)) - while (iter.hasNext) { + while (iter.hasNext && (n < 0 || count < n)) { val row = iter.next().asInstanceOf[UnsafeRow] out.writeInt(row.getSizeInBytes) row.writeToStream(out, buffer) +count += 1 } out.writeInt(-1) out.flush() out.close() Iterator(bos.toByteArray) } + } -// Collect the byte arrays back to driver, then decode them as UnsafeRows. + /** + * Collect the byte arrays back to driver, then decode them as UnsafeRows. + */ + private def collectRowFromBytes(bytes: Array[Byte]): Array[InternalRow] = { --- End diff -- This function has nothong about `collect`, decodeUnsafeRow or deserializeUnsafeRows? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user davies commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-198225556 LGTM, merging this into master, thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-197273772 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53310/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/11759 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/11759#discussion_r56611911 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala --- @@ -218,48 +218,64 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ } /** - * Runs this query returning the result as an array. + * Packing the UnsafeRows into byte array for faster serialization. + * The byte arrays are in the following format: + * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1] + * + * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also + * compressed. */ - def executeCollect(): Array[InternalRow] = { -// Packing the UnsafeRows into byte array for faster serialization. -// The byte arrays are in the following format: -// [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1] -// -// UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also -// compressed. -val byteArrayRdd = execute().mapPartitionsInternal { iter => + private def getByteArrayRdd(n: Int = -1): RDD[Array[Byte]] = { +execute().mapPartitionsInternal { iter => + var count = 0 val buffer = new Array[Byte](4 << 10) // 4K val codec = CompressionCodec.createCodec(SparkEnv.get.conf) val bos = new ByteArrayOutputStream() val out = new DataOutputStream(codec.compressedOutputStream(bos)) - while (iter.hasNext) { + while (iter.hasNext && (n < 0 || count < n)) { val row = iter.next().asInstanceOf[UnsafeRow] out.writeInt(row.getSizeInBytes) row.writeToStream(out, buffer) +count += 1 } out.writeInt(-1) out.flush() out.close() Iterator(bos.toByteArray) } + } -// Collect the byte arrays back to driver, then decode them as UnsafeRows. + /** + * Collect the byte arrays back to driver, then decode them as UnsafeRows. + */ + private def collectRowFromBytes(bytes: Array[Byte]): Array[InternalRow] = { --- End diff -- done with decodeUnsafeRow. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-197273768 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-198201967 **[Test build #53501 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53501/consoleFull)** for PR 11759 at commit [`6752775`](https://github.com/apache/spark/commit/675277591aa61464bb8d28bb6621e376e51fe6be). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-197392772 Merged build finished. Test PASSed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-197228487 **[Test build #53310 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53310/consoleFull)** for PR 11759 at commit [`d1306ad`](https://github.com/apache/spark/commit/d1306ade2efa407ce0785650d08aef4129aab2c3). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user viirya commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-197227498 retest this please. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-197225976 **[Test build #53309 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53309/consoleFull)** for PR 11759 at commit [`d1306ad`](https://github.com/apache/spark/commit/d1306ade2efa407ce0785650d08aef4129aab2c3). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-197225986 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/53309/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-197225982 Merged build finished. Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/11759#issuecomment-197224525 **[Test build #53309 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/53309/consoleFull)** for PR 11759 at commit [`d1306ad`](https://github.com/apache/spark/commit/d1306ade2efa407ce0785650d08aef4129aab2c3). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-13930][SQL] Apply fast serialization on...
GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/11759 [SPARK-13930][SQL] Apply fast serialization on collect limit operator ## What changes were proposed in this pull request? JIRA: https://issues.apache.org/jira/browse/SPARK-13930 Recently the fast serialization has been introduced to collecting DataFrame/Dataset (#11664). The same technology can be used on collect limit operator too. ## How was this patch tested? Add a benchmark for collect limit to `BenchmarkWholeStageCodegen`. Without this patch: model name : Westmere E56xx/L56xx/X56xx (Nehalem-C) collect limit: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative --- collect limit 1 million 3413 / 3768 0.3 3255.0 1.0X collect limit 2 millions9728 / 10440 0.1 9277.3 0.4X With this patch: model name : Westmere E56xx/L56xx/X56xx (Nehalem-C) collect limit: Best/Avg Time(ms)Rate(M/s) Per Row(ns) Relative --- collect limit 1 million 833 / 1284 1.3 794.4 1.0X collect limit 2 millions 3348 / 4005 0.3 3193.3 0.2X You can merge this pull request into a Git repository by running: $ git pull https://github.com/viirya/spark-1 execute-take Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/11759.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #11759 commit 16ac5fe6325c2cf7638faab854c09dc50de94f18 Author: Liang-Chi HsiehDate: 2016-03-16T08:41:36Z init import. commit d1306ade2efa407ce0785650d08aef4129aab2c3 Author: Liang-Chi Hsieh Date: 2016-03-16T09:06:35Z Use fast serialization on collect limit too. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org