[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-30 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/8880





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-29 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76658383
  
--- Diff: yarn/src/test/scala/org/apache/spark/deploy/yarn/IOEncryptionSuite.scala ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.security.PrivilegedExceptionAction
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.mockito.MockitoAnnotations
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+
+import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.config._
+import org.apache.spark.serializer._
+import org.apache.spark.storage._
+
+class IOEncryptionSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
+  with BeforeAndAfterEach {
+  private[this] val blockId = new TempShuffleBlockId(UUID.randomUUID())
+  private[this] val conf = new SparkConf()
+  private[this] val ugi = UserGroupInformation.createUserForTesting("testuser", Array("testgroup"))
+  private[this] val serializer = new KryoSerializer(conf)
+
+  override def beforeAll(): Unit = {
+    System.setProperty("SPARK_YARN_MODE", "true")
+    ugi.doAs(new PrivilegedExceptionAction[Unit]() {
+      override def run(): Unit = {
+        conf.set(IO_ENCRYPTION_ENABLED, true)
+        val creds = new Credentials()
+        SecurityManager.initIOEncryptionKey(conf, creds)
+        SparkHadoopUtil.get.addCurrentUserCredentials(creds)
+      }
+    })
+  }
+
+  override def afterAll(): Unit = {
+    SparkEnv.set(null)
+    System.clearProperty("SPARK_YARN_MODE")
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    MockitoAnnotations.initMocks(this)
--- End diff --

You don't need this anymore, do you?





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-29 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76658252
  
--- Diff: yarn/src/test/scala/org/apache/spark/deploy/yarn/IOEncryptionSuite.scala ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.security.PrivilegedExceptionAction
+import java.util.UUID
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.mockito.MockitoAnnotations
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+
+import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.config._
+import org.apache.spark.serializer._
+import org.apache.spark.storage._
+
+class IOEncryptionSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
+  with BeforeAndAfterEach {
+  private[this] val blockId = new TempShuffleBlockId(UUID.randomUUID())
+  private[this] val conf = new SparkConf()
+  private[this] val ugi = UserGroupInformation.createUserForTesting("testuser", Array("testgroup"))
+  private[this] val serializer = new KryoSerializer(conf)
+
+  override def beforeAll(): Unit = {
+    System.setProperty("SPARK_YARN_MODE", "true")
+    ugi.doAs(new PrivilegedExceptionAction[Unit]() {
+      override def run(): Unit = {
+        conf.set(IO_ENCRYPTION_ENABLED, true)
+        val creds = new Credentials()
+        SecurityManager.initIOEncryptionKey(conf, creds)
+        SparkHadoopUtil.get.addCurrentUserCredentials(creds)
+      }
+    })
+  }
+
+  override def afterAll(): Unit = {
+    SparkEnv.set(null)
+    System.clearProperty("SPARK_YARN_MODE")
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    MockitoAnnotations.initMocks(this)
+  }
+
+  override def afterEach(): Unit = {
+    super.afterEach()
+    conf.set("spark.shuffle.compress", false.toString)
+    conf.set("spark.shuffle.spill.compress", false.toString)
+  }
+
+  test("IO encryption read and write") {
+    ugi.doAs(new PrivilegedExceptionAction[Unit] {
+      override def run(): Unit = {
+        conf.set(IO_ENCRYPTION_ENABLED, true)
+        conf.set("spark.shuffle.compress", false.toString)
+        conf.set("spark.shuffle.spill.compress", false.toString)
+        testYarnIOEncryptionWriteRead()
+      }
+    })
+  }
+
+  test("IO encryption read and write with shuffle compression enabled") {
+    ugi.doAs(new PrivilegedExceptionAction[Unit] {
+      override def run(): Unit = {
+        conf.set(IO_ENCRYPTION_ENABLED, true)
+        conf.set("spark.shuffle.compress", true.toString)
+        conf.set("spark.shuffle.spill.compress", true.toString)
+        testYarnIOEncryptionWriteRead()
+      }
+    })
+  }
+
+  private[this] def testYarnIOEncryptionWriteRead(): Unit = {
+    val plainStr = "hello world"
+    val outputStream = new ByteArrayOutputStream()
+    val serializerManager = new SerializerManager(serializer, conf)
+    val wrappedOutputStream = serializerManager.wrapStream(blockId, outputStream)
+    wrappedOutputStream.write(plainStr.getBytes(StandardCharsets.UTF_8))
+    wrappedOutputStream.close()
+
+    val encryptedBytes = outputStream.toByteArray
+    val encryptedStr = new String(encryptedBytes)
+    assert (plainStr !==  encryptedStr)
--- End diff --

nit: no space before `(`



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76452983
  
--- Diff: yarn/src/test/scala/org/apache/spark/deploy/yarn/IOEncryptionSuite.scala ---
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.io._
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, UUID}
+
+import com.google.common.collect.HashMultiset
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.junit.Assert.assertEquals
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.Matchers.{eq => meq, _}
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+import scala.runtime.AbstractFunction1
+
+import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
+import org.apache.spark.network.buffer.NioManagedBuffer
+import org.apache.spark.network.util.LimitedInputStream
+import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.serializer._
+import org.apache.spark.shuffle._
+import org.apache.spark.shuffle.sort.{SerializedShuffleHandle, UnsafeShuffleWriter}
+import org.apache.spark.storage._
+import org.apache.spark.util.Utils
+
+class IOEncryptionSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
+  with BeforeAndAfterEach {
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var blockManager: BlockManager = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var blockResolver: IndexShuffleBlockResolver = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var diskBlockManager: DiskBlockManager = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var serializerManager: SerializerManager = _
--- End diff --

Hmm... do you need to mock this? Wouldn't it be better to test the actual 
code in SerializerManager that creates the streams, instead of the mocked code 
in the tests?
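For reference, a mock-free round trip through the real wrapping logic (the direction the updated suite shown earlier in this thread takes) needs no mocks at all. A minimal sketch, assuming the suite's `conf` and `serializer` fields, `java.nio.charset.StandardCharsets`, and Guava's `ByteStreams`:

```scala
// Hypothetical mock-free variant: exercise the real SerializerManager end to end.
val blockId = new TempShuffleBlockId(UUID.randomUUID())
val serializerManager = new SerializerManager(serializer, conf)

// Write through the real stream-wrapping code path.
val out = new ByteArrayOutputStream()
val wrappedOut = serializerManager.wrapStream(blockId, out)
wrappedOut.write("hello world".getBytes(StandardCharsets.UTF_8))
wrappedOut.close()
// The bytes that reach the underlying stream should no longer be plaintext.
assert(!new String(out.toByteArray, StandardCharsets.UTF_8).contains("hello world"))

// Read back through the same wrapping logic and recover the plaintext.
val wrappedIn = serializerManager.wrapStream(blockId, new ByteArrayInputStream(out.toByteArray))
val roundTripped = new String(ByteStreams.toByteArray(wrappedIn), StandardCharsets.UTF_8)
assert(roundTripped === "hello world")
```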





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76452481
  
--- Diff: docs/configuration.md ---
@@ -559,6 +559,39 @@ Apart from these, the following properties are also available, and may be useful
 spark.io.compression.codec.
   
 
+<tr>
+  <td><code>spark.io.encryption.enabled</code></td>
+  <td>false</td>
+  <td>
+    Enable IO encryption. It only supports YARN mode.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.io.encryption.keySizeBits</code></td>
+  <td>128</td>
+  <td>
+    IO encryption key size in bits. The valid number includes 128, 192 and 256.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.io.encryption.keygen.algorithm</code></td>
+  <td>HmacSHA1</td>
+  <td>
+    The algorithm to generate the key used by IO encryption. The supported algorithms are
+    described in the KeyGenerator section of the Java Cryptography Architecture Standard Algorithm
+    Name Documentation.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.io.crypto.cipher.transformation</code></td>
+  <td>AES/CTR/NoPadding</td>
+  <td>
+    Cipher transformation for IO encryption. The cipher transformation name is identical to the
+    transformations described in the Cipher section of the Java Cryptography Architecture
+    Standard Algorithm Name Documentation. Currently only "AES/CTR/NoPadding" algorithm is
--- End diff --

If only a single value is supported, maybe just not mention this in the 
documentation (and mark the config as `.internal()` in the code).
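For reference, `ConfigBuilder` can hide an entry from the documented surface; a sketch of what that might look like in `config/package.scala` (entry name and default taken from the diff above, the `.internal()` call being the suggestion):

```scala
private[spark] val IO_CRYPTO_CIPHER_TRANSFORMATION =
  ConfigBuilder("spark.io.crypto.cipher.transformation")
    .internal()  // hidden from docs/configuration.md while only one value is supported
    .stringConf
    .createWithDefault("AES/CTR/NoPadding")
```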





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76452400
  
--- Diff: docs/configuration.md ---
@@ -559,6 +559,39 @@ Apart from these, the following properties are also available, and may be useful
 spark.io.compression.codec.
   
 
+<tr>
+  <td><code>spark.io.encryption.enabled</code></td>
+  <td>false</td>
+  <td>
+    Enable IO encryption. It only supports YARN mode.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.io.encryption.keySizeBits</code></td>
+  <td>128</td>
+  <td>
+    IO encryption key size in bits. The valid number includes 128, 192 and 256.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.io.encryption.keygen.algorithm</code></td>
+  <td>HmacSHA1</td>
+  <td>
+    The algorithm to generate the key used by IO encryption. The supported algorithms are
--- End diff --

"The algorithm to use when generating the IO encryption key."





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76452285
  
--- Diff: docs/configuration.md ---
@@ -559,6 +559,39 @@ Apart from these, the following properties are also available, and may be useful
 spark.io.compression.codec.
   
 
+<tr>
+  <td><code>spark.io.encryption.enabled</code></td>
+  <td>false</td>
+  <td>
+    Enable IO encryption. It only supports YARN mode.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.io.encryption.keySizeBits</code></td>
+  <td>128</td>
+  <td>
+    IO encryption key size in bits. The valid number includes 128, 192 and 256.
--- End diff --

"Supported values are x, y and z."





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76452246
  
--- Diff: docs/configuration.md ---
@@ -559,6 +559,39 @@ Apart from these, the following properties are also available, and may be useful
 spark.io.compression.codec.
   
 
+<tr>
+  <td><code>spark.io.encryption.enabled</code></td>
+  <td>false</td>
+  <td>
+    Enable IO encryption. It only supports YARN mode.
--- End diff --

"Only supported in YARN mode."





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76334586
  
--- Diff: yarn/src/test/scala/org/apache/spark/security/IOEncryptionSuite.scala ---
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security
--- End diff --

Ah, good point. It should be possible to move the needed functionality to 
`SparkHadoopUtil` now that we dropped Hadoop 1.x support; but that's probably 
better done in a separate change.





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-25 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76334131
  
--- Diff: yarn/src/test/scala/org/apache/spark/security/IOEncryptionSuite.scala ---
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security
--- End diff --

I think this test requires YarnSparkHadoopUtil so it cannot be in core.





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-25 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76332258
  
--- Diff: docs/configuration.md ---
@@ -559,6 +559,39 @@ Apart from these, the following properties are also available, and may be useful
 spark.io.compression.codec.
   
 
+<tr>
+  <td><code>spark.io.encryption.enabled</code></td>
+  <td>false</td>
+  <td>
+    Enable IO encryption.
--- End diff --

Please say it only supports Yarn mode here.





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-25 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76328447
  
--- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security
+
+import java.io.{InputStream, OutputStream}
+import java.util.Properties
+import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
+
+import org.apache.commons.crypto.random._
+import org.apache.commons.crypto.stream._
+import org.apache.hadoop.io.Text
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.config._
+
+/**
+ * A util class for manipulating IO encryption and decryption streams.
+ */
+private[spark] object CryptoStreamUtils {
+  /**
+   * Constants and variables for spark IO encryption
+   */
+  val SPARK_IO_TOKEN = new Text("SPARK_IO_TOKEN")
+
+  // The initialization vector length in bytes.
+  val IV_LENGTH_IN_BYTES = 16
+  // The prefix of IO encryption related configurations in Spark configuration.
+  val SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX = "spark.io.encryption.commons.config."
+  // The prefix for the configurations passing to Apache Commons Crypto library.
+  val COMMONS_CRYPTO_CONF_PREFIX = "commons.crypto."
+
+  /**
+   * Helper method to wrap [[OutputStream]] with [[CryptoOutputStream]] for encryption.
+   */
+  def createCryptoOutputStream(
+      os: OutputStream,
+      sparkConf: SparkConf): OutputStream = {
+    val properties = toCryptoConf(sparkConf, SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX,
+      COMMONS_CRYPTO_CONF_PREFIX)
+    val iv = createInitializationVector(properties)
+    os.write(iv)
+    val credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
+    val key = credentials.getSecretKey(SPARK_IO_TOKEN)
+    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
+    new CryptoOutputStream(transformationStr, properties, os,
+      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+  }
+
+  /**
+   * Helper method to wrap [[InputStream]] with [[CryptoInputStream]] for decryption.
+   */
+  def createCryptoInputStream(
+      is: InputStream,
+      sparkConf: SparkConf): InputStream = {
+    val properties = toCryptoConf(sparkConf, SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX,
+      COMMONS_CRYPTO_CONF_PREFIX)
+    val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
+    is.read(iv, 0, iv.length)
+    val credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
+    val key = credentials.getSecretKey(SPARK_IO_TOKEN)
+    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
+    new CryptoInputStream(transformationStr, properties, is,
+      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+  }
+
+  /**
+   * Get Commons-crypto configurations from Spark configurations identified by prefix.
+   */
+  def toCryptoConf(
+      conf: SparkConf,
+      sparkPrefix: String,
+      cryptoPrefix: String): Properties = {
+    val props = new Properties()
+    conf.getAll.foreach { case (k, v) =>
+      if (k.startsWith(sparkPrefix)) {
+        props.put(COMMONS_CRYPTO_CONF_PREFIX + k.substring(
+          SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX.length()), v)
+      }
+    }
+    props
+  }
+
+  /**
+   * Generate an IV (Initialization Vector) using secure random.
+   */
+  private[this] def createInitializationVector(properties: Properties): Array[Byte] = {
+    val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
+    CryptoRandomFactory.getCryptoRandom(properties).nextBytes(iv)
--- End diff --

nit: could you add a warning log when this line is too slow (e.g., more than 2 seconds)? Sometimes, it may take several seconds to collect enough entropy.
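A sketch of the suggested timing check, assuming `CryptoStreamUtils` mixes in Spark's `Logging` trait (the 2-second threshold is only the example above):

```scala
private[this] def createInitializationVector(properties: Properties): Array[Byte] = {
  val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
  val startTimeMs = System.currentTimeMillis()
  CryptoRandomFactory.getCryptoRandom(properties).nextBytes(iv)
  val elapsedMs = System.currentTimeMillis() - startTimeMs
  if (elapsedMs > 2000) {
    // Secure random sources can block while the OS gathers entropy.
    logWarning(s"It took $elapsedMs ms to create the initialization vector")
  }
  iv
}
```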

[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76323741
  
--- Diff: core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala ---
@@ -103,16 +108,44 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
   }
 
   /**
+   * Wrap an input stream for encryption and compression
+   */
+  def wrapStream(blockId: BlockId, s: InputStream): InputStream = {
+    wrapForCompression(blockId, wrapForEncryption(s))
+  }
+
+  /**
+   * Wrap an output stream for encryption and compression
+   */
+  def wrapStream(blockId: BlockId, s: OutputStream): OutputStream = {
+    wrapForEncryption(wrapForCompression(blockId, s))
--- End diff --

Yeah this seems to have gotten inverted at some point.





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-25 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76323295
  
--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -522,8 +521,9 @@ private[spark] class ExternalSorter[K, V, C](
   ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))
 
     val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
-    val compressedStream = serializerManager.wrapForCompression(spill.blockId, bufferedStream)
-    serInstance.deserializeStream(compressedStream)
+
+    val wrappedStream = SparkEnv.get.serializerManager.wrapStream(spill.blockId, bufferedStream)
--- End diff --

nit: `SparkEnv.get.serializerManager` -> `serializerManager`





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-25 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76323076
  
--- Diff: core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala ---
@@ -103,16 +108,44 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
   }
 
   /**
+   * Wrap an input stream for encryption and compression
+   */
+  def wrapStream(blockId: BlockId, s: InputStream): InputStream = {
+    wrapForCompression(blockId, wrapForEncryption(s))
+  }
+
+  /**
+   * Wrap an output stream for encryption and compression
+   */
+  def wrapStream(blockId: BlockId, s: OutputStream): OutputStream = {
+    wrapForEncryption(wrapForCompression(blockId, s))
--- End diff --

why not compress first then encrypt? The performance will be better since 
it encrypts less data.
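A sketch of the consistent ordering this implies: keep encryption closest to the raw stream on both paths, so bytes are compressed before they are encrypted on write (the cipher sees less data, and the compressor still sees compressible input) and decrypted before they are decompressed on read:

```scala
def wrapStream(blockId: BlockId, s: InputStream): InputStream = {
  // raw bytes -> decrypt -> decompress
  wrapForCompression(blockId, wrapForEncryption(s))
}

def wrapStream(blockId: BlockId, s: OutputStream): OutputStream = {
  // caller's bytes -> compress -> encrypt -> raw sink
  wrapForCompression(blockId, wrapForEncryption(s))
}
```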





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-25 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76318724
  
--- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security
+
+import java.io.{InputStream, OutputStream}
+import java.util.Properties
+import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
+
+import org.apache.commons.crypto.random._
+import org.apache.commons.crypto.stream._
+import org.apache.hadoop.io.Text
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.config._
+
+/**
+ * A util class for manipulating IO encryption and decryption streams.
+ */
+private[spark] object CryptoStreamUtils {
+  /**
+   * Constants and variables for spark IO encryption
+   */
+  val SPARK_IO_TOKEN = new Text("SPARK_IO_TOKEN")
+
+  // The initialization vector length in bytes.
+  val IV_LENGTH_IN_BYTES = 16
+  // The prefix of IO encryption related configurations in Spark configuration.
+  val SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX = "spark.io.encryption.commons.config."
+  // The prefix for the configurations passing to Apache Commons Crypto library.
+  val COMMONS_CRYPTO_CONF_PREFIX = "commons.crypto."
+
+  /**
+   * Helper method to wrap [[OutputStream]] with [[CryptoOutputStream]] for encryption.
+   */
+  def createCryptoOutputStream(
+      os: OutputStream,
+      sparkConf: SparkConf): OutputStream = {
+    val properties = toCryptoConf(sparkConf, SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX,
+      COMMONS_CRYPTO_CONF_PREFIX)
+    val iv = createInitializationVector(properties)
+    os.write(iv)
+    val credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
+    val key = credentials.getSecretKey(SPARK_IO_TOKEN)
+    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
+    new CryptoOutputStream(transformationStr, properties, os,
+      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+  }
+
+  /**
+   * Helper method to wrap [[InputStream]] with [[CryptoInputStream]] for decryption.
+   */
+  def createCryptoInputStream(
+      is: InputStream,
+      sparkConf: SparkConf): InputStream = {
+    val properties = toCryptoConf(sparkConf, SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX,
+      COMMONS_CRYPTO_CONF_PREFIX)
+    val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
+    is.read(iv, 0, iv.length)
+    val credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
+    val key = credentials.getSecretKey(SPARK_IO_TOKEN)
+    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
+    new CryptoInputStream(transformationStr, properties, is,
+      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+  }
+
+  /**
+   * Get Commons-crypto configurations from Spark configurations identified by prefix.
+   */
+  def toCryptoConf(
+      conf: SparkConf,
+      sparkPrefix: String,
+      cryptoPrefix: String): Properties = {
--- End diff --

nit: you don't need `sparkPrefix` and `cryptoPrefix` any more.
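A sketch of the helper with the unused parameters dropped, filtering on the object's own prefix constants instead:

```scala
def toCryptoConf(conf: SparkConf): Properties = {
  val props = new Properties()
  conf.getAll.foreach { case (k, v) =>
    if (k.startsWith(SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX)) {
      // Rewrite "spark.io.encryption.commons.config.*" into "commons.crypto.*".
      props.put(COMMONS_CRYPTO_CONF_PREFIX +
        k.substring(SPARK_IO_ENCRYPTION_COMMONS_CONFIG_PREFIX.length()), v)
    }
  }
  props
}
```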





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76298874
  
--- Diff: yarn/src/test/scala/org/apache/spark/security/IOEncryptionSuite.scala ---
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security
--- End diff --

This is still in the "yarn" module. Weren't you going to move it to "core"? 
(As in the physical location of the file, not the scala package name.)





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76298505
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -413,6 +414,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     }
 
     if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true")
+    if (_conf.get(IO_ENCRYPTION_ENABLED) && !SparkHadoopUtil.get.isYarnMode()) {
+      throw new SparkException("IO encryption is only supported in YARN mode, please disable it " +
+        "by setting spark.io.encryption.enabled to false")
--- End diff --

nit: use `${IO_ENCRYPTION_ENABLED.key}` instead of the hardcoded key name.
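i.e., a sketch of the interpolated message:

```scala
throw new SparkException(s"IO encryption is only supported in YARN mode, please disable it " +
  s"by setting ${IO_ENCRYPTION_ENABLED.key} to false")
```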





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76289131
  
--- Diff: yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnIOEncryptionSuite.scala ---
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.io._
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, UUID}
+
+import scala.runtime.AbstractFunction1
+
+import com.google.common.collect.HashMultiset
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.junit.Assert.assertEquals
+import org.mockito.Mock
+import org.mockito.MockitoAnnotations
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.Matchers.{eq => meq, _}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+
+import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
+import org.apache.spark.network.buffer.NioManagedBuffer
+import org.apache.spark.network.util.LimitedInputStream
+import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.serializer._
+import org.apache.spark.shuffle._
+import org.apache.spark.shuffle.sort.{SerializedShuffleHandle, 
UnsafeShuffleWriter}
+import org.apache.spark.storage._
+import org.apache.spark.util.Utils
+
+private[spark] class YarnIOEncryptionSuite extends SparkFunSuite with Matchers with
--- End diff --

> Do you mean we need not unset this ENV variable in the tear down block?

I mean that if you change the check in SparkContext to not throw an 
exception when "spark.testing" is set, you shouldn't need to set/unset 
"SPARK_YARN_MODE" in the test.






[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-24 Thread winningsix
Github user winningsix commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76175174
  
--- Diff: yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnIOEncryptionSuite.scala ---
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.io._
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, UUID}
+
+import scala.runtime.AbstractFunction1
+
+import com.google.common.collect.HashMultiset
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.junit.Assert.assertEquals
+import org.mockito.Mock
+import org.mockito.MockitoAnnotations
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.Matchers.{eq => meq, _}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+
+import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
+import org.apache.spark.network.buffer.NioManagedBuffer
+import org.apache.spark.network.util.LimitedInputStream
+import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.serializer._
+import org.apache.spark.shuffle._
+import org.apache.spark.shuffle.sort.{SerializedShuffleHandle, 
UnsafeShuffleWriter}
+import org.apache.spark.storage._
+import org.apache.spark.util.Utils
+
+private[spark] class YarnIOEncryptionSuite extends SparkFunSuite with Matchers with
--- End diff --

> Could it be moved to core where the rest of the code lives?

Cool, it's a good place to put this test since it's closer to other related 
code.

> If it can be moved to core, you can even relax the check in SparkContext 
to check for "spark.testing", which is set by the root pom file / sbt script,

Yes, this test will benefit if it ever needs to create a ```SparkContext``` in the future.

>  not worry about "SPARK_YARN_MODE".

Do you mean we need not unset this ENV variable in the tear down block?





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-24 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76102771
  
--- Diff: yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnIOEncryptionSuite.scala ---
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.io._
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, UUID}
+
+import scala.runtime.AbstractFunction1
+
+import com.google.common.collect.HashMultiset
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.junit.Assert.assertEquals
+import org.mockito.Mock
+import org.mockito.MockitoAnnotations
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.Matchers.{eq => meq, _}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+
+import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
+import org.apache.spark.network.buffer.NioManagedBuffer
+import org.apache.spark.network.util.LimitedInputStream
+import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.serializer._
+import org.apache.spark.shuffle._
+import org.apache.spark.shuffle.sort.{SerializedShuffleHandle, 
UnsafeShuffleWriter}
+import org.apache.spark.storage._
+import org.apache.spark.util.Utils
+
+private[spark] class YarnIOEncryptionSuite extends SparkFunSuite with Matchers with
--- End diff --

Same thing re: `private[spark]`.

A question about this, though: I know the functionality requires YARN at 
runtime because of needing support for propagating the user credentials, but 
this test doesn't seem to need any YARN code (except for setting 
"SPARK_YARN_MODE" before running the test). Could it be moved to core where the 
rest of the code lives?

If it can be moved to core, you can even relax the check in SparkContext to 
check for "spark.testing", which is set by the root pom file / sbt script, and 
not worry about "SPARK_YARN_MODE".





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-24 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76102189
  
--- Diff: yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnIOEncryptionSuite.scala ---
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.io._
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, UUID}
+
+import scala.runtime.AbstractFunction1
+
+import com.google.common.collect.HashMultiset
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.junit.Assert.assertEquals
+import org.mockito.Mock
+import org.mockito.MockitoAnnotations
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.Matchers.{eq => meq, _}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+
+import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.internal.config._
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
+import org.apache.spark.network.buffer.NioManagedBuffer
+import org.apache.spark.network.util.LimitedInputStream
+import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.serializer._
+import org.apache.spark.shuffle._
+import org.apache.spark.shuffle.sort.{SerializedShuffleHandle, 
UnsafeShuffleWriter}
+import org.apache.spark.storage._
+import org.apache.spark.util.Utils
+
+private[spark] class YarnIOEncryptionSuite extends SparkFunSuite with Matchers with
+  BeforeAndAfterAll with BeforeAndAfterEach {
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var blockManager: BlockManager = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var blockResolver: IndexShuffleBlockResolver = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var diskBlockManager: DiskBlockManager = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var serializerManager: SerializerManager = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var taskContext: TaskContext = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var shuffleDep: ShuffleDependency[Int, Int, Int] = _
+
+  private[this] val NUM_MAPS = 1
+  private[this] val NUM_PARTITITONS = 4
+  private[this] val REDUCE_ID = 1
+  private[this] val SHUFFLE_ID = 0
+  private[this] val conf = new SparkConf()
+  private[this] val memoryManager = new TestMemoryManager(conf)
+  private[this] val hashPartitioner = new HashPartitioner(NUM_PARTITITONS)
+  private[this] val serializer = new KryoSerializer(conf)
+  private[this] val spillFilesCreated = new JLinkedList[File]()
+  private[this] val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
+  private[this] val taskMetrics = new TaskMetrics()
+
+  private[this] var tempDir: File = _
+  private[this] var mergedOutputFile: File = _
+  private[this] var partitionSizesInMergedFile: Array[Long] = _
+  private[this] val ugi = UserGroupInformation.createUserForTesting("testuser", Array("testgroup"))
+
+  // Create a mocked shuffle handle to pass into HashShuffleReader.
+  private[this] val shuffleHandle = {
+    val dependency = mock(classOf[ShuffleDependency[Int, Int, Int]])
+    when(dependency.serializer).thenReturn(serializer)
+    when(dependency.aggregator).thenReturn(None)
+    when(dependency.keyOrdering).thenReturn(None)
+    new BaseShuffleHandle(SHUFFLE_ID, NUM_MAPS, dependency)
+  }
+
+
+  // Make a mocked MapOutputTracker for the shuffle reader to use to determine what
+  // 

[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-24 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76101996
  
--- Diff: core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security
+
+import java.security.PrivilegedExceptionAction
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
+import org.apache.spark.security.CryptoStreamUtils._
+
+private[spark] class CryptoStreamUtilsSuite extends SparkFunSuite {
--- End diff --

nit: no need for `private[spark]` in tests.





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-24 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76101464
  
--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -114,4 +114,24 @@ package object config {
   private[spark] val PYSPARK_PYTHON = ConfigBuilder("spark.pyspark.python")
     .stringConf
     .createOptional
+
+  private[spark] val SPARK_IO_ENCRYPTION_ENABLED = ConfigBuilder("spark.io.encryption.enabled")
--- End diff --

nit: can you remove `SPARK_` from the name of these constants? It's kinda 
obvious that they're part of Spark.





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-24 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76101343
  
--- Diff: core/src/main/scala/org/apache/spark/SecurityManager.scala ---
@@ -551,4 +555,20 @@ private[spark] object SecurityManager {
 
   // key used to store the spark secret in the Hadoop UGI
   val SECRET_LOOKUP_KEY = "sparkCookie"
+
+  /**
+   * Setup the cryptographic key used by IO encryption in credentials. The key is generated using
+   * [[KeyGenerator]]. The algorithm and key length is specified by the [[SparkConf]].
+   */
+  def initIOEncryptionKey(conf: SparkConf, credentials: Credentials): Unit = {
+if (credentials.getSecretKey(SPARK_IO_TOKEN) == null) {
+  val keyLen = conf.get(SPARK_IO_ENCRYPTION_KEY_SIZE_BITS)
+  val IOKeyGenAlgorithm = conf.get(SPARK_IO_ENCRYPTION_KEYGEN_ALGORITHM)
+  val keyGen = KeyGenerator.getInstance(IOKeyGenAlgorithm)
+  keyGen.init(keyLen)
+
+  val IOKey = keyGen.generateKey()
--- End diff --

nit: `ioKey`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-24 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r76101309
  
--- Diff: core/src/main/scala/org/apache/spark/SecurityManager.scala ---
@@ -551,4 +555,20 @@ private[spark] object SecurityManager {
 
   // key used to store the spark secret in the Hadoop UGI
   val SECRET_LOOKUP_KEY = "sparkCookie"
+
+  /**
+   * Setup the cryptographic key used by IO encryption in credentials. The key is generated using
+   * [[KeyGenerator]]. The algorithm and key length is specified by the [[SparkConf]].
+   */
+  def initIOEncryptionKey(conf: SparkConf, credentials: Credentials): Unit = {
+if (credentials.getSecretKey(SPARK_IO_TOKEN) == null) {
+  val keyLen = conf.get(SPARK_IO_ENCRYPTION_KEY_SIZE_BITS)
+  val IOKeyGenAlgorithm = conf.get(SPARK_IO_ENCRYPTION_KEYGEN_ALGORITHM)
--- End diff --

nit: `ioKeyGenAlgorithm`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75913749
  
--- Diff: 
core/src/test/scala/org/apache/spark/crypto/ShuffleEncryptionSuite.scala ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
--- End diff --

This should move to the same package as `CryptoStreamUtils`, and probably 
be called `CryptoStreamUtilsSuite`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75913315
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala ---
@@ -143,7 +179,25 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
 val stream = new BufferedInputStream(inputStream)
 getSerializer(implicitly[ClassTag[T]])
   .newInstance()
-  .deserializeStream(wrapForCompression(blockId, stream))
+  .deserializeStream(wrapStream(blockId, stream))
   .asIterator.asInstanceOf[Iterator[T]]
   }
 }
+
+private[spark] object SerializerManager {
+  /**
+   * Setup the cryptographic key used by IO encryption in credentials. The key is generated using
+   * [[KeyGenerator]]. The algorithm and key length is specified by the [[SparkConf]].
+   */
+  def initShuffleEncryptionKey(conf: SparkConf, credentials: Credentials): Unit = {
--- End diff --

I still think this makes more sense in `SecurityManager`; also, this is used for 
more than shuffle, so maybe just `initEncryptionKey` or `initIOEncryptionKey`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75912904
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala ---
@@ -61,6 +66,9 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
   // Whether to compress shuffle output temporarily spilled to disk
   private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)
 
+  // Whether to encrypt IO encryption
--- End diff --

"Whether to enable..."


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75912796
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala ---
@@ -19,17 +19,22 @@ package org.apache.spark.serializer
 
 import java.io.{BufferedInputStream, BufferedOutputStream, InputStream, 
OutputStream}
 import java.nio.ByteBuffer
+import javax.crypto.KeyGenerator
 
+import org.apache.hadoop.security.Credentials
--- End diff --

hmm, not sure how this passed the style checker... but it should be in a 
separate group after `scala.*`.
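
For reference, the grouping would look something like this (a sketch built from 
imports already in this file; java, scala, third-party and spark groups in that 
order, each separated by a blank line):

```scala
import java.io.{BufferedInputStream, BufferedOutputStream, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.crypto.KeyGenerator

import scala.reflect.ClassTag

import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
```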


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75912344
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.security
+
+import java.io.{InputStream, OutputStream}
+import java.util.Properties
+import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
+
+import org.apache.commons.crypto.random._
+import org.apache.commons.crypto.stream._
+import org.apache.hadoop.io.Text
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.config._
+
+/**
+ * A util class for manipulating IO encryption and decryption streams.
+ */
+private[spark] object CryptoStreamUtils {
+  /**
+   * Constants and variables for spark IO encryption
+   */
+  val SPARK_IO_TOKEN = new Text("SPARK_IO_TOKEN")
+
+  // The initialization vector length in bytes.
+  val IV_LENGTH_IN_BYTES = 16
+  // The prefix of Crypto related configurations in Spark configuration.
+  val SPARK_COMMONS_CRYPTO_CONF_PREFIX = "spark.commons.crypto."
--- End diff --

I understand this is the configuration of the "commons.crypto" library; but 
this name makes it sound like some more common thing than it is.

For example, if you add commons crypto support somewhere else (e.g. RPC), 
would you use this same config, or would you have a separate config so that 
on-disk and RPC encryption could use different values?

So it would be better to at least put this in the same namespace as other 
io encryption config options. Like `spark.io.encryption.lib.config` or even 
`spark.io.encryption.commons.config`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75911800
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -114,4 +114,24 @@ package object config {
   private[spark] val PYSPARK_PYTHON = ConfigBuilder("spark.pyspark.python")
 .stringConf
 .createOptional
+
+  private[spark] val SPARK_IO_ENCRYPTION_ENABLED = ConfigBuilder("spark.io.encryption.enabled")
+.booleanConf
+.createWithDefault(false)
+
+  private[spark] val SPARK_IO_ENCRYPTION_KEYGEN_ALGORITHM = ConfigBuilder(
+"spark.io.encryption.keygen.algorithm")
--- End diff --

nit: follow this pattern


https://github.com/apache/spark/pull/8880/files#diff-6bdad48cfc34314e89599655442ff210R101
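
i.e. something along these lines (just a sketch; the `HmacSHA1` default comes 
from the constants elsewhere in this PR, and the exact formatting is an 
assumption):

```scala
private[spark] val IO_ENCRYPTION_KEYGEN_ALGORITHM =
  ConfigBuilder("spark.io.encryption.keygen.algorithm")
    .stringConf
    .createWithDefault("HmacSHA1")
```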


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75911631
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -413,6 +414,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 }
 
 if (master == "yarn" && deployMode == "client") 
System.setProperty("SPARK_YARN_MODE", "true")
+if (_conf.get(SPARK_IO_ENCRYPTION_ENABLED) && 
!SparkHadoopUtil.get.isYarnMode()) {
+  throw new SparkException("IO encryption is only supported in Yarn 
mode, please disable it " +
--- End diff --

nit: "YARN"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-22 Thread winningsix
Github user winningsix commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75794260
  
--- Diff: 
core/src/main/scala/org/apache/spark/crypto/CryptoStreamUtils.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import java.io.{InputStream, OutputStream}
+import java.util.Properties
+import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
+
+import org.apache.commons.crypto.random._
+import org.apache.commons.crypto.stream._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.crypto.CryptoConf._
+import org.apache.spark.deploy.SparkHadoopUtil
+
+/**
+ * A util class for manipulating file shuffle encryption and decryption 
streams.
+ */
+private[spark] object CryptoStreamUtils {
+  // The initialization vector length in bytes.
+  val IV_LENGTH_IN_BYTES = 16
+  // The prefix of Crypto related configurations in Spark configuration.
+  val SPARK_COMMONS_CRYPTO_CONF_PREFIX = "spark.commons.crypto."
--- End diff --

This name comes from ```Apache Commons Crypto```. Users can pass ```Apache 
Commons Crypto``` configurations, prefixed with "spark.", through to the crypto 
stream being created.
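
A minimal sketch of that prefix translation (the `toCryptoConf` name matches the 
helper used in this PR, but this standalone version and its exact behavior are 
assumptions):

```scala
import java.util.Properties

import org.apache.spark.SparkConf

object CryptoConfSketch {
  // Re-key "spark.commons.crypto.*" entries from the SparkConf into the
  // "commons.crypto.*" Properties that the Commons Crypto streams consume.
  def toCryptoConf(conf: SparkConf, sparkPrefix: String, cryptoPrefix: String): Properties = {
    val props = new Properties()
    conf.getAll.foreach { case (k, v) =>
      if (k.startsWith(sparkPrefix)) {
        props.setProperty(cryptoPrefix + k.stripPrefix(sparkPrefix), v)
      }
    }
    props
  }
}
```

So e.g. `spark.commons.crypto.cipher.classes` would be handed to the library as 
`commons.crypto.cipher.classes`.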


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-22 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75712664
  
--- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import javax.crypto.KeyGenerator
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+
+/**
+ * CryptoConf is a class for Crypto configuration
+ */
+private[spark] object CryptoConf {
+  /**
+   * Constants and variables for spark shuffle file encryption
+   */
+  val SPARK_SHUFFLE_TOKEN = new Text("SPARK_SHUFFLE_TOKEN")
+  val SPARK_SHUFFLE_ENCRYPTION_ENABLED = "spark.shuffle.encryption.enabled"
--- End diff --

Actually, I take that back, since `spark.serializer` is used for more than 
just disk data...

Maybe `spark.io.encryption.*`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-22 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75712428
  
--- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import javax.crypto.KeyGenerator
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+
+/**
+ * CryptoConf is a class for Crypto configuration
+ */
+private[spark] object CryptoConf {
+  /**
+   * Constants and variables for spark shuffle file encryption
+   */
+  val SPARK_SHUFFLE_TOKEN = new Text("SPARK_SHUFFLE_TOKEN")
+  val SPARK_SHUFFLE_ENCRYPTION_ENABLED = "spark.shuffle.encryption.enabled"
--- End diff --

Sounds better; but I'd call it `spark.serializer.encryption.enabled` to 
follow other Spark config names.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-22 Thread winningsix
Github user winningsix commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75631172
  
--- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import javax.crypto.KeyGenerator
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+
+/**
+ * CryptoConf is a class for Crypto configuration
+ */
+private[spark] object CryptoConf {
+  /**
+   * Constants and variables for spark shuffle file encryption
+   */
+  val SPARK_SHUFFLE_TOKEN = new Text("SPARK_SHUFFLE_TOKEN")
+  val SPARK_SHUFFLE_ENCRYPTION_ENABLED = "spark.shuffle.encryption.enabled"
--- End diff --

How about the name ```spark.serialization.encryption.enabled```?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75554156
  
--- Diff: 
yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleEncryptionSuite.scala
 ---
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.io._
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, UUID}
+
+import scala.runtime.AbstractFunction1
+
+import com.google.common.collect.HashMultiset
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.junit.Assert.assertEquals
+import org.mockito.Mock
+import org.mockito.MockitoAnnotations
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.Matchers.{eq => meq, _}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+
+import org.apache.spark._
+import org.apache.spark.crypto.{CryptoConf, CryptoStreamUtils}
+import org.apache.spark.crypto.CryptoConf._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
+import org.apache.spark.network.buffer.NioManagedBuffer
+import org.apache.spark.network.util.LimitedInputStream
+import org.apache.spark.serializer.{DeserializationStream, KryoSerializer, 
SerializerInstance,
+SerializerManager}
+import org.apache.spark.shuffle.{BaseShuffleHandle, 
BlockStoreShuffleReader,
+  IndexShuffleBlockResolver, RecordingManagedBuffer}
+import org.apache.spark.shuffle.sort.{SerializedShuffleHandle, 
UnsafeShuffleWriter}
+import org.apache.spark.storage._
+import org.apache.spark.util.Utils
+
+private[spark] class YarnShuffleEncryptionSuite extends SparkFunSuite with 
Matchers with
+BeforeAndAfterAll 
with BeforeAndAfterEach {
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var blockManager: 
BlockManager = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var blockResolver: 
IndexShuffleBlockResolver = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var diskBlockManager: 
DiskBlockManager = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var serializerManager: 
SerializerManager = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var taskContext: 
TaskContext = _
+  @Mock(
+answer = RETURNS_SMART_NULLS) private[this] var shuffleDep: 
ShuffleDependency[Int, Int, Int] = _
+
+  private[this] val NUM_MAPS = 1
+  private[this] val NUM_PARTITITONS = 4
+  private[this] val REDUCE_ID = 1
+  private[this] val SHUFFLE_ID = 0
+  private[this] val conf = new SparkConf()
+  private[this] val memoryManager = new TestMemoryManager(conf)
+  private[this] val hashPartitioner = new HashPartitioner(NUM_PARTITITONS)
+  private[this] val serializer = new KryoSerializer(conf)
+  private[this] val spillFilesCreated = new JLinkedList[File]()
+  private[this] val taskMemoryManager = new 
TaskMemoryManager(memoryManager, 0)
+  private[this] val taskMetrics = new TaskMetrics()
+
+  private[this] var tempDir: File = _
+  private[this] var mergedOutputFile: File = _
+  private[this] var partitionSizesInMergedFile: Array[Long] = _
+  private[this] val ugi = 
UserGroupInformation.createUserForTesting("testuser", Array("testgroup"))
+
+  // Create a mocked shuffle handle to pass into HashShuffleReader.
+  private[this] val shuffleHandle = {
+val dependency = mock(classOf[ShuffleDependency[Int, Int, Int]])
+when(dependency.serializer).thenReturn(serializer)
+

[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75553873
  
--- Diff: 
yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleEncryptionSuite.scala
 ---
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.io._
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, UUID}
+
+import scala.runtime.AbstractFunction1
+
+import com.google.common.collect.HashMultiset
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.junit.Assert.assertEquals
+import org.mockito.Mock
+import org.mockito.MockitoAnnotations
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.Matchers.{eq => meq, _}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+
+import org.apache.spark._
+import org.apache.spark.crypto.{CryptoConf, CryptoStreamUtils}
+import org.apache.spark.crypto.CryptoConf._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
+import org.apache.spark.network.buffer.NioManagedBuffer
+import org.apache.spark.network.util.LimitedInputStream
+import org.apache.spark.serializer.{DeserializationStream, KryoSerializer, 
SerializerInstance,
+SerializerManager}
+import org.apache.spark.shuffle.{BaseShuffleHandle, 
BlockStoreShuffleReader,
+  IndexShuffleBlockResolver, RecordingManagedBuffer}
+import org.apache.spark.shuffle.sort.{SerializedShuffleHandle, 
UnsafeShuffleWriter}
+import org.apache.spark.storage._
+import org.apache.spark.util.Utils
+
+private[spark] class YarnShuffleEncryptionSuite extends SparkFunSuite with 
Matchers with
+BeforeAndAfterAll 
with BeforeAndAfterEach {
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var blockManager: 
BlockManager = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var blockResolver: 
IndexShuffleBlockResolver = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var diskBlockManager: 
DiskBlockManager = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var serializerManager: 
SerializerManager = _
+  @Mock(answer = RETURNS_SMART_NULLS) private[this] var taskContext: 
TaskContext = _
+  @Mock(
+answer = RETURNS_SMART_NULLS) private[this] var shuffleDep: 
ShuffleDependency[Int, Int, Int] = _
+
+  private[this] val NUM_MAPS = 1
+  private[this] val NUM_PARTITITONS = 4
+  private[this] val REDUCE_ID = 1
+  private[this] val SHUFFLE_ID = 0
+  private[this] val conf = new SparkConf()
+  private[this] val memoryManager = new TestMemoryManager(conf)
+  private[this] val hashPartitioner = new HashPartitioner(NUM_PARTITITONS)
+  private[this] val serializer = new KryoSerializer(conf)
+  private[this] val spillFilesCreated = new JLinkedList[File]()
+  private[this] val taskMemoryManager = new 
TaskMemoryManager(memoryManager, 0)
+  private[this] val taskMetrics = new TaskMetrics()
+
+  private[this] var tempDir: File = _
+  private[this] var mergedOutputFile: File = _
+  private[this] var partitionSizesInMergedFile: Array[Long] = _
+  private[this] val ugi = 
UserGroupInformation.createUserForTesting("testuser", Array("testgroup"))
+
+  // Create a mocked shuffle handle to pass into HashShuffleReader.
+  private[this] val shuffleHandle = {
+val dependency = mock(classOf[ShuffleDependency[Int, Int, Int]])
+when(dependency.serializer).thenReturn(serializer)
+

[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75553741
  
--- Diff: 
yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleEncryptionSuite.scala
 ---
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.io._
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, UUID}
+
+import scala.runtime.AbstractFunction1
+
+import com.google.common.collect.HashMultiset
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.junit.Assert.assertEquals
+import org.mockito.Mock
+import org.mockito.MockitoAnnotations
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.Matchers.{eq => meq, _}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+
+import org.apache.spark._
+import org.apache.spark.crypto.{CryptoConf, CryptoStreamUtils}
+import org.apache.spark.crypto.CryptoConf._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
+import org.apache.spark.network.buffer.NioManagedBuffer
+import org.apache.spark.network.util.LimitedInputStream
+import org.apache.spark.serializer.{DeserializationStream, KryoSerializer, 
SerializerInstance,
+SerializerManager}
+import org.apache.spark.shuffle.{BaseShuffleHandle, 
BlockStoreShuffleReader,
+  IndexShuffleBlockResolver, RecordingManagedBuffer}
+import org.apache.spark.shuffle.sort.{SerializedShuffleHandle, 
UnsafeShuffleWriter}
+import org.apache.spark.storage._
+import org.apache.spark.util.Utils
+
+private[spark] class YarnShuffleEncryptionSuite extends SparkFunSuite with 
Matchers with
+BeforeAndAfterAll 
with BeforeAndAfterEach {
--- End diff --

nit: indented too far.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75553689
  
--- Diff: dev/deps/spark-deps-hadoop-2.7 ---
@@ -112,6 +113,7 @@ jets3t-0.9.3.jar
 jetty-6.1.26.jar
 jetty-util-6.1.26.jar
 jline-2.12.1.jar
+jna-4.2.2.jar
--- End diff --

Hope this one doesn't add problems... is it really required by the crypto 
library? Or could we exclude it (and the library falls back to JCE if JNI 
libraries are not available)?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75553471
  
--- Diff: 
yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleEncryptionSuite.scala
 ---
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.yarn
+
+import java.io._
+import java.nio.ByteBuffer
+import java.security.PrivilegedExceptionAction
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, UUID}
+
+import scala.runtime.AbstractFunction1
+
+import com.google.common.collect.HashMultiset
+import com.google.common.io.ByteStreams
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.junit.Assert.assertEquals
+import org.mockito.Mock
+import org.mockito.MockitoAnnotations
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.mockito.Answers.RETURNS_SMART_NULLS
+import org.mockito.Matchers.{eq => meq, _}
+import org.mockito.Mockito._
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Matchers}
+
+import org.apache.spark._
+import org.apache.spark.crypto.{CryptoConf, CryptoStreamUtils}
+import org.apache.spark.crypto.CryptoConf._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.memory.{TaskMemoryManager, TestMemoryManager}
+import org.apache.spark.network.buffer.NioManagedBuffer
+import org.apache.spark.network.util.LimitedInputStream
+import org.apache.spark.serializer.{DeserializationStream, KryoSerializer, 
SerializerInstance,
+SerializerManager}
--- End diff --

indent. but, in general, if you get to the point where you need to wrap the 
line, just import `_`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75551862
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala ---
@@ -103,6 +107,20 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
   }
 
   /**
+   * Wrap an input stream for encryption if shuffle encryption is enabled
+   */
+  def wrapForEncryption(s: InputStream): InputStream = {
--- End diff --

Doing this would also simplify other interfaces, e.g., you wouldn't need 
the dummy `EncryptStream` classes in a bunch of tests because 
`DiskBlockObjectWriter` would just take a generic `wrapStream` function instead 
of separate ones for compression and encryption.
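
Roughly like this (a sketch, not the real constructor; only the wrapping 
parameter is the point):

```scala
import java.io.OutputStream

// Instead of separate compression and encryption hooks, the writer takes one
// opaque transformer that the SerializerManager has already composed.
class DiskBlockObjectWriterSketch(
    openStream: () => OutputStream,
    wrapStream: OutputStream => OutputStream) {

  def open(): OutputStream = wrapStream(openStream())
}
```

Tests that don't care about encryption could then just pass `identity` instead 
of defining dummy `EncryptStream` classes.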


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75551184
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala ---
@@ -103,6 +107,20 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
   }
 
   /**
+   * Wrap an input stream for encryption if shuffle encryption is enabled
+   */
+  def wrapForEncryption(s: InputStream): InputStream = {
--- End diff --

How about having a "wrap" method that just sets up everything for you? That 
avoids having the callers in a bunch of places call both `wrapForEncryption` 
and `wrapForCompression` and have to worry about doing it in the right order.

I'd prefer if the existing `wrapFor...` methods became private or went 
away, but if they make testing easier, then you can keep them.
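
Something like this next to the existing `wrapFor...` methods (a sketch; the 
one assumption that matters is the composition order — compress first, then 
encrypt, since ciphertext doesn't compress):

```scala
def wrapStream(blockId: BlockId, s: OutputStream): OutputStream = {
  // Writes go plaintext -> compression -> encryption -> disk.
  wrapForCompression(blockId, wrapForEncryption(s))
}

def wrapStream(blockId: BlockId, s: InputStream): InputStream = {
  // Reads go disk -> decryption -> decompression -> plaintext.
  wrapForCompression(blockId, wrapForEncryption(s))
}
```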


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75550687
  
--- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import javax.crypto.KeyGenerator
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+
+/**
+ * CryptoConf is a class for Crypto configuration
+ */
+private[spark] object CryptoConf {
+  /**
+   * Constants and variables for spark shuffle file encryption
+   */
+  val SPARK_SHUFFLE_TOKEN = new Text("SPARK_SHUFFLE_TOKEN")
+  val SPARK_SHUFFLE_ENCRYPTION_ENABLED = "spark.shuffle.encryption.enabled"
--- End diff --

After looking at `SerializerManager`, it feels like this option name is 
misleading. The option applies not just to shuffle files, but to any streams 
created by `SerializerManager`. So maybe the config name should be more generic?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75549736
  
--- Diff: 
core/src/main/scala/org/apache/spark/crypto/CryptoStreamUtils.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
--- End diff --

If you follow my suggestion in the previous class, this will be the only 
class in this package... maybe at that point it's better to move it to 
`core/src/main/scala/org/apache/spark/security`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75548733
  
--- Diff: 
core/src/main/scala/org/apache/spark/crypto/CryptoStreamUtils.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import java.io.{InputStream, OutputStream}
+import java.util.Properties
+import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
+
+import org.apache.commons.crypto.random._
+import org.apache.commons.crypto.stream._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.crypto.CryptoConf._
+import org.apache.spark.deploy.SparkHadoopUtil
+
+/**
+ * A util class for manipulating file shuffle encryption and decryption 
streams.
+ */
+private[spark] object CryptoStreamUtils {
+  // The initialization vector length in bytes.
+  val IV_LENGTH_IN_BYTES = 16
+  // The prefix of Crypto related configurations in Spark configuration.
+  val SPARK_COMMONS_CRYPTO_CONF_PREFIX = "spark.commons.crypto."
+  // The prefix for the configurations passing to Commons-crypto library.
+  val COMMONS_CRYPTO_CONF_PREFIX = "commons.crypto."
+
+
+  /**
+   * Helper method to wrap [[OutputStream]] with [[CryptoOutputStream]] 
for encryption.
+   */
+  def createCryptoOutputStream(
+  os: OutputStream,
+  sparkConf: SparkConf): OutputStream = {
+val properties = toCryptoConf(sparkConf, SPARK_COMMONS_CRYPTO_CONF_PREFIX,
+  COMMONS_CRYPTO_CONF_PREFIX)
+val iv: Array[Byte] = createInitializationVector(properties)
+os.write(iv)
+val credentials = SparkHadoopUtil.get.getCurrentUserCredentials()
+val key = credentials.getSecretKey(SPARK_SHUFFLE_TOKEN)
+val transformationStr = sparkConf.get(
+  "spark.shuffle.crypto.cipher.transformation", "AES/CTR/NoPadding")
--- End diff --

Add a new config constant.
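
Something shaped like the other `ConfigBuilder` entries in this PR, presumably 
(a sketch; the key name is an assumption following the `spark.io.encryption.*` 
namespace discussed in this thread, the default is the hard-coded value above):

```scala
private[spark] val IO_ENCRYPTION_CIPHER_TRANSFORMATION =
  ConfigBuilder("spark.io.encryption.cipher.transformation")
    .stringConf
    .createWithDefault("AES/CTR/NoPadding")
```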


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75548671
  
--- Diff: 
core/src/main/scala/org/apache/spark/crypto/CryptoStreamUtils.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import java.io.{InputStream, OutputStream}
+import java.util.Properties
+import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
+
+import org.apache.commons.crypto.random._
+import org.apache.commons.crypto.stream._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.crypto.CryptoConf._
+import org.apache.spark.deploy.SparkHadoopUtil
+
+/**
+ * A util class for manipulating file shuffle encryption and decryption 
streams.
+ */
+private[spark] object CryptoStreamUtils {
+  // The initialization vector length in bytes.
+  val IV_LENGTH_IN_BYTES = 16
+  // The prefix of Crypto related configurations in Spark configuration.
+  val SPARK_COMMONS_CRYPTO_CONF_PREFIX = "spark.commons.crypto."
+  // The prefix for the configurations passing to Commons-crypto library.
+  val COMMONS_CRYPTO_CONF_PREFIX = "commons.crypto."
+
+
+  /**
+   * Helper method to wrap [[OutputStream]] with [[CryptoOutputStream]] 
for encryption.
+   */
+  def createCryptoOutputStream(
+  os: OutputStream,
+  sparkConf: SparkConf): OutputStream = {
+val properties = toCryptoConf(sparkConf, SPARK_COMMONS_CRYPTO_CONF_PREFIX,
+  COMMONS_CRYPTO_CONF_PREFIX)
+val iv: Array[Byte] = createInitializationVector(properties)
--- End diff --

nit: type is not necessary


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75546604
  
--- Diff: 
core/src/main/scala/org/apache/spark/crypto/CryptoStreamUtils.scala ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import java.io.{InputStream, OutputStream}
+import java.util.Properties
+import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
+
+import org.apache.commons.crypto.random._
+import org.apache.commons.crypto.stream._
+
+import org.apache.spark.SparkConf
+import org.apache.spark.crypto.CryptoConf._
+import org.apache.spark.deploy.SparkHadoopUtil
+
+/**
+ * A util class for manipulating file shuffle encryption and decryption 
streams.
+ */
+private[spark] object CryptoStreamUtils {
+  // The initialization vector length in bytes.
+  val IV_LENGTH_IN_BYTES = 16
+  // The prefix of Crypto related configurations in Spark configuration.
+  val SPARK_COMMONS_CRYPTO_CONF_PREFIX = "spark.commons.crypto."
--- End diff --

Not a fan of this name. Maybe just `spark.crypto`? Or `spark.crypto.lib`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75546361
  
--- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import javax.crypto.KeyGenerator
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+
+/**
+ * CryptoConf holds configuration constants and helpers for shuffle crypto.
+ */
+private[spark] object CryptoConf {
+  /**
+   * Constants and variables for spark shuffle file encryption
+   */
+  val SPARK_SHUFFLE_TOKEN = new Text("SPARK_SHUFFLE_TOKEN")
+  val SPARK_SHUFFLE_ENCRYPTION_ENABLED = "spark.shuffle.encryption.enabled"
+  val SPARK_SHUFFLE_ENCRYPTION_KEYGEN_ALGORITHM = "spark.shuffle.encryption.keygen.algorithm"
+  val DEFAULT_SPARK_SHUFFLE_ENCRYPTION_KEYGEN_ALGORITHM = "HmacSHA1"
+  val SPARK_SHUFFLE_ENCRYPTION_KEY_SIZE_BITS = "spark.shuffle.encryption.keySizeBits"
+  val DEFAULT_SPARK_SHUFFLE_ENCRYPTION_KEY_SIZE_BITS = 128
+
+  /**
+   * Check whether shuffle file encryption is enabled. It is disabled by default.
+   */
+  def isShuffleEncryptionEnabled(sparkConf: SparkConf): Boolean = {
+    if (sparkConf != null) {
+      sparkConf.getBoolean(SPARK_SHUFFLE_ENCRYPTION_ENABLED, false)
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Set up the cryptographic key used by shuffle file encryption in credentials.
+   * The key is generated using [[KeyGenerator]]; the algorithm and key length are
+   * specified by the [[SparkConf]].
+   */
+  def initSparkShuffleCredentials(conf: SparkConf, credentials: Credentials): Unit = {
--- End diff --

Since all the config stuff should now move to a new location, maybe this
method could instead be added to `SecurityManager`? That avoids adding a new
class. Then call it `initShuffleEncryptionKey` or something along those lines.
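
Concretely, the moved method might look something like this sketch; the
method name, config keys, and defaults mirror the current constants but are
assumptions, not the final API:

```
import javax.crypto.KeyGenerator

import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf

// Sketch of the method after moving it into SecurityManager.
private[spark] def initShuffleEncryptionKey(conf: SparkConf, credentials: Credentials): Unit = {
  val token = new Text("SPARK_SHUFFLE_TOKEN")
  if (credentials.getSecretKey(token) == null) {
    val keyGen = KeyGenerator.getInstance(
      conf.get("spark.shuffle.encryption.keygen.algorithm", "HmacSHA1"))
    keyGen.init(conf.getInt("spark.shuffle.encryption.keySizeBits", 128))
    credentials.addSecretKey(token, keyGen.generateKey().getEncoded)
  }
}
```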





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75545710
  
--- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import javax.crypto.KeyGenerator
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+
+/**
+ * CryptoConf holds configuration constants and helpers for shuffle crypto.
+ */
+private[spark] object CryptoConf {
+  /**
+   * Constants and variables for spark shuffle file encryption
+   */
+  val SPARK_SHUFFLE_TOKEN = new Text("SPARK_SHUFFLE_TOKEN")
+  val SPARK_SHUFFLE_ENCRYPTION_ENABLED = "spark.shuffle.encryption.enabled"
+  val SPARK_SHUFFLE_ENCRYPTION_KEYGEN_ALGORITHM = "spark.shuffle.encryption.keygen.algorithm"
+  val DEFAULT_SPARK_SHUFFLE_ENCRYPTION_KEYGEN_ALGORITHM = "HmacSHA1"
+  val SPARK_SHUFFLE_ENCRYPTION_KEY_SIZE_BITS = "spark.shuffle.encryption.keySizeBits"
+  val DEFAULT_SPARK_SHUFFLE_ENCRYPTION_KEY_SIZE_BITS = 128
+
+  /**
+   * Check whether shuffle file encryption is enabled. It is disabled by default.
+   */
+  def isShuffleEncryptionEnabled(sparkConf: SparkConf): Boolean = {
+    if (sparkConf != null) {
+      sparkConf.getBoolean(SPARK_SHUFFLE_ENCRYPTION_ENABLED, false)
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Set up the cryptographic key used by shuffle file encryption in credentials.
+   * The key is generated using [[KeyGenerator]]; the algorithm and key length are
+   * specified by the [[SparkConf]].
+   */
+  def initSparkShuffleCredentials(conf: SparkConf, credentials: Credentials): Unit = {
+    if (credentials.getSecretKey(SPARK_SHUFFLE_TOKEN) == null) {
+      val keyLen = conf.getInt(SPARK_SHUFFLE_ENCRYPTION_KEY_SIZE_BITS,
+        DEFAULT_SPARK_SHUFFLE_ENCRYPTION_KEY_SIZE_BITS)
+      require(keyLen == 128 || keyLen == 192 || keyLen == 256)
--- End diff --

As an example of the new config stuff, this can all be encapsulated in the 
config itself.

```
val SPARK_SHUFFLE_ENCRYPTION_KEY_SIZE_BITS = ConfigBuilder("...")
  .intConf
  .checkValues(Set(128, 192, 256))
  .createWithDefault(128)
```
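
With an entry like that, the `require` at the call site disappears:
`conf.get` returns a typed `Int`, and sizes outside the allowed set are
rejected by the config framework rather than by a manual check. An
illustrative call site (the entry and `keygenAlgorithm` names here are
placeholders):

```
// Illustrative: the entry yields a validated Int, so no manual require.
val keyLen: Int = conf.get(SPARK_SHUFFLE_ENCRYPTION_KEY_SIZE_BITS)
val keyGen = javax.crypto.KeyGenerator.getInstance(keygenAlgorithm)
keyGen.init(keyLen)
```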






[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75545302
  
--- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import javax.crypto.KeyGenerator
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+
+/**
+ * CryptoConf holds configuration constants and helpers for shuffle crypto.
+ */
+private[spark] object CryptoConf {
+  /**
+   * Constants and variables for spark shuffle file encryption
+   */
+  val SPARK_SHUFFLE_TOKEN = new Text("SPARK_SHUFFLE_TOKEN")
+  val SPARK_SHUFFLE_ENCRYPTION_ENABLED = "spark.shuffle.encryption.enabled"
+  val SPARK_SHUFFLE_ENCRYPTION_KEYGEN_ALGORITHM = "spark.shuffle.encryption.keygen.algorithm"
+  val DEFAULT_SPARK_SHUFFLE_ENCRYPTION_KEYGEN_ALGORITHM = "HmacSHA1"
+  val SPARK_SHUFFLE_ENCRYPTION_KEY_SIZE_BITS = "spark.shuffle.encryption.keySizeBits"
+  val DEFAULT_SPARK_SHUFFLE_ENCRYPTION_KEY_SIZE_BITS = 128
+
+  /**
+   * Check whether shuffle file encryption is enabled. It is disabled by default.
+   */
+  def isShuffleEncryptionEnabled(sparkConf: SparkConf): Boolean = {
+    if (sparkConf != null) {
--- End diff --

With the above suggestion this method will go away; instead you should use
`sparkConf.get(SHUFFLE_ENCRYPTION_ENABLED)` at the call site, which means
you'll need a SparkConf to read it.

Not sure in which case you expect SparkConf to be null here, but that looks
a little suspicious anyway.
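
For illustration, assuming an entry defined with a `false` default, the
null-guarding helper collapses to a direct, null-free lookup:

```
// Sketch: the entry's default replaces both the null check and the fallback.
if (sparkConf.get(SHUFFLE_ENCRYPTION_ENABLED)) {
  // wrap shuffle output/input streams with crypto streams here
}
```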





[GitHub] spark pull request #8880: [SPARK-5682][Core] Add encrypted shuffle in spark

2016-08-19 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/8880#discussion_r75545187
  
--- Diff: core/src/main/scala/org/apache/spark/crypto/CryptoConf.scala ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.crypto
+
+import javax.crypto.KeyGenerator
+
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+
+import org.apache.spark.SparkConf
+
+/**
+ * CryptoConf holds configuration constants and helpers for shuffle crypto.
+ */
+private[spark] object CryptoConf {
+  /**
+   * Constants and variables for spark shuffle file encryption
+   */
+  val SPARK_SHUFFLE_TOKEN = new Text("SPARK_SHUFFLE_TOKEN")
--- End diff --

Since this code was written, we've added a new config API; it would be
better if these were added to
`core/src/main/scala/org/apache/spark/internal/config/package.scala` instead.
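
A sketch of how these constants might translate into that file, mirroring
the `ConfigBuilder` example above; the entry names are assumptions, and the
`SPARK_SHUFFLE_TOKEN` `Text` alias is not a config at all, so it would stay
with the key-management code:

```
// In org.apache.spark.internal.config (illustrative, not the merged names):
private[spark] val SHUFFLE_ENCRYPTION_ENABLED =
  ConfigBuilder("spark.shuffle.encryption.enabled")
    .booleanConf
    .createWithDefault(false)

private[spark] val SHUFFLE_ENCRYPTION_KEYGEN_ALGORITHM =
  ConfigBuilder("spark.shuffle.encryption.keygen.algorithm")
    .stringConf
    .createWithDefault("HmacSHA1")

private[spark] val SHUFFLE_ENCRYPTION_KEY_SIZE_BITS =
  ConfigBuilder("spark.shuffle.encryption.keySizeBits")
    .intConf
    .checkValues(Set(128, 192, 256))
    .createWithDefault(128)
```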

