Repository: spark
Updated Branches:
  refs/heads/master 470881b24 -> 96136f222


[SPARK-3797] Minor addendum to Yarn shuffle service

I did not realize there was a `network.util.JavaUtils` when I wrote this code. 
This PR moves the `ByteBuffer` string conversion to the appropriate place. I 
tested the changes on a stable yarn cluster.

Author: Andrew Or <and...@databricks.com>

Closes #3144 from andrewor14/yarn-shuffle-util and squashes the following 
commits:

b6c08bf [Andrew Or] Remove unused import
94e205c [Andrew Or] Use netty Unpooled
85202a5 [Andrew Or] Use guava Charsets
057135b [Andrew Or] Reword comment
adf186d [Andrew Or] Move byte buffer String conversion logic to JavaUtils


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96136f22
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96136f22
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96136f22

Branch: refs/heads/master
Commit: 96136f222abd4f3abd10cb78a4ebecdb21f3bde7
Parents: 470881b
Author: Andrew Or <and...@databricks.com>
Authored: Thu Nov 6 17:18:49 2014 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Nov 6 17:18:49 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/network/util/JavaUtils.java    | 20 ++++++++++++++++
 .../network/sasl/ShuffleSecretManager.java      | 24 ++------------------
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  5 ++--
 .../spark/deploy/yarn/ExecutorRunnable.scala    |  5 ++--
 4 files changed, 28 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/96136f22/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java 
b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
index 40b71b0..2856d1c 100644
--- a/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/network/common/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.network.util;
 
+import java.nio.ByteBuffer;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
@@ -25,6 +27,8 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
 import com.google.common.io.Closeables;
+import com.google.common.base.Charsets;
+import io.netty.buffer.Unpooled;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,4 +77,20 @@ public class JavaUtils {
     int hash = obj.hashCode();
     return hash != Integer.MIN_VALUE ? Math.abs(hash) : 0;
   }
+
+  /**
+   * Convert the given string to a byte buffer. The resulting buffer can be
+   * converted back to the same string through {@link 
#bytesToString(ByteBuffer)}.
+   */
+  public static ByteBuffer stringToBytes(String s) {
+    return Unpooled.wrappedBuffer(s.getBytes(Charsets.UTF_8)).nioBuffer();
+  }
+
+  /**
+   * Convert the given byte buffer to a string. The resulting string can be
+   * converted back to the same byte buffer through {@link 
#stringToBytes(String)}.
+   */
+  public static String bytesToString(ByteBuffer b) {
+    return Unpooled.wrappedBuffer(b).toString(Charsets.UTF_8);
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/96136f22/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
----------------------------------------------------------------------
diff --git 
a/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
 
b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
index e66c4af..351c793 100644
--- 
a/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
+++ 
b/network/shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java
@@ -19,13 +19,13 @@ package org.apache.spark.network.sasl;
 
 import java.lang.Override;
 import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.util.JavaUtils;
 
 /**
  * A class that manages shuffle secret used by the external shuffle service.
@@ -34,30 +34,10 @@ public class ShuffleSecretManager implements 
SecretKeyHolder {
   private final Logger logger = 
LoggerFactory.getLogger(ShuffleSecretManager.class);
   private final ConcurrentHashMap<String, String> shuffleSecretMap;
 
-  private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
-
   // Spark user used for authenticating SASL connections
   // Note that this must match the value in org.apache.spark.SecurityManager
   private static final String SPARK_SASL_USER = "sparkSaslUser";
 
-  /**
-   * Convert the given string to a byte buffer. The resulting buffer can be 
converted back to
-   * the same string through {@link #bytesToString(ByteBuffer)}. This is used 
if the external
-   * shuffle service represents shuffle secrets as bytes buffers instead of 
strings.
-   */
-  public static ByteBuffer stringToBytes(String s) {
-    return ByteBuffer.wrap(s.getBytes(UTF8_CHARSET));
-  }
-
-  /**
-   * Convert the given byte buffer to a string. The resulting string can be 
converted back to
-   * the same byte buffer through {@link #stringToBytes(String)}. This is used 
if the external
-   * shuffle service represents shuffle secrets as bytes buffers instead of 
strings.
-   */
-  public static String bytesToString(ByteBuffer b) {
-    return new String(b.array(), UTF8_CHARSET);
-  }
-
   public ShuffleSecretManager() {
     shuffleSecretMap = new ConcurrentHashMap<String, String>();
   }
@@ -80,7 +60,7 @@ public class ShuffleSecretManager implements SecretKeyHolder {
    * Register an application with its secret specified as a byte buffer.
    */
   public void registerApp(String appId, ByteBuffer shuffleSecret) {
-    registerApp(appId, bytesToString(shuffleSecret));
+    registerApp(appId, JavaUtils.bytesToString(shuffleSecret));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/96136f22/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 5f47c79..7023a11 100644
--- 
a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ 
b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
 
 import org.apache.spark.{SecurityManager, SparkConf, Logging}
-import org.apache.spark.network.sasl.ShuffleSecretManager
+import org.apache.spark.network.util.JavaUtils
 
 @deprecated("use yarn/stable", "1.2.0")
 class ExecutorRunnable(
@@ -98,7 +98,8 @@ class ExecutorRunnable(
       val secretString = securityMgr.getSecretKey()
       val secretBytes =
         if (secretString != null) {
-          ShuffleSecretManager.stringToBytes(secretString)
+          // This conversion must match how the YarnShuffleService decodes our 
secret
+          JavaUtils.stringToBytes(secretString)
         } else {
           // Authentication is not enabled, so just provide dummy metadata
           ByteBuffer.allocate(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/96136f22/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
----------------------------------------------------------------------
diff --git 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 18f48b4..fdd3c23 100644
--- 
a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ 
b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
 
 import org.apache.spark.{SecurityManager, SparkConf, Logging}
-import org.apache.spark.network.sasl.ShuffleSecretManager
+import org.apache.spark.network.util.JavaUtils
 
 
 class ExecutorRunnable(
@@ -97,7 +97,8 @@ class ExecutorRunnable(
       val secretString = securityMgr.getSecretKey()
       val secretBytes =
         if (secretString != null) {
-          ShuffleSecretManager.stringToBytes(secretString)
+          // This conversion must match how the YarnShuffleService decodes our 
secret
+          JavaUtils.stringToBytes(secretString)
         } else {
           // Authentication is not enabled, so just provide dummy metadata
           ByteBuffer.allocate(0)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to