Repository: spark
Updated Branches:
  refs/heads/master 096df6d93 -> 12bf83240


[SPARK-19796][CORE] Fix serialization of long property values in TaskDescription

## What changes were proposed in this pull request?

The properties that are serialized with a TaskDescription can have very long 
values (eg. "spark.job.description" which is set to the full sql statement with 
the thrift-server).  DataOutputStream.writeUTF() does not work well for long 
strings, so this changes the way those values are serialized to handle longer 
strings.

## How was this patch tested?

Updated existing unit test to reproduce the issue.  All unit tests via jenkins.

Author: Imran Rashid <iras...@cloudera.com>

Closes #17140 from squito/SPARK-19796.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/12bf8324
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/12bf8324
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/12bf8324

Branch: refs/heads/master
Commit: 12bf832407eaaed90d7c599522457cb36b303b6c
Parents: 096df6d
Author: Imran Rashid <iras...@cloudera.com>
Authored: Mon Mar 6 14:06:11 2017 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Mon Mar 6 14:06:11 2017 -0600

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskDescription.scala    | 12 ++++++++++--
 .../spark/scheduler/TaskDescriptionSuite.scala      | 16 ++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/12bf8324/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
index 78aa5c4..c98b871 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.{DataInputStream, DataOutputStream}
 import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
 import java.util.Properties
 
 import scala.collection.JavaConverters._
@@ -86,7 +87,10 @@ private[spark] object TaskDescription {
     dataOut.writeInt(taskDescription.properties.size())
     taskDescription.properties.asScala.foreach { case (key, value) =>
       dataOut.writeUTF(key)
-      dataOut.writeUTF(value)
+      // SPARK-19796 -- writeUTF doesn't work for long strings, which can happen for property values
+      val bytes = value.getBytes(StandardCharsets.UTF_8)
+      dataOut.writeInt(bytes.length)
+      dataOut.write(bytes)
     }
 
     // Write the task. The task is already serialized, so write it directly to the byte buffer.
@@ -124,7 +128,11 @@ private[spark] object TaskDescription {
     val properties = new Properties()
     val numProperties = dataIn.readInt()
     for (i <- 0 until numProperties) {
-      properties.setProperty(dataIn.readUTF(), dataIn.readUTF())
+      val key = dataIn.readUTF()
+      val valueLength = dataIn.readInt()
+      val valueBytes = new Array[Byte](valueLength)
+      dataIn.readFully(valueBytes)
+      properties.setProperty(key, new String(valueBytes, StandardCharsets.UTF_8))
     }
 
     // Create a sub-buffer for the serialized task into its own buffer (to be deserialized later).

http://git-wip-us.apache.org/repos/asf/spark/blob/12bf8324/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
index 9f1fe05..97487ce 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler
 
+import java.io.{ByteArrayOutputStream, DataOutputStream, UTFDataFormatException}
 import java.nio.ByteBuffer
 import java.util.Properties
 
@@ -36,6 +37,21 @@ class TaskDescriptionSuite extends SparkFunSuite {
     val originalProperties = new Properties()
     originalProperties.put("property1", "18")
     originalProperties.put("property2", "test value")
+    // SPARK-19796 -- large property values (like a large job description for a long sql query)
+    // can cause problems for DataOutputStream, make sure we handle correctly
+    val sb = new StringBuilder()
+    (0 to 10000).foreach(_ => sb.append("1234567890"))
+    val largeString = sb.toString()
+    originalProperties.put("property3", largeString)
+    // make sure we've got a good test case
+    intercept[UTFDataFormatException] {
+      val out = new DataOutputStream(new ByteArrayOutputStream())
+      try {
+        out.writeUTF(largeString)
+      } finally {
+        out.close()
+      }
+    }
 
     // Create a dummy byte buffer for the task.
     val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to