Repository: spark Updated Branches: refs/heads/master 096df6d93 -> 12bf83240
[SPARK-19796][CORE] Fix serialization of long property values in TaskDescription ## What changes were proposed in this pull request? The properties that are serialized with a TaskDescription can have very long values (e.g., "spark.job.description" which is set to the full sql statement with the thrift-server). DataOutputStream.writeUTF() does not work well for long strings, so this changes the way those values are serialized to handle longer strings. ## How was this patch tested? Updated existing unit test to reproduce the issue. All unit tests via jenkins. Author: Imran Rashid <iras...@cloudera.com> Closes #17140 from squito/SPARK-19796. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12bf8324 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12bf8324 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12bf8324 Branch: refs/heads/master Commit: 12bf832407eaaed90d7c599522457cb36b303b6c Parents: 096df6d Author: Imran Rashid <iras...@cloudera.com> Authored: Mon Mar 6 14:06:11 2017 -0600 Committer: Imran Rashid <iras...@cloudera.com> Committed: Mon Mar 6 14:06:11 2017 -0600 ---------------------------------------------------------------------- .../apache/spark/scheduler/TaskDescription.scala | 12 ++++++++++-- .../spark/scheduler/TaskDescriptionSuite.scala | 16 ++++++++++++++++ 2 files changed, 26 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/12bf8324/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala index 78aa5c4..c98b871 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala @@ -19,6 +19,7 @@ package org.apache.spark.scheduler import java.io.{DataInputStream, DataOutputStream} import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets import java.util.Properties import scala.collection.JavaConverters._ @@ -86,7 +87,10 @@ private[spark] object TaskDescription { dataOut.writeInt(taskDescription.properties.size()) taskDescription.properties.asScala.foreach { case (key, value) => dataOut.writeUTF(key) - dataOut.writeUTF(value) + // SPARK-19796 -- writeUTF doesn't work for long strings, which can happen for property values + val bytes = value.getBytes(StandardCharsets.UTF_8) + dataOut.writeInt(bytes.length) + dataOut.write(bytes) } // Write the task. The task is already serialized, so write it directly to the byte buffer. @@ -124,7 +128,11 @@ private[spark] object TaskDescription { val properties = new Properties() val numProperties = dataIn.readInt() for (i <- 0 until numProperties) { - properties.setProperty(dataIn.readUTF(), dataIn.readUTF()) + val key = dataIn.readUTF() + val valueLength = dataIn.readInt() + val valueBytes = new Array[Byte](valueLength) + dataIn.readFully(valueBytes) + properties.setProperty(key, new String(valueBytes, StandardCharsets.UTF_8)) } // Create a sub-buffer for the serialized task into its own buffer (to be deserialized later). 
http://git-wip-us.apache.org/repos/asf/spark/blob/12bf8324/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala index 9f1fe05..97487ce 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler +import java.io.{ByteArrayOutputStream, DataOutputStream, UTFDataFormatException} import java.nio.ByteBuffer import java.util.Properties @@ -36,6 +37,21 @@ class TaskDescriptionSuite extends SparkFunSuite { val originalProperties = new Properties() originalProperties.put("property1", "18") originalProperties.put("property2", "test value") + // SPARK-19796 -- large property values (like a large job description for a long sql query) + // can cause problems for DataOutputStream, make sure we handle correctly + val sb = new StringBuilder() + (0 to 10000).foreach(_ => sb.append("1234567890")) + val largeString = sb.toString() + originalProperties.put("property3", largeString) + // make sure we've got a good test case + intercept[UTFDataFormatException] { + val out = new DataOutputStream(new ByteArrayOutputStream()) + try { + out.writeUTF(largeString) + } finally { + out.close() + } + } // Create a dummy byte buffer for the task. val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org