[GitHub] spark pull request #16972: [SPARK-19556][CORE][WIP] Broadcast data is not en...

2017-03-23 Thread uncleGen
Github user uncleGen closed the pull request at:

https://github.com/apache/spark/pull/16972


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16972: [SPARK-19556][CORE][WIP] Broadcast data is not en...

2017-02-22 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16972#discussion_r102531623
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,17 +81,52 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
+    val bytesToStore = if (serializerManager.encryptionEnabled) {
+      try {
+        val data = bytes.toByteBuffer
+        val in = new ByteBufferInputStream(data, true)
+        val byteBufOut = new ByteBufferOutputStream(data.remaining())
+        val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf,
+          serializerManager.encryptionKey.get)
+        try {
+          ByteStreams.copy(in, out)
+        } finally {
+          in.close()
+          out.close()
+        }
+        new ChunkedByteBuffer(byteBufOut.toByteBuffer)
+      } finally {
+        bytes.dispose()
+      }
+    } else {
+      bytes
+    }
+
     put(blockId) { fileOutputStream =>
       val channel = fileOutputStream.getChannel
       Utils.tryWithSafeFinally {
-        bytes.writeFully(channel)
+        bytesToStore.writeFully(channel)
       } {
         channel.close()
       }
     }
   }
 
   def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+    val bytes = readBytes(blockId)
+
+    val in = serializerManager.wrapForEncryption(bytes.toInputStream(dispose = true))
+    new ChunkedByteBuffer(ByteBuffer.wrap(IOUtils.toByteArray(in)))
--- End diff --

Yes, it's easier to have the data encrypted in both places, but it also makes 
performance worse, because there's no reason to encrypt the data in memory. It 
also requires all sorts of hacks in the block manager, such as the one I added 
to make the streaming WAL work, which is similar to the one that would be 
required to encrypt broadcast data.

Yes, the change to not encrypt in memory and to encrypt on disk is larger, 
more involved, and more complex, but it's a better change and results in a 
block manager that has fewer caveats (for example, users of the block manager 
don't need to be aware of encryption at all).

I have ideas on how to fix this, I just haven't had the time to work on 
them. If you don't feel like trying it out, feel free to close the PR.
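
(For illustration only, not code from this PR: a minimal sketch of the 
"encrypt only at the disk boundary" idea, reusing the CryptoStreamUtils helper 
and the encryption-key accessor visible in the diff above. The standalone 
object and method shape are assumptions made for the sketch.)

    import java.io.{File, FileOutputStream, OutputStream}

    import org.apache.spark.SparkConf
    import org.apache.spark.security.CryptoStreamUtils

    object DiskWriteSketch {
      // Callers hand over plaintext; the output stream is wrapped here, so the
      // bytes reach disk encrypted and no other component has to know about it.
      def writeEncrypted(
          file: File,
          conf: SparkConf,
          encryptionKey: Option[Array[Byte]])(writeFunc: OutputStream => Unit): Unit = {
        val fileStream = new FileOutputStream(file)
        val out: OutputStream = encryptionKey match {
          case Some(key) => CryptoStreamUtils.createCryptoOutputStream(fileStream, conf, key)
          case None => fileStream
        }
        try {
          writeFunc(out)
        } finally {
          out.close()
        }
      }
    }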





[GitHub] spark pull request #16972: [SPARK-19556][CORE][WIP] Broadcast data is not en...

2017-02-21 Thread uncleGen
Github user uncleGen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16972#discussion_r102390214
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,17 +81,52 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
+    val bytesToStore = if (serializerManager.encryptionEnabled) {
+      try {
+        val data = bytes.toByteBuffer
+        val in = new ByteBufferInputStream(data, true)
+        val byteBufOut = new ByteBufferOutputStream(data.remaining())
+        val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf,
+          serializerManager.encryptionKey.get)
+        try {
+          ByteStreams.copy(in, out)
+        } finally {
+          in.close()
+          out.close()
+        }
+        new ChunkedByteBuffer(byteBufOut.toByteBuffer)
+      } finally {
+        bytes.dispose()
+      }
+    } else {
+      bytes
+    }
+
     put(blockId) { fileOutputStream =>
       val channel = fileOutputStream.getChannel
       Utils.tryWithSafeFinally {
-        bytes.writeFully(channel)
+        bytesToStore.writeFully(channel)
       } {
         channel.close()
       }
     }
   }
 
   def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+    val bytes = readBytes(blockId)
+
+    val in = serializerManager.wrapForEncryption(bytes.toInputStream(dispose = true))
+    new ChunkedByteBuffer(ByteBuffer.wrap(IOUtils.toByteArray(in)))
--- End diff --

@vanzin After taking some time to think about it, I find it may complicate the 
issue if we separate `MemoryStore` with un-encrypted data from `DiskStore` with 
encrypted data. When getting data from a remote node, we will need to encrypt 
the data if it is stored in memory in un-encrypted form. Besides, when we 
`maybeCacheDiskBytesInMemory`, we will have to decrypt the data again. I've 
thought about caching disk data in memory in encrypted form and then decrypting 
it lazily when used, but that makes things much more complicated. Maybe it is 
better to keep the original approach, i.e. keep data encrypted (when enabled) 
both in memory and on disk, and narrow the scope of this problem. Any 
suggestions?





[GitHub] spark pull request #16972: [SPARK-19556][CORE][WIP] Broadcast data is not en...

2017-02-20 Thread uncleGen
Github user uncleGen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16972#discussion_r101969639
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,17 +81,52 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
+    val bytesToStore = if (serializerManager.encryptionEnabled) {
+      try {
+        val data = bytes.toByteBuffer
+        val in = new ByteBufferInputStream(data, true)
+        val byteBufOut = new ByteBufferOutputStream(data.remaining())
+        val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf,
+          serializerManager.encryptionKey.get)
+        try {
+          ByteStreams.copy(in, out)
+        } finally {
+          in.close()
+          out.close()
+        }
+        new ChunkedByteBuffer(byteBufOut.toByteBuffer)
+      } finally {
+        bytes.dispose()
+      }
+    } else {
+      bytes
+    }
+
     put(blockId) { fileOutputStream =>
       val channel = fileOutputStream.getChannel
       Utils.tryWithSafeFinally {
-        bytes.writeFully(channel)
+        bytesToStore.writeFully(channel)
      } {
         channel.close()
       }
     }
   }
 
   def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+    val bytes = readBytes(blockId)
+
+    val in = serializerManager.wrapForEncryption(bytes.toInputStream(dispose = true))
+    new ChunkedByteBuffer(ByteBuffer.wrap(IOUtils.toByteArray(in)))
--- End diff --

I see, makes sense. The decryption side seems to be much more complex than I 
thought.





[GitHub] spark pull request #16972: [SPARK-19556][CORE][WIP] Broadcast data is not en...

2017-02-17 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/16972#discussion_r101831507
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,17 +81,52 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
+    val bytesToStore = if (serializerManager.encryptionEnabled) {
+      try {
+        val data = bytes.toByteBuffer
+        val in = new ByteBufferInputStream(data, true)
+        val byteBufOut = new ByteBufferOutputStream(data.remaining())
+        val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf,
+          serializerManager.encryptionKey.get)
+        try {
+          ByteStreams.copy(in, out)
+        } finally {
+          in.close()
+          out.close()
+        }
+        new ChunkedByteBuffer(byteBufOut.toByteBuffer)
+      } finally {
+        bytes.dispose()
+      }
+    } else {
+      bytes
+    }
+
     put(blockId) { fileOutputStream =>
       val channel = fileOutputStream.getChannel
       Utils.tryWithSafeFinally {
-        bytes.writeFully(channel)
+        bytesToStore.writeFully(channel)
       } {
         channel.close()
       }
     }
   }
 
   def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+    val bytes = readBytes(blockId)
+
+    val in = serializerManager.wrapForEncryption(bytes.toInputStream(dispose = true))
+    new ChunkedByteBuffer(ByteBuffer.wrap(IOUtils.toByteArray(in)))
--- End diff --

So, this is the part that I mentioned in the bug as being "tricky".

You don't want to do this. The disk store should not be materializing 
blocks into memory, because there may not be enough memory for it. If you look 
at the code in `BlockManager`, in the specific case where blocks from the disk 
store are put in the memory store, it checks whether there's enough storage 
memory for it.

If there isn't, it just returns a mapped byte buffer to the caller (which 
doesn't use storage memory). The problem is that you can't do that with 
encryption. Thus the tricky part.
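
(A self-contained sketch of the two read paths being described, using plain 
JDK NIO rather than Spark's internals; the method shape and threshold 
parameter are made up for illustration. The mapped path hands back the file's 
bytes as-is, which is exactly what stops working once the on-disk bytes are 
encrypted.)

    import java.io.RandomAccessFile
    import java.nio.ByteBuffer
    import java.nio.channels.FileChannel.MapMode

    object MappedReadSketch {
      def readBlock(path: String, memoryMapThreshold: Long): ByteBuffer = {
        val channel = new RandomAccessFile(path, "r").getChannel
        try {
          val length = channel.size()
          if (length >= memoryMapThreshold) {
            // Large block: a memory-mapped view, no copy and no storage memory,
            // but the caller sees the raw file bytes (ciphertext if encrypted).
            channel.map(MapMode.READ_ONLY, 0, length)
          } else {
            // Small block: copy into a heap buffer. With encryption, every read
            // would be forced down a (decrypting) copy path like this one.
            val buf = ByteBuffer.allocate(length.toInt)
            while (buf.hasRemaining && channel.read(buf) >= 0) {}
            buf.flip()
            buf
          }
        } finally {
          channel.close()
        }
      }
    }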





[GitHub] spark pull request #16972: [SPARK-19556][CORE][WIP] Broadcast data is not en...

2017-02-16 Thread uncleGen
Github user uncleGen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16972#discussion_r101682505
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,17 +81,52 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
   }
 
   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
+    val bytesToStore = if (serializerManager.encryptionEnabled) {
+      try {
+        val data = bytes.toByteBuffer
+        val in = new ByteBufferInputStream(data, true)
+        val byteBufOut = new ByteBufferOutputStream(data.remaining())
+        val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf,
+          serializerManager.encryptionKey.get)
+        try {
+          ByteStreams.copy(in, out)
+        } finally {
+          in.close()
+          out.close()
+        }
+        new ChunkedByteBuffer(byteBufOut.toByteBuffer)
+      } finally {
+        bytes.dispose()
+      }
+    } else {
+      bytes
+    }
+
     put(blockId) { fileOutputStream =>
       val channel = fileOutputStream.getChannel
       Utils.tryWithSafeFinally {
-        bytes.writeFully(channel)
+        bytesToStore.writeFully(channel)
       } {
         channel.close()
       }
     }
   }
 
   def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+    val bytes = readBytes(blockId)
+
+    val in = serializerManager.wrapForEncryption(bytes.toInputStream(dispose = true))
+    new ChunkedByteBuffer(ByteBuffer.wrap(IOUtils.toByteArray(in)))
+  }
+
+  def getBytesAsValues[T](blockId: BlockId, classTag: ClassTag[T]): Iterator[T] = {
+    val bytes = readBytes(blockId)
+
+    serializerManager
+      .dataDeserializeStream(blockId, bytes.toInputStream(dispose = true))(classTag)
+  }
+
+  private[storage] def readBytes(blockId: BlockId): ChunkedByteBuffer = {
--- End diff --

Abstracted `readBytes` out so it can be exercised directly in unit tests.
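
(A hypothetical sketch of the kind of check this enables, not the PR's actual 
test: with encryption on, the raw bytes `readBytes` returns from disk should 
differ from what `getBytes` decrypts. The `diskStore` and `blockId` values, and 
the imports, are assumed to be set up as in DiskStoreSuite.)

    // Write a known plaintext block, then compare the raw on-disk bytes with
    // the decrypted view; the two should differ when I/O encryption is enabled.
    val plaintext = new ChunkedByteBuffer(ByteBuffer.wrap(Array.fill[Byte](512)(42.toByte)))
    diskStore.putBytes(blockId, plaintext)
    val raw = diskStore.readBytes(blockId)       // encrypted file contents
    val decrypted = diskStore.getBytes(blockId)  // transparently decrypted
    assert(!raw.toArray.sameElements(decrypted.toArray))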





[GitHub] spark pull request #16972: [SPARK-19556][CORE][WIP] Broadcast data is not en...

2017-02-16 Thread uncleGen
Github user uncleGen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16972#discussion_r101682451
  
--- Diff: core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala ---
@@ -39,27 +40,27 @@ class DiskStoreSuite extends SparkFunSuite {
     val blockId = BlockId("rdd_1_2")
     val diskBlockManager = new DiskBlockManager(new SparkConf(), deleteFilesOnStop = true)
 
-    val diskStoreMapped = new DiskStore(new SparkConf().set(confKey, "0"), diskBlockManager)
+    val conf = new SparkConf()
+    val serializer = new KryoSerializer(conf)
+    val serializerManager = new SerializerManager(serializer, conf)
+
+    conf.set(confKey, "0")
+    val diskStoreMapped = new DiskStore(conf, serializerManager, diskBlockManager)
     diskStoreMapped.putBytes(blockId, byteBuffer)
-    val mapped = diskStoreMapped.getBytes(blockId)
+    val mapped = diskStoreMapped.readBytes(blockId)
     assert(diskStoreMapped.remove(blockId))
 
-    val diskStoreNotMapped = new DiskStore(new SparkConf().set(confKey, "1m"), diskBlockManager)
+    conf.set(confKey, "1m")
+    val diskStoreNotMapped = new DiskStore(conf, serializerManager, diskBlockManager)
     diskStoreNotMapped.putBytes(blockId, byteBuffer)
-    val notMapped = diskStoreNotMapped.getBytes(blockId)
+    val notMapped = diskStoreNotMapped.readBytes(blockId)
 
     // Not possible to do isInstanceOf due to visibility of HeapByteBuffer
     assert(notMapped.getChunks().forall(_.getClass.getName.endsWith("HeapByteBuffer")),
       "Expected HeapByteBuffer for un-mapped read")
     assert(mapped.getChunks().forall(_.isInstanceOf[MappedByteBuffer]),
       "Expected MappedByteBuffer for mapped read")
 
-    def arrayFromByteBuffer(in: ByteBuffer): Array[Byte] = {
-      val array = new Array[Byte](in.remaining())
--- End diff --

Removed the unused `arrayFromByteBuffer` helper.





[GitHub] spark pull request #16972: [SPARK-19556][CORE][WIP] Broadcast data is not en...

2017-02-16 Thread uncleGen
Github user uncleGen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16972#discussion_r101682663
  
--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -21,17 +21,25 @@ import java.io.{FileOutputStream, IOException, RandomAccessFile}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel.MapMode
 
-import com.google.common.io.Closeables
+import scala.reflect.ClassTag
+
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.commons.io.IOUtils
 
-import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
+import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.serializer.SerializerManager
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 /**
  * Stores BlockManager blocks on disk.
  */
-private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
+private[spark] class DiskStore(
+    conf: SparkConf,
+    serializerManager: SerializerManager,
--- End diff --

Added `serializerManager` to do the decryption work in `DiskStore`.





[GitHub] spark pull request #16972: [SPARK-19556][CORE][WIP] Broadcast data is not en...

2017-02-16 Thread uncleGen
Github user uncleGen commented on a diff in the pull request:

https://github.com/apache/spark/pull/16972#discussion_r101682730
  
--- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
@@ -344,7 +370,7 @@ private[spark] class MemoryStore(
     val serializationStream: SerializationStream = {
       val autoPick = !blockId.isInstanceOf[StreamBlockId]
       val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
-      ser.serializeStream(serializerManager.wrapStream(blockId, redirectableStream))
+      ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
     }
--- End diff --

`MemoryStore` will no longer do the encryption work.





[GitHub] spark pull request #16972: [SPARK-19556][CORE][WIP] Broadcast data is not en...

2017-02-16 Thread uncleGen
GitHub user uncleGen opened a pull request:

https://github.com/apache/spark/pull/16972

[SPARK-19556][CORE][WIP] Broadcast data is not encrypted when I/O 
encryption is on

## What changes were proposed in this pull request?

`TorrentBroadcast` uses a couple of "back doors" into the block manager to 
write and read data.

The thing these block manager methods have in common is that they bypass 
the encryption code; so broadcast data is stored unencrypted in the block 
manager, causing unencrypted data to be written to disk if those blocks need to 
be evicted from memory.

The correct fix here is actually not to change `TorrentBroadcast`, but to 
fix the block manager so that:

- data stored in memory is not encrypted
- data written to disk is encrypted

This would simplify the code paths that use BlockManager / 
SerializerManager APIs.
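
(Only an illustrative sketch, not code from this PR: the read-side counterpart 
of encrypting at the disk boundary, so that everything held in memory stays 
plaintext. `CryptoStreamUtils.createCryptoInputStream` is assumed to be the 
decrypting twin of the `createCryptoOutputStream` call shown in the diffs 
above.)

    import java.io.{File, FileInputStream, InputStream}

    import org.apache.spark.SparkConf
    import org.apache.spark.security.CryptoStreamUtils

    object DiskReadSketch {
      // The block file holds ciphertext; the returned stream yields plaintext,
      // so block manager callers never have to deal with encryption themselves.
      def readDecrypted(
          file: File,
          conf: SparkConf,
          encryptionKey: Option[Array[Byte]]): InputStream = {
        val fileStream = new FileInputStream(file)
        encryptionKey match {
          case Some(key) => CryptoStreamUtils.createCryptoInputStream(fileStream, conf, key)
          case None => fileStream
        }
      }
    }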

## How was this patch tested?

Updated existing unit tests and added new ones.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/uncleGen/spark SPARK-19556

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16972.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16972


commit 63d909b4a0cc108fd8756436b21c65614abb9466
Author: uncleGen 
Date:   2017-02-15T14:12:47Z

cp

commit f9a91d63af3191b853ef88bd48293bcc19f3ec4c
Author: uncleGen 
Date:   2017-02-17T03:54:44Z

refactor blockmanager: data stored in memory is not encrypted, data written 
to disk is encrypted



