[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/17295





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r108087419
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---
@@ -17,31 +17,52 @@
 
 package org.apache.spark.storage
 
-import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
+import java.io.InputStream
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
- * This [[ManagedBuffer]] wraps a [[ChunkedByteBuffer]] retrieved from the 
[[BlockManager]]
+ * This [[ManagedBuffer]] wraps a [[BlockData]] instance retrieved from 
the [[BlockManager]]
  * so that the corresponding block's read lock can be released once this 
buffer's references
  * are released.
  *
+ * If `dispose` is set to try, the [[BlockData]]will be disposed when the 
buffer's reference
--- End diff --

`is set to try` -> `is set to true`





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r108046997
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -56,6 +57,49 @@ private[spark] class BlockResult(
 val bytes: Long)
 
 /**
+ * Abstracts away how blocks are stored and provides different ways to 
read the underlying block
+ * data. Callers should call [[dispose()]] when they're done with the 
block.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  /**
+   * Returns a Netty-friendly wrapper for the block's data.
+   *
+   * @see [[ManagedBuffer#convertToNetty()]]
+   */
+  def toNetty(): Object
+
+  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
+
+  def toByteBuffer(): ByteBuffer
+
+  def size: Long
+
+  def dispose(): Unit
+
+}
+
+private[spark] class ByteBufferBlockData(val buffer: ChunkedByteBuffer) 
extends BlockData {
+
+  override def toInputStream(): InputStream = buffer.toInputStream(dispose 
= false)
+
+  override def toNetty(): Object = buffer.toNetty
+
+  override def toChunkedByteBuffer(allocator: Int => ByteBuffer): 
ChunkedByteBuffer = {
+buffer.copy(allocator)
+  }
+
+  override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer
+
+  override def size: Long = buffer.size
+
+  override def dispose(): Unit = buffer.unmap()
--- End diff --

BTW I'm really starting to think the fix in #16499, while technically
correct, is more confusing than it should be. The problem is not that the code
was disposing of off-heap buffers; the problem is that buffers read from the
memory store should not be disposed of, while buffers read from the disk store
should.

So it's not really a matter of dispose vs. unmap, but a matter of where the
buffer comes from. (Which is kinda what I had in this patch with the
`autoDispose` parameter to `ByteBufferBlockData`. Perhaps I should revive that
and get rid of `StorageUtils.unmap`, which is just confusing.)
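
For reference, a minimal sketch of what reviving that `autoDispose` idea could look like; the flag wiring and the call to `buffer.dispose()` are my assumptions, not the final patch:

```scala
import java.io.InputStream
import java.nio.ByteBuffer

import org.apache.spark.util.io.ChunkedByteBuffer

// Hypothetical sketch: a disk-store read (memory-mapped) would pass
// autoDispose = true, while a memory-store read would pass autoDispose = false,
// so only the owner of the mapped buffer ever releases it.
private[spark] class ByteBufferBlockData(
    val buffer: ChunkedByteBuffer,
    val autoDispose: Boolean) extends BlockData {

  override def toInputStream(): InputStream = buffer.toInputStream(dispose = false)

  override def toNetty(): Object = buffer.toNetty

  override def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer =
    buffer.copy(allocator)

  override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer

  override def size: Long = buffer.size

  // Dispose only when this instance owns the underlying buffers.
  override def dispose(): Unit = if (autoDispose) buffer.dispose()
}
```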





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r108046686
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -56,6 +57,49 @@ private[spark] class BlockResult(
 val bytes: Long)
 
 /**
+ * Abstracts away how blocks are stored and provides different ways to 
read the underlying block
+ * data. Callers should call [[dispose()]] when they're done with the 
block.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  /**
+   * Returns a Netty-friendly wrapper for the block's data.
+   *
+   * @see [[ManagedBuffer#convertToNetty()]]
+   */
+  def toNetty(): Object
+
+  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
+
+  def toByteBuffer(): ByteBuffer
+
+  def size: Long
+
+  def dispose(): Unit
+
+}
+
+private[spark] class ByteBufferBlockData(val buffer: ChunkedByteBuffer) 
extends BlockData {
+
+  override def toInputStream(): InputStream = buffer.toInputStream(dispose 
= false)
+
+  override def toNetty(): Object = buffer.toNetty
+
+  override def toChunkedByteBuffer(allocator: Int => ByteBuffer): 
ChunkedByteBuffer = {
+buffer.copy(allocator)
+  }
+
+  override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer
+
+  override def size: Long = buffer.size
+
+  override def dispose(): Unit = buffer.unmap()
--- End diff --

I think `BlockData.dispose()` is pretty well defined: "Release any
resources held by the object." What's confusing is that there's both
`dispose()` and `unmap()` in `ChunkedByteBuffer`, when there used to be only
`dispose()`. It's confusing to have two different methods for releasing
resources, and that confusion is not being caused by this patch.

`BlockData` is not just a wrapper around `ChunkedByteBuffer`; if it were,
there wouldn't be a need for it. Which is why calling the method `unmap()`
wouldn't make any sense here, since that's very specific to memory-mapped byte
buffers.
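
To make that contract explicit in the code, a hedged sketch of the kind of scaladoc that could sit on the method (the wording is mine, not from the patch):

```scala
private[spark] trait BlockData {
  // ... other methods elided in this sketch ...

  /**
   * Release any resources held by this block's data (e.g. memory-mapped regions
   * or open file descriptors). Callers must not use this instance after calling
   * dispose().
   */
  def dispose(): Unit
}
```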





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r108035391
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -56,6 +57,49 @@ private[spark] class BlockResult(
 val bytes: Long)
 
 /**
+ * Abstracts away how blocks are stored and provides different ways to 
read the underlying block
+ * data. Callers should call [[dispose()]] when they're done with the 
block.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  /**
+   * Returns a Netty-friendly wrapper for the block's data.
+   *
+   * @see [[ManagedBuffer#convertToNetty()]]
+   */
+  def toNetty(): Object
+
+  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
+
+  def toByteBuffer(): ByteBuffer
+
+  def size: Long
+
+  def dispose(): Unit
+
+}
+
+private[spark] class ByteBufferBlockData(val buffer: ChunkedByteBuffer) 
extends BlockData {
+
+  override def toInputStream(): InputStream = buffer.toInputStream(dispose 
= false)
+
+  override def toNetty(): Object = buffer.toNetty
+
+  override def toChunkedByteBuffer(allocator: Int => ByteBuffer): 
ChunkedByteBuffer = {
+buffer.copy(allocator)
+  }
+
+  override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer
+
+  override def size: Long = buffer.size
+
+  override def dispose(): Unit = buffer.unmap()
--- End diff --

Can we define the semantics of `BlockData.dispose` clearly? It's quite
confusing that the `dispose` method calls `buffer.unmap` while
`ChunkedByteBuffer` also has a `dispose` method.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-24 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107952007
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -1065,7 +1084,7 @@ private[spark] class BlockManager(
   try {
 replicate(blockId, bytesToReplicate, level, remoteClassTag)
   } finally {
-bytesToReplicate.unmap()
+bytesToReplicate.dispose()
--- End diff --

`BlockData.dispose` calls `ChunkedByteBuffer.unmap`.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107832519
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -1065,7 +1084,7 @@ private[spark] class BlockManager(
   try {
 replicate(blockId, bytesToReplicate, level, remoteClassTag)
   } finally {
-bytesToReplicate.unmap()
+bytesToReplicate.dispose()
--- End diff --

I'm afraid this may counteract the effort we made in 
https://github.com/apache/spark/pull/16499

Ideally `unmap` and `dispose` do different things

cc @mallman 





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107789203
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -56,6 +57,44 @@ private[spark] class BlockResult(
 val bytes: Long)
 
 /**
+ * Abstracts away how blocks are stored and provides different ways to 
read the underlying block
+ * data. Callers should call [[dispose()]] when they're done with the 
block.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  def toNetty(): Object
+
+  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
+
+  def toByteBuffer(): ByteBuffer
--- End diff --

I added scaladoc for `toNetty()`, but the others seem self-explanatory to 
me.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107787884
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -56,6 +57,44 @@ private[spark] class BlockResult(
 val bytes: Long)
 
 /**
+ * Abstracts away how blocks are stored and provides different ways to 
read the underlying block
+ * data. Callers should call [[dispose()]] when they're done with the 
block.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  def toNetty(): Object
--- End diff --

See `ManagedBuffer.convertToNetty()`.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107787818
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -17,48 +17,67 @@
 
 package org.apache.spark.storage
 
-import java.io.{FileOutputStream, IOException, RandomAccessFile}
+import java.io._
 import java.nio.ByteBuffer
+import java.nio.channels.{Channels, ReadableByteChannel, 
WritableByteChannel}
 import java.nio.channels.FileChannel.MapMode
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.ConcurrentHashMap
 
-import com.google.common.io.Closeables
+import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.SparkConf
+import com.google.common.io.{ByteStreams, Closeables, Files}
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * Stores BlockManager blocks on disk.
  */
-private[spark] class DiskStore(conf: SparkConf, diskManager: 
DiskBlockManager) extends Logging {
+private[spark] class DiskStore(
+conf: SparkConf,
+diskManager: DiskBlockManager,
+securityManager: SecurityManager) extends Logging {
 
   private val minMemoryMapBytes = 
conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+  private val blockSizes = new ConcurrentHashMap[String, Long]()
 
-  def getSize(blockId: BlockId): Long = {
-diskManager.getFile(blockId.name).length
-  }
+  def getSize(blockId: BlockId): Long = blockSizes.get(blockId.name)
 
   /**
* Invokes the provided callback function to write the specific block.
*
* @throws IllegalStateException if the block already exists in the disk 
store.
*/
-  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
+  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit 
= {
 if (contains(blockId)) {
   throw new IllegalStateException(s"Block $blockId is already present 
in the disk store")
 }
 logDebug(s"Attempting to put block $blockId")
 val startTime = System.currentTimeMillis
 val file = diskManager.getFile(blockId)
-val fileOutputStream = new FileOutputStream(file)
+val out = new CountingWritableChannel(openForWrite(file))
 var threwException: Boolean = true
 try {
-  writeFunc(fileOutputStream)
+  writeFunc(out)
+  blockSizes.put(blockId.name, out.getCount)
   threwException = false
 } finally {
   try {
-Closeables.close(fileOutputStream, threwException)
+out.close()
+  } catch {
+case ioe: IOException =>
--- End diff --

The code needs to catch any exception thrown by `out.close()` and also 
remove the block in that case. That wasn't done before.
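
Roughly the shape of the cleanup being described, assuming it reuses the existing `remove(blockId)` helper shown elsewhere in `DiskStore` (a sketch, not the exact patch):

```scala
    var threwException: Boolean = true
    try {
      writeFunc(out)
      blockSizes.put(blockId.name, out.getCount)
      threwException = false
    } finally {
      try {
        out.close()
      } catch {
        case ioe: IOException =>
          // close() failed after an otherwise successful write; surface the error.
          if (!threwException) {
            threwException = true
            throw ioe
          }
      } finally {
        if (threwException) {
          // Either the write or the close failed, so drop the partially written block.
          remove(blockId)
        }
      }
    }
```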





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107787099
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---
@@ -31,17 +35,31 @@ import org.apache.spark.util.io.ChunkedByteBuffer
 private[storage] class BlockManagerManagedBuffer(
 blockInfoManager: BlockInfoManager,
 blockId: BlockId,
-chunkedBuffer: ChunkedByteBuffer) extends 
NettyManagedBuffer(chunkedBuffer.toNetty) {
+data: BlockData,
+dispose: Boolean) extends ManagedBuffer {
--- End diff --

Hmm, I prefer `dispose`, because it's not about needing to dispose the 
buffer, but wanting to dispose the buffer.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107786888
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -1065,7 +1084,7 @@ private[spark] class BlockManager(
   try {
 replicate(blockId, bytesToReplicate, level, remoteClassTag)
   } finally {
-bytesToReplicate.unmap()
+bytesToReplicate.dispose()
--- End diff --

Because there's no `BlockData.unmap()`.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107786384
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala ---
@@ -167,30 +167,26 @@ private[spark] class SerializerManager(
 val byteStream = new BufferedOutputStream(outputStream)
 val autoPick = !blockId.isInstanceOf[StreamBlockId]
 val ser = getSerializer(implicitly[ClassTag[T]], 
autoPick).newInstance()
-ser.serializeStream(wrapStream(blockId, 
byteStream)).writeAll(values).close()
--- End diff --

They're still used in a bunch of places.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107786072
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -63,12 +84,27 @@ private[spark] object CryptoStreamUtils extends Logging 
{
   is: InputStream,
   sparkConf: SparkConf,
   key: Array[Byte]): InputStream = {
-val properties = toCryptoConf(sparkConf)
 val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
-is.read(iv, 0, iv.length)
-val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-new CryptoInputStream(transformationStr, properties, is,
-  new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+ByteStreams.readFully(is, iv)
+val params = new CryptoParams(key, sparkConf)
+new CryptoInputStream(params.transformation, params.conf, is, 
params.keySpec,
+  new IvParameterSpec(iv))
+  }
+
+  /**
+   * Wrap a `ReadableByteChannel` for decryption.
+   */
+  def createReadableChannel(
+  channel: ReadableByteChannel,
+  sparkConf: SparkConf,
+  key: Array[Byte]): ReadableByteChannel = {
+val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
+val buf = ByteBuffer.wrap(iv)
+JavaUtils.readFully(channel, buf)
--- End diff --

There's no `ByteStreams.readFully` for `ReadableByteChannel` that I'm aware 
of.
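
For context, reading fully from a channel means looping until the buffer is filled; a sketch of what such a helper has to do (not the actual `JavaUtils.readFully` source):

```scala
import java.io.EOFException
import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel

// A single channel.read() may return fewer bytes than requested, so keep reading
// until the buffer is full or the channel hits EOF.
def readFully(channel: ReadableByteChannel, dst: ByteBuffer): Unit = {
  while (dst.hasRemaining()) {
    if (channel.read(dst) < 0) {
      throw new EOFException("Reached end of channel before filling the buffer")
    }
  }
}
```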





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107785760
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -48,12 +51,30 @@ private[spark] object CryptoStreamUtils extends Logging 
{
   os: OutputStream,
   sparkConf: SparkConf,
   key: Array[Byte]): OutputStream = {
-val properties = toCryptoConf(sparkConf)
-val iv = createInitializationVector(properties)
+val params = new CryptoParams(key, sparkConf)
+val iv = createInitializationVector(params.conf)
 os.write(iv)
-val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-new CryptoOutputStream(transformationStr, properties, os,
-  new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+new CryptoOutputStream(params.transformation, params.conf, os, 
params.keySpec,
+  new IvParameterSpec(iv))
+  }
+
+  /**
+   * Wrap a `WritableByteChannel` for encryption.
+   */
+  def createWritableChannel(
+  channel: WritableByteChannel,
+  sparkConf: SparkConf,
+  key: Array[Byte]): WritableByteChannel = {
+val params = new CryptoParams(key, sparkConf)
+val iv = createInitializationVector(params.conf)
+val buf = ByteBuffer.wrap(iv)
+while (buf.hasRemaining()) {
--- End diff --

No, there's no infinite loop here, because a failure would cause an 
exception. Yeah, using the helper should work too.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107327362
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -56,6 +57,44 @@ private[spark] class BlockResult(
 val bytes: Long)
 
 /**
+ * Abstracts away how blocks are stored and provides different ways to 
read the underlying block
+ * data. Callers should call [[dispose()]] when they're done with the 
block.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  def toNetty(): Object
--- End diff --

Why is the return type `Object`?





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107327188
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -17,48 +17,67 @@
 
 package org.apache.spark.storage
 
-import java.io.{FileOutputStream, IOException, RandomAccessFile}
+import java.io._
 import java.nio.ByteBuffer
+import java.nio.channels.{Channels, ReadableByteChannel, 
WritableByteChannel}
 import java.nio.channels.FileChannel.MapMode
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.ConcurrentHashMap
 
-import com.google.common.io.Closeables
+import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.SparkConf
+import com.google.common.io.{ByteStreams, Closeables, Files}
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * Stores BlockManager blocks on disk.
  */
-private[spark] class DiskStore(conf: SparkConf, diskManager: 
DiskBlockManager) extends Logging {
+private[spark] class DiskStore(
+conf: SparkConf,
+diskManager: DiskBlockManager,
+securityManager: SecurityManager) extends Logging {
 
   private val minMemoryMapBytes = 
conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+  private val blockSizes = new ConcurrentHashMap[String, Long]()
 
-  def getSize(blockId: BlockId): Long = {
-diskManager.getFile(blockId.name).length
-  }
+  def getSize(blockId: BlockId): Long = blockSizes.get(blockId.name)
 
   /**
* Invokes the provided callback function to write the specific block.
*
* @throws IllegalStateException if the block already exists in the disk 
store.
*/
-  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
+  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit 
= {
 if (contains(blockId)) {
   throw new IllegalStateException(s"Block $blockId is already present 
in the disk store")
 }
 logDebug(s"Attempting to put block $blockId")
 val startTime = System.currentTimeMillis
 val file = diskManager.getFile(blockId)
-val fileOutputStream = new FileOutputStream(file)
+val out = new CountingWritableChannel(openForWrite(file))
 var threwException: Boolean = true
 try {
-  writeFunc(fileOutputStream)
+  writeFunc(out)
+  blockSizes.put(blockId.name, out.getCount)
   threwException = false
 } finally {
   try {
-Closeables.close(fileOutputStream, threwException)
+out.close()
+  } catch {
+case ioe: IOException =>
--- End diff --

Why is this needed? `threwException` starts out as `true`.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107326715
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -1065,7 +1084,7 @@ private[spark] class BlockManager(
   try {
 replicate(blockId, bytesToReplicate, level, remoteClassTag)
   } finally {
-bytesToReplicate.unmap()
+bytesToReplicate.dispose()
--- End diff --

why change `unmap` to `dispose`?





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107324613
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---
@@ -31,17 +35,31 @@ import org.apache.spark.util.io.ChunkedByteBuffer
 private[storage] class BlockManagerManagedBuffer(
 blockInfoManager: BlockInfoManager,
 blockId: BlockId,
-chunkedBuffer: ChunkedByteBuffer) extends 
NettyManagedBuffer(chunkedBuffer.toNetty) {
+data: BlockData,
+dispose: Boolean) extends ManagedBuffer {
--- End diff --

`needDispose` may be a better name





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107324480
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---
@@ -31,17 +35,31 @@ import org.apache.spark.util.io.ChunkedByteBuffer
 private[storage] class BlockManagerManagedBuffer(
 blockInfoManager: BlockInfoManager,
 blockId: BlockId,
-chunkedBuffer: ChunkedByteBuffer) extends 
NettyManagedBuffer(chunkedBuffer.toNetty) {
+data: BlockData,
+dispose: Boolean) extends ManagedBuffer {
+
+  private val refCount = new AtomicInteger(1)
--- End diff --

Maybe we should mention in the class doc that the `BlockData` will be
disposed automatically via reference counting.
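
A minimal sketch of the reference-counting life cycle being discussed; the lock handling is simplified and the doc wording is an assumption, not text from the patch:

```scala
import java.io.InputStream
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.network.buffer.ManagedBuffer

/**
 * Sketch only: the class doc would state that the wrapped [[BlockData]] is
 * released automatically, via reference counting, once the last reference is
 * dropped, rather than through an explicit dispose() call by consumers.
 */
private[storage] class BlockManagerManagedBuffer(
    blockInfoManager: BlockInfoManager,
    blockId: BlockId,
    data: BlockData,
    dispose: Boolean) extends ManagedBuffer {

  // Starts at 1 for the creator; retain()/release() adjust it.
  private val refCount = new AtomicInteger(1)

  override def size(): Long = data.size

  override def nioByteBuffer(): ByteBuffer = data.toByteBuffer()

  override def createInputStream(): InputStream = data.toInputStream()

  override def convertToNetty(): Object = data.toNetty()

  override def retain(): ManagedBuffer = {
    refCount.incrementAndGet()
    this
  }

  override def release(): ManagedBuffer = {
    // Each release drops the block's read lock; the underlying data is disposed
    // only when the count reaches zero and disposal was requested.
    blockInfoManager.unlock(blockId)
    if (refCount.decrementAndGet() == 0 && dispose) {
      data.dispose()
    }
    this
  }
}
```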





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107324269
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---
@@ -17,11 +17,15 @@
 
 package org.apache.spark.storage
 
-import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
+import java.io.InputStream
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
- * This [[ManagedBuffer]] wraps a [[ChunkedByteBuffer]] retrieved from the 
[[BlockManager]]
+ * This [[ManagedBuffer]] wraps a ManagedBuffer retrieved from the 
[[BlockManager]]
--- End diff --

`wraps a [[BlockData]]`





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107323983
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -56,6 +57,44 @@ private[spark] class BlockResult(
 val bytes: Long)
 
 /**
+ * Abstracts away how blocks are stored and provides different ways to 
read the underlying block
+ * data. Callers should call [[dispose()]] when they're done with the 
block.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  def toNetty(): Object
+
+  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
+
+  def toByteBuffer(): ByteBuffer
--- End diff --

It would be great to add some documentation for these 4 methods describing
when they will be called.
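
A hedged sketch of the kind of per-method documentation being asked for; the usage notes are my guesses at typical call sites, not text from the patch:

```scala
import java.io.InputStream
import java.nio.ByteBuffer

import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.util.io.ChunkedByteBuffer

private[spark] trait BlockData {

  /** Stream over the block's bytes, e.g. for deserialization or copying to another store. */
  def toInputStream(): InputStream

  /**
   * Returns a Netty-friendly wrapper (a ByteBuf or FileRegion) used when serving
   * the block over the network.
   *
   * @see [[ManagedBuffer#convertToNetty()]]
   */
  def toNetty(): Object

  /** Copies the data into a newly allocated ChunkedByteBuffer, e.g. to cache it in memory. */
  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer

  /** Exposes the data as a single ByteBuffer, for callers that need the whole block at once. */
  def toByteBuffer(): ByteBuffer

  def size: Long

  def dispose(): Unit
}
```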





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107323833
  
--- Diff: 
core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala ---
@@ -167,30 +167,26 @@ private[spark] class SerializerManager(
 val byteStream = new BufferedOutputStream(outputStream)
 val autoPick = !blockId.isInstanceOf[StreamBlockId]
 val ser = getSerializer(implicitly[ClassTag[T]], 
autoPick).newInstance()
-ser.serializeStream(wrapStream(blockId, 
byteStream)).writeAll(values).close()
--- End diff --

the `wrapStream` and `wrapForEncryption` methods can be removed from this 
class





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107323552
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -48,12 +51,30 @@ private[spark] object CryptoStreamUtils extends Logging 
{
   os: OutputStream,
   sparkConf: SparkConf,
   key: Array[Byte]): OutputStream = {
-val properties = toCryptoConf(sparkConf)
-val iv = createInitializationVector(properties)
+val params = new CryptoParams(key, sparkConf)
+val iv = createInitializationVector(params.conf)
 os.write(iv)
-val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-new CryptoOutputStream(transformationStr, properties, os,
-  new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+new CryptoOutputStream(params.transformation, params.conf, os, 
params.keySpec,
+  new IvParameterSpec(iv))
+  }
+
+  /**
+   * Wrap a `WritableByteChannel` for encryption.
+   */
+  def createWritableChannel(
+  channel: WritableByteChannel,
+  sparkConf: SparkConf,
+  key: Array[Byte]): WritableByteChannel = {
+val params = new CryptoParams(key, sparkConf)
+val iv = createInitializationVector(params.conf)
+val buf = ByteBuffer.wrap(iv)
+while (buf.hasRemaining()) {
--- End diff --

Actually, this logic is the same as in `CryptoHelperChannel`. Shall we create
the `CryptoHelperChannel` first and simply call `helper.write(buf)` here?
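
For illustration, a sketch of a helper channel along those lines (the name `CryptoHelperChannel` comes from this comment; the exact shape in the patch may differ):

```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Delegates to the wrapped channel but keeps writing until the source buffer is
// drained, which is what the inline IV-writing loop above does.
private class CryptoHelperChannel(sink: WritableByteChannel) extends WritableByteChannel {

  override def write(src: ByteBuffer): Int = {
    val count = src.remaining()
    while (src.hasRemaining()) {
      sink.write(src)
    }
    count
  }

  override def isOpen(): Boolean = sink.isOpen()

  override def close(): Unit = sink.close()
}
```

With something like that in place, `createWritableChannel` could write the IV via `helper.write(ByteBuffer.wrap(iv))` and return the wrapped encrypting channel.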





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107323246
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -63,12 +84,27 @@ private[spark] object CryptoStreamUtils extends Logging 
{
   is: InputStream,
   sparkConf: SparkConf,
   key: Array[Byte]): InputStream = {
-val properties = toCryptoConf(sparkConf)
 val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
-is.read(iv, 0, iv.length)
-val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-new CryptoInputStream(transformationStr, properties, is,
-  new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+ByteStreams.readFully(is, iv)
+val params = new CryptoParams(key, sparkConf)
+new CryptoInputStream(params.transformation, params.conf, is, 
params.keySpec,
+  new IvParameterSpec(iv))
+  }
+
+  /**
+   * Wrap a `ReadableByteChannel` for decryption.
+   */
+  def createReadableChannel(
+  channel: ReadableByteChannel,
+  sparkConf: SparkConf,
+  key: Array[Byte]): ReadableByteChannel = {
+val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
+val buf = ByteBuffer.wrap(iv)
+JavaUtils.readFully(channel, buf)
--- End diff --

Why not use `ByteStreams.readFully`? The `buf` is not used elsewhere.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107323085
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -48,12 +51,30 @@ private[spark] object CryptoStreamUtils extends Logging 
{
   os: OutputStream,
   sparkConf: SparkConf,
   key: Array[Byte]): OutputStream = {
-val properties = toCryptoConf(sparkConf)
-val iv = createInitializationVector(properties)
+val params = new CryptoParams(key, sparkConf)
+val iv = createInitializationVector(params.conf)
 os.write(iv)
-val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-new CryptoOutputStream(transformationStr, properties, os,
-  new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+new CryptoOutputStream(params.transformation, params.conf, os, 
params.keySpec,
+  new IvParameterSpec(iv))
+  }
+
+  /**
+   * Wrap a `WritableByteChannel` for encryption.
+   */
+  def createWritableChannel(
+  channel: WritableByteChannel,
+  sparkConf: SparkConf,
+  key: Array[Byte]): WritableByteChannel = {
+val params = new CryptoParams(key, sparkConf)
+val iv = createInitializationVector(params.conf)
+val buf = ByteBuffer.wrap(iv)
+while (buf.hasRemaining()) {
--- End diff --

is there any possibility this may be an infinite loop?





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-21 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107322905
  
--- Diff: 
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -219,18 +219,22 @@ private[spark] class TorrentBroadcast[T: 
ClassTag](obj: T, id: Long)
 case None =>
   logInfo("Started reading broadcast variable " + id)
   val startTimeMs = System.currentTimeMillis()
-  val blocks = readBlocks().flatMap(_.getChunks())
+  val blocks = readBlocks()
   logInfo("Reading broadcast variable " + id + " took" + 
Utils.getUsedTimeMs(startTimeMs))
 
-  val obj = TorrentBroadcast.unBlockifyObject[T](
-blocks, SparkEnv.get.serializer, compressionCodec)
-  // Store the merged copy in BlockManager so other tasks on this 
executor don't
-  // need to re-fetch it.
-  val storageLevel = StorageLevel.MEMORY_AND_DISK
-  if (!blockManager.putSingle(broadcastId, obj, storageLevel, 
tellMaster = false)) {
-throw new SparkException(s"Failed to store $broadcastId in 
BlockManager")
+  try {
+val obj = TorrentBroadcast.unBlockifyObject[T](
+  blocks.map(_.toInputStream()), SparkEnv.get.serializer, 
compressionCodec)
+// Store the merged copy in BlockManager so other tasks on 
this executor don't
+// need to re-fetch it.
+val storageLevel = StorageLevel.MEMORY_AND_DISK
+if (!blockManager.putSingle(broadcastId, obj, storageLevel, 
tellMaster = false)) {
+  throw new SparkException(s"Failed to store $broadcastId in 
BlockManager")
+}
+obj
+  } finally {
+blocks.foreach(_.dispose())
--- End diff --

Ah, good catch! We should dispose the blocks here.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-20 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r107007132
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -56,6 +57,43 @@ private[spark] class BlockResult(
 val bytes: Long)
 
 /**
+ * Abstracts away how blocks are stored and provides different ways to 
read the underlying block
+ * data. The data for a BlockData instance can only be read once, since it 
may be backed by open
+ * file descriptors that change state as data is read.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  def toManagedBuffer(): ManagedBuffer
+
+  def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
+
+  def size: Long
+
+  def dispose(): Unit
+
+}
+
+private[spark] class ByteBufferBlockData(
+val buffer: ChunkedByteBuffer,
+autoDispose: Boolean = true) extends BlockData {
+
+  override def toInputStream(): InputStream = buffer.toInputStream(dispose 
= autoDispose)
+
+  override def toManagedBuffer(): ManagedBuffer = new 
NettyManagedBuffer(buffer.toNetty)
+
+  override def toByteBuffer(allocator: Int => ByteBuffer): 
ChunkedByteBuffer = {
+buffer.copy(allocator)
+  }
--- End diff --

So I had traced through that stuff 2 or 3 times, and now I did it again and 
I think I finally understood all that's going on. Basically, the old code was 
really bad at explicitly disposing of the buffers, meaning a bunch of paths 
(like the ones that used managed buffers) didn't bother to do it and just left 
the work to the GC.

I changed the code a bit to make the dispose more explicit and added 
comments in a few key places.
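
The resulting caller-side pattern is the usual explicit try/finally; a tiny hypothetical helper (not part of the patch) makes the ownership explicit:

```scala
// Hypothetical convenience wrapper: whoever obtains a BlockData owns it and must
// dispose it when done, rather than leaving mapped buffers for the GC to clean up.
def withBlockData[T](data: BlockData)(f: BlockData => T): T = {
  try {
    f(data)
  } finally {
    data.dispose()
  }
}
```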





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-20 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106963546
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, 
diskManager: DiskBlockManager) e
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
-put(blockId) { fileOutputStream =>
-  val channel = fileOutputStream.getChannel
-  Utils.tryWithSafeFinally {
-bytes.writeFully(channel)
-  } {
-channel.close()
-  }
+put(blockId) { channel =>
+  bytes.writeFully(channel)
--- End diff --

The channel is owned by the code in the `put` method, which does that.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-20 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106965268
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, 
diskManager: DiskBlockManager) e
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
-put(blockId) { fileOutputStream =>
-  val channel = fileOutputStream.getChannel
-  Utils.tryWithSafeFinally {
-bytes.writeFully(channel)
-  } {
-channel.close()
-  }
+put(blockId) { channel =>
+  bytes.writeFully(channel)
 }
   }
 
-  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+  def getBytes(blockId: BlockId): BlockData = {
 val file = diskManager.getFile(blockId.name)
-val channel = new RandomAccessFile(file, "r").getChannel
-Utils.tryWithSafeFinally {
-  // For small files, directly read rather than memory map
-  if (file.length < minMemoryMapBytes) {
-val buf = ByteBuffer.allocate(file.length.toInt)
-channel.position(0)
-while (buf.remaining() != 0) {
-  if (channel.read(buf) == -1) {
-throw new IOException("Reached EOF before filling buffer\n" +
-  
s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+val blockSize = getSize(blockId)
+
+securityManager.getIOEncryptionKey() match {
+  case Some(key) =>
+// Encrypted blocks cannot be memory mapped; return a special 
object that does decryption
+// and provides InputStream / FileRegion implementations for 
reading the data.
+new EncryptedBlockData(file, blockSize, conf, key)
+
+  case _ =>
+val channel = new FileInputStream(file).getChannel()
+if (blockSize < minMemoryMapBytes) {
+  // For small files, directly read rather than memory map.
+  Utils.tryWithSafeFinally {
+val buf = ByteBuffer.allocate(blockSize.toInt)
+while (buf.remaining() > 0) {
+  channel.read(buf)
+}
+buf.flip()
+new ByteBufferBlockData(new ChunkedByteBuffer(buf))
+  } {
+channel.close()
+  }
+} else {
+  Utils.tryWithSafeFinally {
+new ByteBufferBlockData(
+  new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, 
file.length)))
+  } {
+channel.close()
   }
 }
-buf.flip()
-new ChunkedByteBuffer(buf)
-  } else {
-new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, 
file.length))
-  }
-} {
-  channel.close()
 }
   }
 
   def remove(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
-if (file.exists()) {
-  val ret = file.delete()
-  if (!ret) {
-logWarning(s"Error deleting ${file.getPath()}")
+val meta = diskManager.getMetadataFile(blockId)
+
+def delete(f: File): Boolean = {
+  if (f.exists()) {
+val ret = f.delete()
+if (!ret) {
+  logWarning(s"Error deleting ${file.getPath()}")
+}
+
+ret
+  } else {
+false
   }
-  ret
-} else {
-  false
 }
+
+delete(file) & delete(meta)
   }
 
   def contains(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
 file.exists()
   }
+
+  private def openForWrite(file: File): WritableByteChannel = {
+val out = new FileOutputStream(file).getChannel()
+try {
+  securityManager.getIOEncryptionKey().map { key =>
+CryptoStreamUtils.createWritableChannel(out, conf, key)
+  }.getOrElse(out)
+} catch {
+  case e: Exception =>
+out.close()
+throw e
+}
+  }
+
+}
+
+private class EncryptedBlockData(
+file: File,
+blockSize: Long,
+conf: SparkConf,
+key: Array[Byte]) extends BlockData {
+
+  override def toInputStream(): InputStream = 
Channels.newInputStream(open())
+
+  override def toManagedBuffer(): ManagedBuffer = new 
EncryptedManagedBuffer()
+
+  override def toByteBuffer(allocator: Int => ByteBuffer): 
ChunkedByteBuffer = {
+val source = open()
+try {
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, Int.MaxValue)
+val chunk = 

[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-20 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106964049
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, 
diskManager: DiskBlockManager) e
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
-put(blockId) { fileOutputStream =>
-  val channel = fileOutputStream.getChannel
-  Utils.tryWithSafeFinally {
-bytes.writeFully(channel)
-  } {
-channel.close()
-  }
+put(blockId) { channel =>
+  bytes.writeFully(channel)
 }
   }
 
-  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+  def getBytes(blockId: BlockId): BlockData = {
 val file = diskManager.getFile(blockId.name)
-val channel = new RandomAccessFile(file, "r").getChannel
-Utils.tryWithSafeFinally {
-  // For small files, directly read rather than memory map
-  if (file.length < minMemoryMapBytes) {
-val buf = ByteBuffer.allocate(file.length.toInt)
-channel.position(0)
-while (buf.remaining() != 0) {
-  if (channel.read(buf) == -1) {
-throw new IOException("Reached EOF before filling buffer\n" +
-  
s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+val blockSize = getSize(blockId)
+
+securityManager.getIOEncryptionKey() match {
+  case Some(key) =>
+// Encrypted blocks cannot be memory mapped; return a special 
object that does decryption
+// and provides InputStream / FileRegion implementations for 
reading the data.
+new EncryptedBlockData(file, blockSize, conf, key)
+
+  case _ =>
+val channel = new FileInputStream(file).getChannel()
+if (blockSize < minMemoryMapBytes) {
+  // For small files, directly read rather than memory map.
+  Utils.tryWithSafeFinally {
+val buf = ByteBuffer.allocate(blockSize.toInt)
+while (buf.remaining() > 0) {
+  channel.read(buf)
+}
+buf.flip()
+new ByteBufferBlockData(new ChunkedByteBuffer(buf))
+  } {
+channel.close()
+  }
+} else {
+  Utils.tryWithSafeFinally {
+new ByteBufferBlockData(
+  new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, 
file.length)))
+  } {
+channel.close()
   }
 }
-buf.flip()
-new ChunkedByteBuffer(buf)
-  } else {
-new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, 
file.length))
-  }
-} {
-  channel.close()
 }
   }
 
   def remove(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
-if (file.exists()) {
-  val ret = file.delete()
-  if (!ret) {
-logWarning(s"Error deleting ${file.getPath()}")
+val meta = diskManager.getMetadataFile(blockId)
+
+def delete(f: File): Boolean = {
+  if (f.exists()) {
+val ret = f.delete()
+if (!ret) {
+  logWarning(s"Error deleting ${file.getPath()}")
+}
+
+ret
+  } else {
+false
   }
-  ret
-} else {
-  false
 }
+
+delete(file) & delete(meta)
   }
 
   def contains(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
 file.exists()
   }
+
+  private def openForWrite(file: File): WritableByteChannel = {
+val out = new FileOutputStream(file).getChannel()
+try {
+  securityManager.getIOEncryptionKey().map { key =>
+CryptoStreamUtils.createWritableChannel(out, conf, key)
+  }.getOrElse(out)
+} catch {
+  case e: Exception =>
+out.close()
--- End diff --

There might be exceptions specific to the commons-crypto library being
thrown.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-20 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106962403
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -17,48 +17,61 @@
 
 package org.apache.spark.storage
 
-import java.io.{FileOutputStream, IOException, RandomAccessFile}
+import java.io._
 import java.nio.ByteBuffer
+import java.nio.channels.{Channels, ReadableByteChannel, 
WritableByteChannel}
 import java.nio.channels.FileChannel.MapMode
+import java.nio.charset.StandardCharsets.UTF_8
 
-import com.google.common.io.Closeables
+import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.SparkConf
+import com.google.common.io.{ByteStreams, Closeables, Files}
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * Stores BlockManager blocks on disk.
  */
-private[spark] class DiskStore(conf: SparkConf, diskManager: 
DiskBlockManager) extends Logging {
+private[spark] class DiskStore(
+conf: SparkConf,
+diskManager: DiskBlockManager,
+securityManager: SecurityManager) extends Logging {
 
   private val minMemoryMapBytes = 
conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
 
   def getSize(blockId: BlockId): Long = {
-diskManager.getFile(blockId.name).length
+val file = diskManager.getMetadataFile(blockId)
+Files.toString(file, UTF_8).toLong
   }
 
   /**
* Invokes the provided callback function to write the specific block.
*
* @throws IllegalStateException if the block already exists in the disk 
store.
*/
-  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
+  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit 
= {
 if (contains(blockId)) {
   throw new IllegalStateException(s"Block $blockId is already present 
in the disk store")
 }
 logDebug(s"Attempting to put block $blockId")
 val startTime = System.currentTimeMillis
 val file = diskManager.getFile(blockId)
-val fileOutputStream = new FileOutputStream(file)
+val out = new CountingWritableChannel(openForWrite(file))
 var threwException: Boolean = true
 try {
-  writeFunc(fileOutputStream)
+  writeFunc(out)
+  Files.write(out.getCount().toString(), 
diskManager.getMetadataFile(blockId), UTF_8)
   threwException = false
 } finally {
   try {
-Closeables.close(fileOutputStream, threwException)
+Closeables.close(out, threwException)
--- End diff --

This was the previous behavior, but well, doesn't hurt to fix it.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-20 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106962007
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
@@ -34,6 +34,8 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
  */
 private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: 
Boolean) extends Logging {
 
+  private val METADATA_FILE_SUFFIX = ".meta"
--- End diff --

Hmm, good point... there's currently no metadata kept in the `DiskStore` 
class, but then this shouldn't be a lot of data.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-20 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106961480
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -102,4 +150,34 @@ private[spark] object CryptoStreamUtils extends 
Logging {
 }
 iv
   }
+
+  /**
+   * This class is a workaround for CRYPTO-125, that forces all bytes to 
be written to the
--- End diff --

There's a pretty nasty workaround for it in the network library... (the 
non-blocking workaround is a lot worse than this.)





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106063310
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -48,12 +50,30 @@ private[spark] object CryptoStreamUtils extends Logging 
{
   os: OutputStream,
   sparkConf: SparkConf,
   key: Array[Byte]): OutputStream = {
-val properties = toCryptoConf(sparkConf)
-val iv = createInitializationVector(properties)
+val params = new CryptoParams(key, sparkConf)
+val iv = createInitializationVector(params.conf)
 os.write(iv)
-val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-new CryptoOutputStream(transformationStr, properties, os,
-  new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+new CryptoOutputStream(params.transformation, params.conf, os, 
params.keySpec,
+  new IvParameterSpec(iv))
+  }
+
+  /**
+   * Wrap a `WritableByteChannel` for encryption.
+   */
+  def createWritableChannel(
+  channel: WritableByteChannel,
+  sparkConf: SparkConf,
+  key: Array[Byte]): WritableByteChannel = {
+val params = new CryptoParams(key, sparkConf)
+val iv = createInitializationVector(params.conf)
+val buf = ByteBuffer.wrap(iv)
+while (buf.remaining() > 0) {
--- End diff --

nit: buf.hasRemaining for this pattern of use
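
For reference, a minimal sketch of that write loop using `hasRemaining` (same behavior, just the more idiomatic check; names are assumed, this is not the patch's code):

```scala
import java.nio.ByteBuffer
import java.nio.channels.WritableByteChannel

// Sketch only: write the IV fully to the channel, using hasRemaining
// instead of comparing remaining() > 0.
def writeFully(channel: WritableByteChannel, iv: Array[Byte]): Unit = {
  val buf = ByteBuffer.wrap(iv)
  while (buf.hasRemaining) {
    channel.write(buf)
  }
}
```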





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106779317
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, 
diskManager: DiskBlockManager) e
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
-put(blockId) { fileOutputStream =>
-  val channel = fileOutputStream.getChannel
-  Utils.tryWithSafeFinally {
-bytes.writeFully(channel)
-  } {
-channel.close()
-  }
+put(blockId) { channel =>
+  bytes.writeFully(channel)
 }
   }
 
-  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+  def getBytes(blockId: BlockId): BlockData = {
 val file = diskManager.getFile(blockId.name)
-val channel = new RandomAccessFile(file, "r").getChannel
-Utils.tryWithSafeFinally {
-  // For small files, directly read rather than memory map
-  if (file.length < minMemoryMapBytes) {
-val buf = ByteBuffer.allocate(file.length.toInt)
-channel.position(0)
-while (buf.remaining() != 0) {
-  if (channel.read(buf) == -1) {
-throw new IOException("Reached EOF before filling buffer\n" +
-  
s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+val blockSize = getSize(blockId)
+
+securityManager.getIOEncryptionKey() match {
+  case Some(key) =>
+// Encrypted blocks cannot be memory mapped; return a special 
object that does decryption
+// and provides InputStream / FileRegion implementations for 
reading the data.
+new EncryptedBlockData(file, blockSize, conf, key)
+
+  case _ =>
+val channel = new FileInputStream(file).getChannel()
+if (blockSize < minMemoryMapBytes) {
+  // For small files, directly read rather than memory map.
+  Utils.tryWithSafeFinally {
+val buf = ByteBuffer.allocate(blockSize.toInt)
+while (buf.remaining() > 0) {
+  channel.read(buf)
--- End diff --

We need to handle the case where read() returns EOF (-1), e.g. due to data corruption or the file being removed from underneath us; otherwise we will end up in an infinite loop.

I might have missed more places where this pattern exists in this change.
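
An EOF-aware version of that loop could look roughly like this (a sketch, not the actual patch; it fails fast instead of spinning when the file is shorter than the recorded block size):

```scala
import java.io.IOException
import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel

// Sketch: read exactly blockSize bytes, throwing on premature EOF.
def readBlock(channel: ReadableByteChannel, blockSize: Int): ByteBuffer = {
  val buf = ByteBuffer.allocate(blockSize)
  while (buf.hasRemaining) {
    if (channel.read(buf) == -1) {
      throw new IOException(s"Reached EOF before filling buffer, remaining=${buf.remaining()}")
    }
  }
  buf.flip()
  buf
}
```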





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106778005
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -56,6 +57,43 @@ private[spark] class BlockResult(
 val bytes: Long)
 
 /**
+ * Abstracts away how blocks are stored and provides different ways to 
read the underlying block
+ * data. The data for a BlockData instance can only be read once, since it 
may be backed by open
+ * file descriptors that change state as data is read.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  def toManagedBuffer(): ManagedBuffer
+
+  def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
+
+  def size: Long
+
+  def dispose(): Unit
+
+}
+
+private[spark] class ByteBufferBlockData(
+val buffer: ChunkedByteBuffer,
+autoDispose: Boolean = true) extends BlockData {
+
+  override def toInputStream(): InputStream = buffer.toInputStream(dispose 
= autoDispose)
+
+  override def toManagedBuffer(): ManagedBuffer = new 
NettyManagedBuffer(buffer.toNetty)
+
+  override def toByteBuffer(allocator: Int => ByteBuffer): 
ChunkedByteBuffer = {
+buffer.copy(allocator)
+  }
--- End diff --

autoDispose is not honored for toManagedBuffer and toByteBuffer? On a first pass, it looks like it is not.

Also, is the expectation that the invoker must manually invoke dispose when not using toInputStream? It would be good to add a comment about this to the BlockData trait detailing that expectation.
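
Something along these lines would make the contract explicit (the wording below is only a suggestion, not the PR's actual comment):

```scala
/**
 * (Sketch of the suggested wording.)
 *
 * Data obtained through toManagedBuffer() or toByteBuffer() is NOT disposed
 * automatically; the caller must invoke dispose() once it is done with the
 * block. Only toInputStream() honors autoDispose.
 */
```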







[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106778650
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
@@ -79,6 +81,11 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
 
   def getFile(blockId: BlockId): File = getFile(blockId.name)
 
+  /** The path of the metadata file for the given block. */
+  def getMetadataFile(blockId: BlockId): File = {
+new File(getFile(blockId).getAbsolutePath() + METADATA_FILE_SUFFIX)
--- End diff --

It would be good to add a note that the actual filename of the metadata file does not match the directory hashing used (it depends on the source block's name).





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106778688
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -17,48 +17,61 @@
 
 package org.apache.spark.storage
 
-import java.io.{FileOutputStream, IOException, RandomAccessFile}
+import java.io._
 import java.nio.ByteBuffer
+import java.nio.channels.{Channels, ReadableByteChannel, 
WritableByteChannel}
 import java.nio.channels.FileChannel.MapMode
+import java.nio.charset.StandardCharsets.UTF_8
 
-import com.google.common.io.Closeables
+import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.SparkConf
+import com.google.common.io.{ByteStreams, Closeables, Files}
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * Stores BlockManager blocks on disk.
  */
-private[spark] class DiskStore(conf: SparkConf, diskManager: 
DiskBlockManager) extends Logging {
+private[spark] class DiskStore(
+conf: SparkConf,
+diskManager: DiskBlockManager,
+securityManager: SecurityManager) extends Logging {
 
   private val minMemoryMapBytes = 
conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
 
   def getSize(blockId: BlockId): Long = {
-diskManager.getFile(blockId.name).length
+val file = diskManager.getMetadataFile(blockId)
+Files.toString(file, UTF_8).toLong
--- End diff --

The metadata file should be read only when it is actually required; otherwise we should avoid touching it.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106778760
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, 
diskManager: DiskBlockManager) e
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
-put(blockId) { fileOutputStream =>
-  val channel = fileOutputStream.getChannel
-  Utils.tryWithSafeFinally {
-bytes.writeFully(channel)
-  } {
-channel.close()
-  }
+put(blockId) { channel =>
+  bytes.writeFully(channel)
 }
   }
 
-  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+  def getBytes(blockId: BlockId): BlockData = {
 val file = diskManager.getFile(blockId.name)
-val channel = new RandomAccessFile(file, "r").getChannel
-Utils.tryWithSafeFinally {
-  // For small files, directly read rather than memory map
-  if (file.length < minMemoryMapBytes) {
-val buf = ByteBuffer.allocate(file.length.toInt)
-channel.position(0)
-while (buf.remaining() != 0) {
-  if (channel.read(buf) == -1) {
-throw new IOException("Reached EOF before filling buffer\n" +
-  
s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+val blockSize = getSize(blockId)
+
+securityManager.getIOEncryptionKey() match {
+  case Some(key) =>
+// Encrypted blocks cannot be memory mapped; return a special 
object that does decryption
+// and provides InputStream / FileRegion implementations for 
reading the data.
+new EncryptedBlockData(file, blockSize, conf, key)
+
+  case _ =>
+val channel = new FileInputStream(file).getChannel()
+if (blockSize < minMemoryMapBytes) {
+  // For small files, directly read rather than memory map.
+  Utils.tryWithSafeFinally {
+val buf = ByteBuffer.allocate(blockSize.toInt)
+while (buf.remaining() > 0) {
+  channel.read(buf)
+}
+buf.flip()
+new ByteBufferBlockData(new ChunkedByteBuffer(buf))
+  } {
+channel.close()
+  }
+} else {
+  Utils.tryWithSafeFinally {
+new ByteBufferBlockData(
+  new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, 
file.length)))
+  } {
+channel.close()
   }
 }
-buf.flip()
-new ChunkedByteBuffer(buf)
-  } else {
-new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, 
file.length))
-  }
-} {
-  channel.close()
 }
   }
 
   def remove(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
-if (file.exists()) {
-  val ret = file.delete()
-  if (!ret) {
-logWarning(s"Error deleting ${file.getPath()}")
+val meta = diskManager.getMetadataFile(blockId)
+
+def delete(f: File): Boolean = {
+  if (f.exists()) {
+val ret = f.delete()
+if (!ret) {
+  logWarning(s"Error deleting ${file.getPath()}")
+}
+
+ret
+  } else {
+false
   }
-  ret
-} else {
-  false
 }
+
+delete(file) & delete(meta)
   }
 
   def contains(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
 file.exists()
   }
+
+  private def openForWrite(file: File): WritableByteChannel = {
+val out = new FileOutputStream(file).getChannel()
+try {
+  securityManager.getIOEncryptionKey().map { key =>
+CryptoStreamUtils.createWritableChannel(out, conf, key)
+  }.getOrElse(out)
+} catch {
+  case e: Exception =>
+out.close()
--- End diff --

Other than IOException, what other exceptions are being thrown here? It would be better to catch specific exceptions. At the least, change it to NonFatal if the set of thrown exceptions is unknown.

Also, we would need to delete file here in addition to closing the channel.
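
A sketch of what that could look like (assumed shape only, not the actual patch; `securityManager`, `conf` and `CryptoStreamUtils.createWritableChannel` are taken from the surrounding diff):

```scala
import java.io.{File, FileOutputStream}
import java.nio.channels.WritableByteChannel
import scala.util.control.NonFatal

// Sketch: catch only non-fatal errors, close the channel, and clean up the
// partially created file before rethrowing.
private def openForWrite(file: File): WritableByteChannel = {
  val out = new FileOutputStream(file).getChannel()
  try {
    securityManager.getIOEncryptionKey().map { key =>
      CryptoStreamUtils.createWritableChannel(out, conf, key)
    }.getOrElse(out)
  } catch {
    case NonFatal(e) =>
      out.close()
      file.delete()  // don't leave a partially written block file behind
      throw e
  }
}
```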




[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106268712
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging 
{
   is: InputStream,
   sparkConf: SparkConf,
   key: Array[Byte]): InputStream = {
-val properties = toCryptoConf(sparkConf)
 val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
-is.read(iv, 0, iv.length)
-val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-new CryptoInputStream(transformationStr, properties, is,
-  new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+var read = 0
+while (read < iv.length) {
+  val _read = is.read(iv, 0, iv.length)
+  if (_read < 0) {
+throw new EOFException("Failed to read IV from stream.")
+  }
+  read += _read
+}
+
+val params = new CryptoParams(key, sparkConf)
+new CryptoInputStream(params.transformation, params.conf, is, 
params.keySpec,
+  new IvParameterSpec(iv))
+  }
+
+  /**
+   * Wrap a `ReadableByteChannel` for decryption.
+   */
+  def createReadableChannel(
+  channel: ReadableByteChannel,
+  sparkConf: SparkConf,
+  key: Array[Byte]): ReadableByteChannel = {
+val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
+val buf = ByteBuffer.wrap(iv)
+buf.clear()
--- End diff --

nit: The clear is not required.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106264689
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging 
{
   is: InputStream,
   sparkConf: SparkConf,
   key: Array[Byte]): InputStream = {
-val properties = toCryptoConf(sparkConf)
 val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
-is.read(iv, 0, iv.length)
-val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-new CryptoInputStream(transformationStr, properties, is,
-  new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+var read = 0
+while (read < iv.length) {
+  val _read = is.read(iv, 0, iv.length)
+  if (_read < 0) {
+throw new EOFException("Failed to read IV from stream.")
+  }
+  read += _read
+}
--- End diff --

ByteStreams.readFully instead of the loop
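
i.e. roughly (sketch only; `is` and the IV length come from the surrounding code):

```scala
import java.io.InputStream
import com.google.common.io.ByteStreams

// Sketch: Guava's readFully either fills the array completely or throws
// EOFException, so the manual short-read loop is no longer needed.
def readIv(is: InputStream, ivLengthInBytes: Int): Array[Byte] = {
  val iv = new Array[Byte](ivLengthInBytes)
  ByteStreams.readFully(is, iv)
  iv
}
```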





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106778914
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -17,48 +17,61 @@
 
 package org.apache.spark.storage
 
-import java.io.{FileOutputStream, IOException, RandomAccessFile}
+import java.io._
 import java.nio.ByteBuffer
+import java.nio.channels.{Channels, ReadableByteChannel, 
WritableByteChannel}
 import java.nio.channels.FileChannel.MapMode
+import java.nio.charset.StandardCharsets.UTF_8
 
-import com.google.common.io.Closeables
+import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.SparkConf
+import com.google.common.io.{ByteStreams, Closeables, Files}
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * Stores BlockManager blocks on disk.
  */
-private[spark] class DiskStore(conf: SparkConf, diskManager: 
DiskBlockManager) extends Logging {
+private[spark] class DiskStore(
+conf: SparkConf,
+diskManager: DiskBlockManager,
+securityManager: SecurityManager) extends Logging {
 
   private val minMemoryMapBytes = 
conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
 
   def getSize(blockId: BlockId): Long = {
-diskManager.getFile(blockId.name).length
+val file = diskManager.getMetadataFile(blockId)
+Files.toString(file, UTF_8).toLong
   }
 
   /**
* Invokes the provided callback function to write the specific block.
*
* @throws IllegalStateException if the block already exists in the disk 
store.
*/
-  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
+  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit 
= {
 if (contains(blockId)) {
   throw new IllegalStateException(s"Block $blockId is already present 
in the disk store")
 }
 logDebug(s"Attempting to put block $blockId")
 val startTime = System.currentTimeMillis
 val file = diskManager.getFile(blockId)
-val fileOutputStream = new FileOutputStream(file)
+val out = new CountingWritableChannel(openForWrite(file))
 var threwException: Boolean = true
 try {
-  writeFunc(fileOutputStream)
+  writeFunc(out)
+  Files.write(out.getCount().toString(), 
diskManager.getMetadataFile(blockId), UTF_8)
--- End diff --

Is there any reason to keep this in string form ?
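
For comparison, a hypothetical alternative that stores the length in binary form (purely illustrative; `metaFile` stands in for `diskManager.getMetadataFile(blockId)` and is not part of the patch):

```scala
import java.io.{DataOutputStream, File, FileOutputStream}

// Hypothetical sketch: store the block size as an 8-byte big-endian long
// instead of a UTF-8 string.
def writeSize(metaFile: File, size: Long): Unit = {
  val out = new DataOutputStream(new FileOutputStream(metaFile))
  try {
    out.writeLong(size)
  } finally {
    out.close()
  }
}
```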





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106778813
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, 
diskManager: DiskBlockManager) e
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
-put(blockId) { fileOutputStream =>
-  val channel = fileOutputStream.getChannel
-  Utils.tryWithSafeFinally {
-bytes.writeFully(channel)
-  } {
-channel.close()
-  }
+put(blockId) { channel =>
+  bytes.writeFully(channel)
 }
   }
 
-  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+  def getBytes(blockId: BlockId): BlockData = {
 val file = diskManager.getFile(blockId.name)
-val channel = new RandomAccessFile(file, "r").getChannel
-Utils.tryWithSafeFinally {
-  // For small files, directly read rather than memory map
-  if (file.length < minMemoryMapBytes) {
-val buf = ByteBuffer.allocate(file.length.toInt)
-channel.position(0)
-while (buf.remaining() != 0) {
-  if (channel.read(buf) == -1) {
-throw new IOException("Reached EOF before filling buffer\n" +
-  
s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+val blockSize = getSize(blockId)
+
+securityManager.getIOEncryptionKey() match {
+  case Some(key) =>
+// Encrypted blocks cannot be memory mapped; return a special 
object that does decryption
+// and provides InputStream / FileRegion implementations for 
reading the data.
+new EncryptedBlockData(file, blockSize, conf, key)
+
+  case _ =>
+val channel = new FileInputStream(file).getChannel()
+if (blockSize < minMemoryMapBytes) {
+  // For small files, directly read rather than memory map.
+  Utils.tryWithSafeFinally {
+val buf = ByteBuffer.allocate(blockSize.toInt)
+while (buf.remaining() > 0) {
+  channel.read(buf)
+}
+buf.flip()
+new ByteBufferBlockData(new ChunkedByteBuffer(buf))
+  } {
+channel.close()
+  }
+} else {
+  Utils.tryWithSafeFinally {
+new ByteBufferBlockData(
+  new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, 
file.length)))
+  } {
+channel.close()
   }
 }
-buf.flip()
-new ChunkedByteBuffer(buf)
-  } else {
-new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, 
file.length))
-  }
-} {
-  channel.close()
 }
   }
 
   def remove(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
-if (file.exists()) {
-  val ret = file.delete()
-  if (!ret) {
-logWarning(s"Error deleting ${file.getPath()}")
+val meta = diskManager.getMetadataFile(blockId)
+
+def delete(f: File): Boolean = {
+  if (f.exists()) {
+val ret = f.delete()
+if (!ret) {
+  logWarning(s"Error deleting ${file.getPath()}")
+}
+
+ret
+  } else {
+false
   }
-  ret
-} else {
-  false
 }
+
+delete(file) & delete(meta)
   }
 
   def contains(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
 file.exists()
   }
+
+  private def openForWrite(file: File): WritableByteChannel = {
+val out = new FileOutputStream(file).getChannel()
+try {
+  securityManager.getIOEncryptionKey().map { key =>
+CryptoStreamUtils.createWritableChannel(out, conf, key)
+  }.getOrElse(out)
+} catch {
+  case e: Exception =>
+out.close()
+throw e
+}
+  }
+
+}
+
+private class EncryptedBlockData(
+file: File,
+blockSize: Long,
+conf: SparkConf,
+key: Array[Byte]) extends BlockData {
+
+  override def toInputStream(): InputStream = 
Channels.newInputStream(open())
+
+  override def toManagedBuffer(): ManagedBuffer = new 
EncryptedManagedBuffer()
+
+  override def toByteBuffer(allocator: Int => ByteBuffer): 
ChunkedByteBuffer = {
+val source = open()
+try {
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, Int.MaxValue)
+val chunk = 

[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106778932
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, 
diskManager: DiskBlockManager) e
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
-put(blockId) { fileOutputStream =>
-  val channel = fileOutputStream.getChannel
-  Utils.tryWithSafeFinally {
-bytes.writeFully(channel)
-  } {
-channel.close()
-  }
+put(blockId) { channel =>
+  bytes.writeFully(channel)
--- End diff --

Is a Utils.tryWithSafeFinally with close required here?





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106779457
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, 
diskManager: DiskBlockManager) e
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
-put(blockId) { fileOutputStream =>
-  val channel = fileOutputStream.getChannel
-  Utils.tryWithSafeFinally {
-bytes.writeFully(channel)
-  } {
-channel.close()
-  }
+put(blockId) { channel =>
+  bytes.writeFully(channel)
 }
   }
 
-  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+  def getBytes(blockId: BlockId): BlockData = {
 val file = diskManager.getFile(blockId.name)
-val channel = new RandomAccessFile(file, "r").getChannel
-Utils.tryWithSafeFinally {
-  // For small files, directly read rather than memory map
-  if (file.length < minMemoryMapBytes) {
-val buf = ByteBuffer.allocate(file.length.toInt)
-channel.position(0)
-while (buf.remaining() != 0) {
-  if (channel.read(buf) == -1) {
-throw new IOException("Reached EOF before filling buffer\n" +
-  
s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+val blockSize = getSize(blockId)
+
+securityManager.getIOEncryptionKey() match {
+  case Some(key) =>
+// Encrypted blocks cannot be memory mapped; return a special 
object that does decryption
+// and provides InputStream / FileRegion implementations for 
reading the data.
+new EncryptedBlockData(file, blockSize, conf, key)
+
+  case _ =>
+val channel = new FileInputStream(file).getChannel()
+if (blockSize < minMemoryMapBytes) {
+  // For small files, directly read rather than memory map.
+  Utils.tryWithSafeFinally {
+val buf = ByteBuffer.allocate(blockSize.toInt)
+while (buf.remaining() > 0) {
+  channel.read(buf)
+}
+buf.flip()
+new ByteBufferBlockData(new ChunkedByteBuffer(buf))
+  } {
+channel.close()
+  }
+} else {
+  Utils.tryWithSafeFinally {
+new ByteBufferBlockData(
+  new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, 
file.length)))
+  } {
+channel.close()
   }
 }
-buf.flip()
-new ChunkedByteBuffer(buf)
-  } else {
-new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, 
file.length))
-  }
-} {
-  channel.close()
 }
   }
 
   def remove(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
-if (file.exists()) {
-  val ret = file.delete()
-  if (!ret) {
-logWarning(s"Error deleting ${file.getPath()}")
+val meta = diskManager.getMetadataFile(blockId)
+
+def delete(f: File): Boolean = {
+  if (f.exists()) {
+val ret = f.delete()
+if (!ret) {
+  logWarning(s"Error deleting ${file.getPath()}")
+}
+
+ret
+  } else {
+false
   }
-  ret
-} else {
-  false
 }
+
+delete(file) & delete(meta)
   }
 
   def contains(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
 file.exists()
   }
+
+  private def openForWrite(file: File): WritableByteChannel = {
+val out = new FileOutputStream(file).getChannel()
+try {
+  securityManager.getIOEncryptionKey().map { key =>
+CryptoStreamUtils.createWritableChannel(out, conf, key)
+  }.getOrElse(out)
+} catch {
+  case e: Exception =>
+out.close()
+throw e
+}
+  }
+
+}
+
+private class EncryptedBlockData(
+file: File,
+blockSize: Long,
+conf: SparkConf,
+key: Array[Byte]) extends BlockData {
+
+  override def toInputStream(): InputStream = 
Channels.newInputStream(open())
+
+  override def toManagedBuffer(): ManagedBuffer = new 
EncryptedManagedBuffer()
+
+  override def toByteBuffer(allocator: Int => ByteBuffer): 
ChunkedByteBuffer = {
+val source = open()
+try {
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, Int.MaxValue)
+val chunk = 

[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106779213
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -17,48 +17,61 @@
 
 package org.apache.spark.storage
 
-import java.io.{FileOutputStream, IOException, RandomAccessFile}
+import java.io._
 import java.nio.ByteBuffer
+import java.nio.channels.{Channels, ReadableByteChannel, 
WritableByteChannel}
 import java.nio.channels.FileChannel.MapMode
+import java.nio.charset.StandardCharsets.UTF_8
 
-import com.google.common.io.Closeables
+import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.SparkConf
+import com.google.common.io.{ByteStreams, Closeables, Files}
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * Stores BlockManager blocks on disk.
  */
-private[spark] class DiskStore(conf: SparkConf, diskManager: 
DiskBlockManager) extends Logging {
+private[spark] class DiskStore(
+conf: SparkConf,
+diskManager: DiskBlockManager,
+securityManager: SecurityManager) extends Logging {
 
   private val minMemoryMapBytes = 
conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
 
   def getSize(blockId: BlockId): Long = {
-diskManager.getFile(blockId.name).length
+val file = diskManager.getMetadataFile(blockId)
+Files.toString(file, UTF_8).toLong
   }
 
   /**
* Invokes the provided callback function to write the specific block.
*
* @throws IllegalStateException if the block already exists in the disk 
store.
*/
-  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
+  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit 
= {
 if (contains(blockId)) {
   throw new IllegalStateException(s"Block $blockId is already present 
in the disk store")
 }
 logDebug(s"Attempting to put block $blockId")
 val startTime = System.currentTimeMillis
 val file = diskManager.getFile(blockId)
-val fileOutputStream = new FileOutputStream(file)
+val out = new CountingWritableChannel(openForWrite(file))
 var threwException: Boolean = true
 try {
-  writeFunc(fileOutputStream)
+  writeFunc(out)
+  Files.write(out.getCount().toString(), 
diskManager.getMetadataFile(blockId), UTF_8)
   threwException = false
 } finally {
   try {
-Closeables.close(fileOutputStream, threwException)
+Closeables.close(out, threwException)
--- End diff --

An IOException can be thrown in close(); we will need to remove the block (and meta) file in that case as well.
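
The finally block would then need to look something like this (a sketch only; `out`, `blockId` and `threwException` are the names from the surrounding put() code, and remove() is DiskStore.remove):

```scala
// Sketch of the suggested cleanup around closing the counting channel.
try {
  Closeables.close(out, threwException)
} catch {
  case e: IOException =>
    // close() itself failed, so the block data on disk may be incomplete.
    remove(blockId)
    throw e
} finally {
  if (threwException) {
    // writeFunc failed earlier; drop the partial block and its meta file.
    remove(blockId)
  }
}
```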





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106779546
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, 
diskManager: DiskBlockManager) e
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
-put(blockId) { fileOutputStream =>
-  val channel = fileOutputStream.getChannel
-  Utils.tryWithSafeFinally {
-bytes.writeFully(channel)
-  } {
-channel.close()
-  }
+put(blockId) { channel =>
+  bytes.writeFully(channel)
 }
   }
 
-  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+  def getBytes(blockId: BlockId): BlockData = {
 val file = diskManager.getFile(blockId.name)
-val channel = new RandomAccessFile(file, "r").getChannel
-Utils.tryWithSafeFinally {
-  // For small files, directly read rather than memory map
-  if (file.length < minMemoryMapBytes) {
-val buf = ByteBuffer.allocate(file.length.toInt)
-channel.position(0)
-while (buf.remaining() != 0) {
-  if (channel.read(buf) == -1) {
-throw new IOException("Reached EOF before filling buffer\n" +
-  
s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+val blockSize = getSize(blockId)
+
+securityManager.getIOEncryptionKey() match {
+  case Some(key) =>
+// Encrypted blocks cannot be memory mapped; return a special 
object that does decryption
+// and provides InputStream / FileRegion implementations for 
reading the data.
+new EncryptedBlockData(file, blockSize, conf, key)
+
+  case _ =>
+val channel = new FileInputStream(file).getChannel()
+if (blockSize < minMemoryMapBytes) {
+  // For small files, directly read rather than memory map.
+  Utils.tryWithSafeFinally {
+val buf = ByteBuffer.allocate(blockSize.toInt)
+while (buf.remaining() > 0) {
+  channel.read(buf)
+}
+buf.flip()
+new ByteBufferBlockData(new ChunkedByteBuffer(buf))
+  } {
+channel.close()
+  }
+} else {
+  Utils.tryWithSafeFinally {
+new ByteBufferBlockData(
+  new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, 
file.length)))
+  } {
+channel.close()
   }
 }
-buf.flip()
-new ChunkedByteBuffer(buf)
-  } else {
-new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, 
file.length))
-  }
-} {
-  channel.close()
 }
   }
 
   def remove(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
-if (file.exists()) {
-  val ret = file.delete()
-  if (!ret) {
-logWarning(s"Error deleting ${file.getPath()}")
+val meta = diskManager.getMetadataFile(blockId)
+
+def delete(f: File): Boolean = {
+  if (f.exists()) {
+val ret = f.delete()
+if (!ret) {
+  logWarning(s"Error deleting ${file.getPath()}")
+}
+
+ret
+  } else {
+false
   }
-  ret
-} else {
-  false
 }
+
+delete(file) & delete(meta)
   }
 
   def contains(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
 file.exists()
   }
+
+  private def openForWrite(file: File): WritableByteChannel = {
+val out = new FileOutputStream(file).getChannel()
+try {
+  securityManager.getIOEncryptionKey().map { key =>
+CryptoStreamUtils.createWritableChannel(out, conf, key)
+  }.getOrElse(out)
+} catch {
+  case e: Exception =>
+out.close()
+throw e
+}
+  }
+
+}
+
+private class EncryptedBlockData(
+file: File,
+blockSize: Long,
+conf: SparkConf,
+key: Array[Byte]) extends BlockData {
+
+  override def toInputStream(): InputStream = 
Channels.newInputStream(open())
+
+  override def toManagedBuffer(): ManagedBuffer = new 
EncryptedManagedBuffer()
+
+  override def toByteBuffer(allocator: Int => ByteBuffer): 
ChunkedByteBuffer = {
+val source = open()
+try {
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, Int.MaxValue)
+val chunk = 

[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106779004
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
@@ -34,6 +34,8 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
  */
 private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: 
Boolean) extends Logging {
 
+  private val METADATA_FILE_SUFFIX = ".meta"
--- End diff --

Assuming I am not missing something, shuffle does not use (or require) the block length from the meta file.
If so, for all other blocks, why not simply keep the block size in memory? On executor failure the on-disk block is lost anyway, and we already maintain block info for each block in the executor.
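
A sketch of the in-memory alternative being suggested (assumed names, not actual Spark code):

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical: track on-disk block sizes in the DiskStore itself instead of
// a .meta file; the map is lost with the executor, just like the blocks are.
class BlockSizes {
  private val sizes = new ConcurrentHashMap[String, java.lang.Long]()
  def put(blockName: String, size: Long): Unit = sizes.put(blockName, size)
  def get(blockName: String): Option[Long] = Option(sizes.get(blockName)).map(_.longValue())
  def remove(blockName: String): Unit = sizes.remove(blockName)
}
```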






[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106269093
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -102,4 +150,34 @@ private[spark] object CryptoStreamUtils extends 
Logging {
 }
 iv
   }
+
+  /**
+   * This class is a workaround for CRYPTO-125, that forces all bytes to 
be written to the
--- End diff --

This is a lousy bug! Good thing that we don't seem to be hit by it (yet).





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-18 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r10677
  
--- Diff: 
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
@@ -94,7 +101,11 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
deleteFilesOnStop: Boolea
   }
 }.filter(_ != null).flatMap { dir =>
   val files = dir.listFiles()
-  if (files != null) files else Seq.empty
+  if (files != null) {
+files.filter(!_.getName().endsWith(METADATA_FILE_SUFFIX))
+  } else {
+Seq.empty
+  }
--- End diff --

getAllFiles should return all files stored; for example, test suites use getAllFiles to remove all files via this API.

On the other hand, getAllBlocks needs to ensure metadata files are filtered out, so this check should be moved there.
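
i.e. something like this in getAllBlocks (a sketch; `getAllFiles`, `METADATA_FILE_SUFFIX` and `BlockId` are the names from the surrounding diff):

```scala
// Sketch: filter metadata files out in getAllBlocks rather than in getAllFiles,
// so getAllFiles keeps returning every file (including .meta) for cleanup paths.
def getAllBlocks(): Seq[BlockId] = {
  getAllFiles()
    .filterNot(_.getName.endsWith(METADATA_FILE_SUFFIX))
    .map(f => BlockId(f.getName))
}
```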





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-17 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106751157
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging 
{
   is: InputStream,
   sparkConf: SparkConf,
   key: Array[Byte]): InputStream = {
-val properties = toCryptoConf(sparkConf)
 val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
-is.read(iv, 0, iv.length)
-val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-new CryptoInputStream(transformationStr, properties, is,
-  new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+var read = 0
+while (read < iv.length) {
--- End diff --

Ah, missed that one. +1 for shorter code.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-17 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106749569
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging 
{
   is: InputStream,
   sparkConf: SparkConf,
   key: Array[Byte]): InputStream = {
-val properties = toCryptoConf(sparkConf)
 val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
-is.read(iv, 0, iv.length)
-val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-new CryptoInputStream(transformationStr, properties, is,
-  new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+var read = 0
+while (read < iv.length) {
--- End diff --

Yeah, you can just use `ByteStreams.readFully(is, iv)`.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-17 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106691642
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging 
{
   is: InputStream,
   sparkConf: SparkConf,
   key: Array[Byte]): InputStream = {
-val properties = toCryptoConf(sparkConf)
 val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
-is.read(iv, 0, iv.length)
-val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-new CryptoInputStream(transformationStr, properties, is,
-  new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+var read = 0
+while (read < iv.length) {
--- End diff --

It avoids issues with short reads. It's unlikely to happen but I always 
write read code like this to be safe.
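
In other words, a single read() may legally return fewer bytes than requested, so something has to keep reading until the buffer is full (or use `ByteStreams.readFully`). A generic sketch of the pattern, for illustration only:

```scala
import java.io.{EOFException, InputStream}

// Sketch: keep reading until the buffer is full or EOF is reached, so a
// short read cannot silently truncate the data.
def readFully(is: InputStream, buf: Array[Byte]): Unit = {
  var read = 0
  while (read < buf.length) {
    val n = is.read(buf, read, buf.length - read)
    if (n < 0) {
      throw new EOFException("Stream ended before the buffer was filled")
    }
    read += n
  }
}
```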





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-17 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106691384
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, 
diskManager: DiskBlockManager) e
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
-put(blockId) { fileOutputStream =>
-  val channel = fileOutputStream.getChannel
-  Utils.tryWithSafeFinally {
-bytes.writeFully(channel)
-  } {
-channel.close()
-  }
+put(blockId) { channel =>
+  bytes.writeFully(channel)
 }
   }
 
-  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+  def getBytes(blockId: BlockId): BlockData = {
 val file = diskManager.getFile(blockId.name)
-val channel = new RandomAccessFile(file, "r").getChannel
-Utils.tryWithSafeFinally {
-  // For small files, directly read rather than memory map
-  if (file.length < minMemoryMapBytes) {
-val buf = ByteBuffer.allocate(file.length.toInt)
-channel.position(0)
-while (buf.remaining() != 0) {
-  if (channel.read(buf) == -1) {
-throw new IOException("Reached EOF before filling buffer\n" +
-  
s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+val blockSize = getSize(blockId)
+
+securityManager.getIOEncryptionKey() match {
+  case Some(key) =>
+// Encrypted blocks cannot be memory mapped; return a special 
object that does decryption
+// and provides InputStream / FileRegion implementations for 
reading the data.
+new EncryptedBlockData(file, blockSize, conf, key)
+
+  case _ =>
+val channel = new FileInputStream(file).getChannel()
+if (blockSize < minMemoryMapBytes) {
+  // For small files, directly read rather than memory map.
+  Utils.tryWithSafeFinally {
+val buf = ByteBuffer.allocate(blockSize.toInt)
+while (buf.remaining() > 0) {
+  channel.read(buf)
+}
+buf.flip()
+new ByteBufferBlockData(new ChunkedByteBuffer(buf))
+  } {
+channel.close()
+  }
+} else {
+  Utils.tryWithSafeFinally {
+new ByteBufferBlockData(
+  new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, 
file.length)))
+  } {
+channel.close()
   }
 }
-buf.flip()
-new ChunkedByteBuffer(buf)
-  } else {
-new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, 
file.length))
-  }
-} {
-  channel.close()
 }
   }
 
   def remove(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
-if (file.exists()) {
-  val ret = file.delete()
-  if (!ret) {
-logWarning(s"Error deleting ${file.getPath()}")
+val meta = diskManager.getMetadataFile(blockId)
+
+def delete(f: File): Boolean = {
+  if (f.exists()) {
+val ret = f.delete()
+if (!ret) {
+  logWarning(s"Error deleting ${file.getPath()}")
+}
+
+ret
+  } else {
+false
   }
-  ret
-} else {
-  false
 }
+
+delete(file) & delete(meta)
   }
 
   def contains(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
 file.exists()
   }
+
+  private def openForWrite(file: File): WritableByteChannel = {
+val out = new FileOutputStream(file).getChannel()
+try {
+  securityManager.getIOEncryptionKey().map { key =>
+CryptoStreamUtils.createWritableChannel(out, conf, key)
+  }.getOrElse(out)
+} catch {
+  case e: Exception =>
+out.close()
+throw e
+}
+  }
+
+}
+
+private class EncryptedBlockData(
+file: File,
+blockSize: Long,
+conf: SparkConf,
+key: Array[Byte]) extends BlockData {
+
+  override def toInputStream(): InputStream = 
Channels.newInputStream(open())
+
+  override def toManagedBuffer(): ManagedBuffer = new 
EncryptedManagedBuffer()
+
+  override def toByteBuffer(allocator: Int => ByteBuffer): 
ChunkedByteBuffer = {
+val source = open()
+try {
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, Int.MaxValue)
+val chunk = 

[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-17 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106691863
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -102,4 +150,34 @@ private[spark] object CryptoStreamUtils extends 
Logging {
 }
 iv
   }
+
+  /**
+   * This class is a workaround for CRYPTO-125, that forces all bytes to 
be written to the
+   * underlying channel. Since the callers of this API are using blocking 
I/O, there are no
+   * concerns with regards to CPU usage here.
--- End diff --

No. As the comment states, it's a workaround for a bug in the 
commons-crypto library, which would affect the code being added.





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-17 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106588563
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, 
diskManager: DiskBlockManager) e
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
-put(blockId) { fileOutputStream =>
-  val channel = fileOutputStream.getChannel
-  Utils.tryWithSafeFinally {
-bytes.writeFully(channel)
-  } {
-channel.close()
-  }
+put(blockId) { channel =>
+  bytes.writeFully(channel)
 }
   }
 
-  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+  def getBytes(blockId: BlockId): BlockData = {
 val file = diskManager.getFile(blockId.name)
-val channel = new RandomAccessFile(file, "r").getChannel
-Utils.tryWithSafeFinally {
-  // For small files, directly read rather than memory map
-  if (file.length < minMemoryMapBytes) {
-val buf = ByteBuffer.allocate(file.length.toInt)
-channel.position(0)
-while (buf.remaining() != 0) {
-  if (channel.read(buf) == -1) {
-throw new IOException("Reached EOF before filling buffer\n" +
-  
s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+val blockSize = getSize(blockId)
+
+securityManager.getIOEncryptionKey() match {
+  case Some(key) =>
+// Encrypted blocks cannot be memory mapped; return a special object that does decryption
+// and provides InputStream / FileRegion implementations for reading the data.
+new EncryptedBlockData(file, blockSize, conf, key)
+
+  case _ =>
+val channel = new FileInputStream(file).getChannel()
+if (blockSize < minMemoryMapBytes) {
+  // For small files, directly read rather than memory map.
+  Utils.tryWithSafeFinally {
+val buf = ByteBuffer.allocate(blockSize.toInt)
+while (buf.remaining() > 0) {
+  channel.read(buf)
+}
+buf.flip()
+new ByteBufferBlockData(new ChunkedByteBuffer(buf))
+  } {
+channel.close()
+  }
+} else {
+  Utils.tryWithSafeFinally {
+new ByteBufferBlockData(
+  new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)))
+  } {
+channel.close()
   }
 }
-buf.flip()
-new ChunkedByteBuffer(buf)
-  } else {
-new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
-  }
-} {
-  channel.close()
 }
   }
 
   def remove(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
-if (file.exists()) {
-  val ret = file.delete()
-  if (!ret) {
-logWarning(s"Error deleting ${file.getPath()}")
+val meta = diskManager.getMetadataFile(blockId)
+
+def delete(f: File): Boolean = {
+  if (f.exists()) {
+val ret = f.delete()
+if (!ret) {
+  logWarning(s"Error deleting ${file.getPath()}")
+}
+
+ret
+  } else {
+false
   }
-  ret
-} else {
-  false
 }
+
+delete(file) & delete(meta)
   }
 
   def contains(blockId: BlockId): Boolean = {
 val file = diskManager.getFile(blockId.name)
 file.exists()
   }
+
+  private def openForWrite(file: File): WritableByteChannel = {
+val out = new FileOutputStream(file).getChannel()
+try {
+  securityManager.getIOEncryptionKey().map { key =>
+CryptoStreamUtils.createWritableChannel(out, conf, key)
+  }.getOrElse(out)
+} catch {
+  case e: Exception =>
+out.close()
+throw e
+}
+  }
+
+}
+
+private class EncryptedBlockData(
+file: File,
+blockSize: Long,
+conf: SparkConf,
+key: Array[Byte]) extends BlockData {
+
+  override def toInputStream(): InputStream = Channels.newInputStream(open())
+
+  override def toManagedBuffer(): ManagedBuffer = new EncryptedManagedBuffer()
+
+  override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
+val source = open()
+try {
+  var remaining = blockSize
+  val chunks = new ListBuffer[ByteBuffer]()
+  while (remaining > 0) {
+val chunkSize = math.min(remaining, Int.MaxValue)
+val chunk = 

[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106587687
  
--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
---
@@ -1235,7 +1251,7 @@ private[spark] class BlockManager(
   peer.port,
   peer.executorId,
   blockId,
-  new NettyManagedBuffer(data.toNetty),
+  new BlockManagerManagedBuffer(blockInfoManager, blockId, data.toManagedBuffer()),
--- End diff --

why this change?





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106587428
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -102,4 +150,34 @@ private[spark] object CryptoStreamUtils extends Logging {
 }
 iv
   }
+
+  /**
+   * This class is a workaround for CRYPTO-125: it forces all bytes to be written to the
+   * underlying channel. Since the callers of this API are using blocking I/O, there are no
+   * concerns with regard to CPU usage here.
--- End diff --

is it a separated bug fix?





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-16 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/17295#discussion_r106587322
  
--- Diff: 
core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging {
   is: InputStream,
   sparkConf: SparkConf,
   key: Array[Byte]): InputStream = {
-val properties = toCryptoConf(sparkConf)
 val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
-is.read(iv, 0, iv.length)
-val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-new CryptoInputStream(transformationStr, properties, is,
-  new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+var read = 0
+while (read < iv.length) {
--- End diff --

what does this while loop do?
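
The loop body is cut off in the quoted diff, but a loop of this shape is normally a
read-fully guard: InputStream.read() may return fewer bytes than requested, so the code
keeps reading until the whole IV has been consumed. A hedged reconstruction (not the
exact lines from the PR; the names is and IV_LENGTH_IN_BYTES come from the quoted method):

    // Illustrative reconstruction only; the actual body is truncated in the quoted diff.
    val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
    var read = 0
    while (read < iv.length) {
      val n = is.read(iv, read, iv.length - read)
      if (n < 0) {
        throw new java.io.EOFException("Stream ended before the IV could be fully read")
      }
      read += n
    }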





[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

2017-03-14 Thread vanzin
GitHub user vanzin opened a pull request:

https://github.com/apache/spark/pull/17295

[SPARK-19556][core] Do not encrypt block manager data in memory.

This change modifies the way block data is encrypted to make the more
common cases faster, while penalizing an edge case. As a side effect
of the change, all data that goes through the block manager is now
encrypted only when needed, including the previous path (broadcast
variables) where that did not happen.

The way the change works is by not encrypting data that is stored in
memory; so if a serialized block is in memory, it will only be encrypted
once it is evicted to disk.

The penalty comes when transferring that encrypted data from disk. If the
data ends up in memory again, it is as efficient as before; but if the
evicted block needs to be transferred directly to a remote executor, then
there's now a performance penalty, since the code now uses a custom
FileRegion implementation to decrypt the data before transferring.
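
As a rough illustration of that trade-off, the blocking-copy sketch below decrypts a
block file while pushing it to a target channel. The actual PR does this with a custom
Netty FileRegion, which this sketch deliberately does not reproduce; it only assumes the
createCryptoInputStream helper whose signature appears in the CryptoStreamUtils diff
quoted elsewhere in this thread.

    import java.io.{File, FileInputStream}
    import java.nio.ByteBuffer
    import java.nio.channels.{Channels, ReadableByteChannel, WritableByteChannel}

    import org.apache.spark.SparkConf
    import org.apache.spark.security.CryptoStreamUtils

    private object TransferDecryptedSketch {
      // Decrypt the on-disk block and drain it into the target channel using blocking I/O.
      def transferDecrypted(
          conf: SparkConf,
          key: Array[Byte],
          file: File,
          target: WritableByteChannel): Unit = {
        val decrypted: ReadableByteChannel = Channels.newChannel(
          CryptoStreamUtils.createCryptoInputStream(new FileInputStream(file), conf, key))
        try {
          val buf = ByteBuffer.allocate(64 * 1024)
          while (decrypted.read(buf) >= 0) {
            buf.flip()
            while (buf.hasRemaining) {
              target.write(buf)
            }
            buf.clear()
          }
        } finally {
          decrypted.close()
        }
      }
    }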

This also means that block data transferred between executors is now
not encrypted (and thus relies on the network library's encryption support
for secrecy). Shuffle blocks are still transferred in encrypted form,
since they're handled in a slightly different way by the code. This also
keeps compatibility with existing external shuffle services, which transfer
encrypted shuffle blocks, and avoids having to make the external service
aware of encryption at all.

Another change in the disk store is that it now stores a tiny metadata
file next to the file holding the block data; this is needed to accurately
account for the decrypted block size, which may be significantly different
from the size of the encrypted file on disk.
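
For illustration, the metadata only needs to persist the decrypted size; a minimal
sketch with hypothetical helper names (not the PR's actual code) could look like:

    import java.io.{DataInputStream, DataOutputStream, File, FileInputStream, FileOutputStream}

    private object BlockSizeMetaSketch {
      // Record the decrypted size of a block next to its data file.
      def writeBlockSizeMeta(meta: File, decryptedSize: Long): Unit = {
        val out = new DataOutputStream(new FileOutputStream(meta))
        try {
          out.writeLong(decryptedSize)
        } finally {
          out.close()
        }
      }

      // Recover the decrypted size without touching the (encrypted) data file.
      def readBlockSizeMeta(meta: File): Long = {
        val in = new DataInputStream(new FileInputStream(meta))
        try {
          in.readLong()
        } finally {
          in.close()
        }
      }
    }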

The serialization and deserialization APIs in the SerializerManager now
do not do encryption automatically; callers need to explicitly wrap their
streams with an appropriate crypto stream before using those.
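
A minimal sketch of what that explicit wrapping looks like for a caller, using the
CryptoStreamUtils helpers that appear elsewhere in this change (createWritableChannel
and createCryptoInputStream); the wrapper functions themselves are illustrative and
not SerializerManager code:

    import java.io.{File, FileInputStream, FileOutputStream, InputStream}

    import org.apache.spark.SparkConf
    import org.apache.spark.security.CryptoStreamUtils
    import org.apache.spark.util.io.ChunkedByteBuffer

    private object ExplicitCryptoWrappingSketch {
      // Write path: wrap the file channel with an encrypting channel before writing.
      def writeEncrypted(conf: SparkConf, key: Array[Byte], file: File, data: ChunkedByteBuffer): Unit = {
        val channel = CryptoStreamUtils.createWritableChannel(
          new FileOutputStream(file).getChannel(), conf, key)
        try {
          data.writeFully(channel)
        } finally {
          channel.close()
        }
      }

      // Read path: wrap the raw stream with a decrypting stream before deserializing.
      def readEncrypted(conf: SparkConf, key: Array[Byte], file: File): InputStream =
        CryptoStreamUtils.createCryptoInputStream(new FileInputStream(file), conf, key)
    }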

As a result of these changes, some of the workarounds added in SPARK-19520
are removed here.

Testing: a new trait ("EncryptionFunSuite") was added that provides an easy
way to run a test twice, with encryption on and off; broadcast, block 
manager
and caching tests were modified to use this new trait so that the existing
tests exercise both encrypted and non-encrypted paths. I also ran some
applications with encryption turned on to verify that they still work,
including streaming tests that failed without the fix for SPARK-19520.
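
The trait itself is not included in this message; a rough sketch of the idea, assuming a
ScalaTest FunSuite-based test class (illustrative, not the exact code added by the PR):

    import org.apache.spark.SparkConf
    import org.scalatest.FunSuite

    trait EncryptionFunSuite { self: FunSuite =>
      // Register the same body twice, once with I/O encryption on and once off,
      // handing the prepared SparkConf to the test body.
      protected def encryptionTest(name: String)(body: SparkConf => Unit): Unit = {
        Seq(false, true).foreach { encrypt =>
          val mode = if (encrypt) "on" else "off"
          test(s"$name (encryption = $mode)") {
            val conf = new SparkConf().set("spark.io.encryption.enabled", encrypt.toString)
            body(conf)
          }
        }
      }
    }

A test written against this trait would call encryptionTest("caching") { conf => ... }
and automatically run once per mode.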

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vanzin/spark SPARK-19556

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17295.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17295


commit 3aa752f9becdfe0e35a47d731736d942e3e5b3bf
Author: Marcelo Vanzin 
Date:   2017-02-10T23:59:51Z

[SPARK-19556][core] Do not encrypt block manager data in memory.

This change modifies the way block data is encrypted to make the more
common cases faster, while penalizing an edge case. As a side effect
of the change, all data that goes through the block manager is now
encrypted only when needed, including the previous path (broadcast
variables) where that did not happen.

The way the change works is by not encrypting data that is stored in
memory; so if a serialized block is in memory, it will only be encrypted
once it is evicted to disk.

The penalty comes when transferring that encrypted data from disk. If the
data ends up in memory again, it is as efficient as before; but if the
evicted block needs to be transferred directly to a remote executor, then
there's now a performance penalty, since the code now uses a custom
FileRegion implementation to decrypt the data before transferring.

This also means that block data transferred between executors is now
not encrypted (and thus relies on the network library's encryption support
for secrecy). Shuffle blocks are still transferred in encrypted form,
since they're handled in a slightly different way by the code. This also
keeps compatibility with existing external shuffle services, which transfer
encrypted shuffle blocks, and avoids having to make the external service
aware of encryption at all.

Another change in the disk store is that it now stores a tiny metadata
file next to the file holding the block data; this is needed to accurately
account for the decrypted block size, which may be significantly different
from the size of the encrypted file on disk.

The serialization and deserialization APIs in the