[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/17295
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r108087419

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---
@@ -17,31 +17,52 @@
 package org.apache.spark.storage

-import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
+import java.io.InputStream
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.util.io.ChunkedByteBuffer

 /**
- * This [[ManagedBuffer]] wraps a [[ChunkedByteBuffer]] retrieved from the [[BlockManager]]
+ * This [[ManagedBuffer]] wraps a [[BlockData]] instance retrieved from the [[BlockManager]]
  * so that the corresponding block's read lock can be released once this buffer's references
  * are released.
  *
+ * If `dispose` is set to try, the [[BlockData]]will be disposed when the buffer's reference
--- End diff --

`is set to try` -> `is set to true`
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r108046997

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -56,6 +57,49 @@ private[spark] class BlockResult(
     val bytes: Long)

 /**
+ * Abstracts away how blocks are stored and provides different ways to read the underlying block
+ * data. Callers should call [[dispose()]] when they're done with the block.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  /**
+   * Returns a Netty-friendly wrapper for the block's data.
+   *
+   * @see [[ManagedBuffer#convertToNetty()]]
+   */
+  def toNetty(): Object
+
+  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
+
+  def toByteBuffer(): ByteBuffer
+
+  def size: Long
+
+  def dispose(): Unit
+
+}
+
+private[spark] class ByteBufferBlockData(val buffer: ChunkedByteBuffer) extends BlockData {
+
+  override def toInputStream(): InputStream = buffer.toInputStream(dispose = false)
+
+  override def toNetty(): Object = buffer.toNetty
+
+  override def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
+    buffer.copy(allocator)
+  }
+
+  override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer
+
+  override def size: Long = buffer.size
+
+  override def dispose(): Unit = buffer.unmap()
--- End diff --

BTW I'm really starting to think the fix in #16499, while technically correct, is more confusing than it should be. The problem is not that the code was disposing of off-heap buffers; the problem is that buffers read from the memory store should not be disposed of, while buffers read from the disk store should. So it's not really a matter of dispose vs. unmap, but a matter of where the buffer comes from. (Which is kinda what I had in this patch with the `autoDispose` parameter to `ByteBufferBlockData`. Perhaps I should revive that and get rid of `StorageUtils.unmap`, which is just confusing.)
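A minimal, self-contained sketch of the `autoDispose` idea described above — the origin of the buffer, not the caller, decides whether `dispose()` actually frees anything. The types here are simplified stand-ins for Spark's `BlockData`/`ChunkedByteBuffer`, not the patch's code:

```scala
import java.nio.ByteBuffer

// Simplified stand-in for the BlockData trait in the patch.
trait SimpleBlockData {
  def toByteBuffer(): ByteBuffer
  def dispose(): Unit
}

// Whoever creates the block data (memory store vs. disk store) picks autoDispose,
// so callers can always call dispose() unconditionally.
class ByteBufferBlockDataSketch(
    buffer: ByteBuffer,
    autoDispose: Boolean) extends SimpleBlockData {

  override def toByteBuffer(): ByteBuffer = buffer.duplicate()

  override def dispose(): Unit = {
    if (autoDispose) {
      // Spark would free a direct or memory-mapped buffer here (StorageUtils.dispose);
      // this sketch only reports that the buffer would be released.
      println(s"disposing ${buffer.capacity()} bytes")
    }
  }
}
```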
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r108046686

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -56,6 +57,49 @@ private[spark] class BlockResult(
     val bytes: Long)

 /**
+ * Abstracts away how blocks are stored and provides different ways to read the underlying block
+ * data. Callers should call [[dispose()]] when they're done with the block.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  /**
+   * Returns a Netty-friendly wrapper for the block's data.
+   *
+   * @see [[ManagedBuffer#convertToNetty()]]
+   */
+  def toNetty(): Object
+
+  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
+
+  def toByteBuffer(): ByteBuffer
+
+  def size: Long
+
+  def dispose(): Unit
+
+}
+
+private[spark] class ByteBufferBlockData(val buffer: ChunkedByteBuffer) extends BlockData {
+
+  override def toInputStream(): InputStream = buffer.toInputStream(dispose = false)
+
+  override def toNetty(): Object = buffer.toNetty
+
+  override def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
+    buffer.copy(allocator)
+  }
+
+  override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer
+
+  override def size: Long = buffer.size
+
+  override def dispose(): Unit = buffer.unmap()
--- End diff --

I think `BlockData.dispose()` is pretty well defined: "Release any resources held by the object." What's confusing is that there's both `dispose()` and `unmap()` in `ChunkedByteBuffer`, when there used to be only `dispose()`. It's confusing to have two different methods for releasing resources, and that confusion is not being caused by this patch. `BlockData` is not just a wrapper around `ChunkedByteBuffer`; if it were, there wouldn't be a need for it. Which is why calling the method `unmap()` wouldn't make any sense here, since that's very specific to memory-mapped byte buffers.
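One way to pin that contract down would be a scaladoc on the trait along these lines (suggested wording only, not the merged text):

```scala
trait BlockData {
  /**
   * Release any resources held by this object: open file descriptors, direct or
   * memory-mapped buffers, and so on. The data may no longer be readable after this
   * call, so callers must invoke it exactly once, when they are done with the block.
   */
  def dispose(): Unit
}
```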
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r108035391

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -56,6 +57,49 @@ private[spark] class BlockResult(
     val bytes: Long)

 /**
+ * Abstracts away how blocks are stored and provides different ways to read the underlying block
+ * data. Callers should call [[dispose()]] when they're done with the block.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  /**
+   * Returns a Netty-friendly wrapper for the block's data.
+   *
+   * @see [[ManagedBuffer#convertToNetty()]]
+   */
+  def toNetty(): Object
+
+  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
+
+  def toByteBuffer(): ByteBuffer
+
+  def size: Long
+
+  def dispose(): Unit
+
+}
+
+private[spark] class ByteBufferBlockData(val buffer: ChunkedByteBuffer) extends BlockData {
+
+  override def toInputStream(): InputStream = buffer.toInputStream(dispose = false)
+
+  override def toNetty(): Object = buffer.toNetty
+
+  override def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
+    buffer.copy(allocator)
+  }
+
+  override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer
+
+  override def size: Long = buffer.size
+
+  override def dispose(): Unit = buffer.unmap()
--- End diff --

Can we define the semantics of `BlockData.dispose` clearly? It's quite confusing that the `dispose` method calls `buffer.unmap` while `ChunkedByteBuffer` also has a `dispose` method.
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107952007

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1065,7 +1084,7 @@ private[spark] class BlockManager(
       try {
         replicate(blockId, bytesToReplicate, level, remoteClassTag)
       } finally {
-        bytesToReplicate.unmap()
+        bytesToReplicate.dispose()
--- End diff --

`BlockData.dispose` calls `ChunkedByteBuffer.unmap`.
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107832519

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1065,7 +1084,7 @@ private[spark] class BlockManager(
       try {
         replicate(blockId, bytesToReplicate, level, remoteClassTag)
       } finally {
-        bytesToReplicate.unmap()
+        bytesToReplicate.dispose()
--- End diff --

I'm afraid this may counteract the effort we made in https://github.com/apache/spark/pull/16499. Ideally `unmap` and `dispose` do different things. cc @mallman
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107789203

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -56,6 +57,44 @@ private[spark] class BlockResult(
     val bytes: Long)

 /**
+ * Abstracts away how blocks are stored and provides different ways to read the underlying block
+ * data. Callers should call [[dispose()]] when they're done with the block.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  def toNetty(): Object
+
+  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
+
+  def toByteBuffer(): ByteBuffer
--- End diff --

I added scaladoc for `toNetty()`, but the others seem self-explanatory to me.
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107787884

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -56,6 +57,44 @@ private[spark] class BlockResult(
     val bytes: Long)

 /**
+ * Abstracts away how blocks are stored and provides different ways to read the underlying block
+ * data. Callers should call [[dispose()]] when they're done with the block.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  def toNetty(): Object
--- End diff --

See `ManagedBuffer.convertToNetty()`.
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107787818

--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -17,48 +17,67 @@
 package org.apache.spark.storage

-import java.io.{FileOutputStream, IOException, RandomAccessFile}
+import java.io._
 import java.nio.ByteBuffer
+import java.nio.channels.{Channels, ReadableByteChannel, WritableByteChannel}
 import java.nio.channels.FileChannel.MapMode
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.ConcurrentHashMap

-import com.google.common.io.Closeables
+import scala.collection.mutable.ListBuffer

-import org.apache.spark.SparkConf
+import com.google.common.io.{ByteStreams, Closeables, Files}
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
 import org.apache.spark.util.io.ChunkedByteBuffer

 /**
  * Stores BlockManager blocks on disk.
  */
-private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
+private[spark] class DiskStore(
+    conf: SparkConf,
+    diskManager: DiskBlockManager,
+    securityManager: SecurityManager) extends Logging {

   private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+  private val blockSizes = new ConcurrentHashMap[String, Long]()

-  def getSize(blockId: BlockId): Long = {
-    diskManager.getFile(blockId.name).length
-  }
+  def getSize(blockId: BlockId): Long = blockSizes.get(blockId.name)

   /**
    * Invokes the provided callback function to write the specific block.
    *
    * @throws IllegalStateException if the block already exists in the disk store.
    */
-  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
+  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
     if (contains(blockId)) {
       throw new IllegalStateException(s"Block $blockId is already present in the disk store")
     }
     logDebug(s"Attempting to put block $blockId")
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
-    val fileOutputStream = new FileOutputStream(file)
+    val out = new CountingWritableChannel(openForWrite(file))
     var threwException: Boolean = true
     try {
-      writeFunc(fileOutputStream)
+      writeFunc(out)
+      blockSizes.put(blockId.name, out.getCount)
       threwException = false
     } finally {
       try {
-        Closeables.close(fileOutputStream, threwException)
+        out.close()
+      } catch {
+        case ioe: IOException =>
--- End diff --

The code needs to catch any exception thrown by `out.close()` and also remove the block in that case. That wasn't done before.
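The shape being described — swallow a `close()` failure only when the write already failed, and drop the partial block either way — could look like this standalone sketch (`removeBlock` is a stand-in for the disk store's cleanup; logging omitted):

```scala
import java.io.IOException
import java.nio.channels.WritableByteChannel

object DiskStorePutSketch {
  def put(out: WritableByteChannel, removeBlock: () => Unit)(
      writeFunc: WritableByteChannel => Unit): Unit = {
    var threwException = true
    try {
      writeFunc(out)
      threwException = false
    } finally {
      try {
        out.close()
      } catch {
        case ioe: IOException =>
          // The write succeeded but close() failed (e.g. the encrypting channel
          // flushing its final block), so the on-disk data is suspect.
          if (!threwException) {
            threwException = true
            throw ioe
          }
      } finally {
        if (threwException) {
          removeBlock() // discard the partially written block
        }
      }
    }
  }
}
```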
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107787099

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---
@@ -31,17 +35,31 @@ import org.apache.spark.util.io.ChunkedByteBuffer
 private[storage] class BlockManagerManagedBuffer(
     blockInfoManager: BlockInfoManager,
     blockId: BlockId,
-    chunkedBuffer: ChunkedByteBuffer) extends NettyManagedBuffer(chunkedBuffer.toNetty) {
+    data: BlockData,
+    dispose: Boolean) extends ManagedBuffer {
--- End diff --

Hmm, I prefer `dispose`, because it's not about needing to dispose the buffer, but wanting to dispose the buffer.
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107786888

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1065,7 +1084,7 @@ private[spark] class BlockManager(
       try {
         replicate(blockId, bytesToReplicate, level, remoteClassTag)
       } finally {
-        bytesToReplicate.unmap()
+        bytesToReplicate.dispose()
--- End diff --

Because there's no `BlockData.unmap()`.
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107786384

--- Diff: core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala ---
@@ -167,30 +167,26 @@ private[spark] class SerializerManager(
     val byteStream = new BufferedOutputStream(outputStream)
     val autoPick = !blockId.isInstanceOf[StreamBlockId]
     val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
-    ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
--- End diff --

They're still used in a bunch of places.
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107786072

--- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -63,12 +84,27 @@ private[spark] object CryptoStreamUtils extends Logging {
       is: InputStream,
       sparkConf: SparkConf,
       key: Array[Byte]): InputStream = {
-    val properties = toCryptoConf(sparkConf)
     val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
-    is.read(iv, 0, iv.length)
-    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-    new CryptoInputStream(transformationStr, properties, is,
-      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+    ByteStreams.readFully(is, iv)
+    val params = new CryptoParams(key, sparkConf)
+    new CryptoInputStream(params.transformation, params.conf, is, params.keySpec,
+      new IvParameterSpec(iv))
   }

+  /**
+   * Wrap a `ReadableByteChannel` for decryption.
+   */
+  def createReadableChannel(
+      channel: ReadableByteChannel,
+      sparkConf: SparkConf,
+      key: Array[Byte]): ReadableByteChannel = {
+    val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
+    val buf = ByteBuffer.wrap(iv)
+    JavaUtils.readFully(channel, buf)
--- End diff --

There's no `ByteStreams.readFully` for `ReadableByteChannel` that I'm aware of.
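For reference, a read-fully loop over a channel only takes a few lines; this standalone sketch mirrors what such a helper does (per the diff above, Spark's own helper for this is `JavaUtils.readFully`):

```scala
import java.io.EOFException
import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel

object ChannelReadSketch {
  // Fill `dst` completely from `channel`, failing loudly on a short read.
  def readFully(channel: ReadableByteChannel, dst: ByteBuffer): Unit = {
    val expected = dst.remaining()
    while (dst.hasRemaining) {
      if (channel.read(dst) < 0) {
        throw new EOFException(
          s"Stream ended prematurely: expected $expected bytes, missing ${dst.remaining()}")
      }
    }
  }
}
```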
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107785760

--- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -48,12 +51,30 @@ private[spark] object CryptoStreamUtils extends Logging {
       os: OutputStream,
       sparkConf: SparkConf,
       key: Array[Byte]): OutputStream = {
-    val properties = toCryptoConf(sparkConf)
-    val iv = createInitializationVector(properties)
+    val params = new CryptoParams(key, sparkConf)
+    val iv = createInitializationVector(params.conf)
     os.write(iv)
-    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-    new CryptoOutputStream(transformationStr, properties, os,
-      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+    new CryptoOutputStream(params.transformation, params.conf, os, params.keySpec,
+      new IvParameterSpec(iv))
   }

+  /**
+   * Wrap a `WritableByteChannel` for encryption.
+   */
+  def createWritableChannel(
+      channel: WritableByteChannel,
+      sparkConf: SparkConf,
+      key: Array[Byte]): WritableByteChannel = {
+    val params = new CryptoParams(key, sparkConf)
+    val iv = createInitializationVector(params.conf)
+    val buf = ByteBuffer.wrap(iv)
+    while (buf.hasRemaining()) {
--- End diff --

No, there's no infinite loop here, because a failure would cause an exception. Yeah, using the helper should work too.
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107327362

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -56,6 +57,44 @@ private[spark] class BlockResult(
     val bytes: Long)

 /**
+ * Abstracts away how blocks are stored and provides different ways to read the underlying block
+ * data. Callers should call [[dispose()]] when they're done with the block.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  def toNetty(): Object
--- End diff --

Why is the return type `Object`?
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107327188

--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -17,48 +17,67 @@
 package org.apache.spark.storage

-import java.io.{FileOutputStream, IOException, RandomAccessFile}
+import java.io._
 import java.nio.ByteBuffer
+import java.nio.channels.{Channels, ReadableByteChannel, WritableByteChannel}
 import java.nio.channels.FileChannel.MapMode
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.concurrent.ConcurrentHashMap

-import com.google.common.io.Closeables
+import scala.collection.mutable.ListBuffer

-import org.apache.spark.SparkConf
+import com.google.common.io.{ByteStreams, Closeables, Files}
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.util.JavaUtils
+import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
 import org.apache.spark.util.io.ChunkedByteBuffer

 /**
  * Stores BlockManager blocks on disk.
  */
-private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
+private[spark] class DiskStore(
+    conf: SparkConf,
+    diskManager: DiskBlockManager,
+    securityManager: SecurityManager) extends Logging {

   private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
+  private val blockSizes = new ConcurrentHashMap[String, Long]()

-  def getSize(blockId: BlockId): Long = {
-    diskManager.getFile(blockId.name).length
-  }
+  def getSize(blockId: BlockId): Long = blockSizes.get(blockId.name)

   /**
    * Invokes the provided callback function to write the specific block.
    *
    * @throws IllegalStateException if the block already exists in the disk store.
    */
-  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
+  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
     if (contains(blockId)) {
       throw new IllegalStateException(s"Block $blockId is already present in the disk store")
     }
     logDebug(s"Attempting to put block $blockId")
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
-    val fileOutputStream = new FileOutputStream(file)
+    val out = new CountingWritableChannel(openForWrite(file))
     var threwException: Boolean = true
     try {
-      writeFunc(fileOutputStream)
+      writeFunc(out)
+      blockSizes.put(blockId.name, out.getCount)
       threwException = false
     } finally {
       try {
-        Closeables.close(fileOutputStream, threwException)
+        out.close()
+      } catch {
+        case ioe: IOException =>
--- End diff --

Why this? `threwException` starts as `true`.
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107326715

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -1065,7 +1084,7 @@ private[spark] class BlockManager(
       try {
         replicate(blockId, bytesToReplicate, level, remoteClassTag)
       } finally {
-        bytesToReplicate.unmap()
+        bytesToReplicate.dispose()
--- End diff --

Why change `unmap` to `dispose`?
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107324613

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---
@@ -31,17 +35,31 @@ import org.apache.spark.util.io.ChunkedByteBuffer
 private[storage] class BlockManagerManagedBuffer(
     blockInfoManager: BlockInfoManager,
     blockId: BlockId,
-    chunkedBuffer: ChunkedByteBuffer) extends NettyManagedBuffer(chunkedBuffer.toNetty) {
+    data: BlockData,
+    dispose: Boolean) extends ManagedBuffer {
--- End diff --

`needDispose` may be a better name
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107324480

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---
@@ -31,17 +35,31 @@ import org.apache.spark.util.io.ChunkedByteBuffer
 private[storage] class BlockManagerManagedBuffer(
     blockInfoManager: BlockInfoManager,
     blockId: BlockId,
-    chunkedBuffer: ChunkedByteBuffer) extends NettyManagedBuffer(chunkedBuffer.toNetty) {
+    data: BlockData,
+    dispose: Boolean) extends ManagedBuffer {
+
+  private val refCount = new AtomicInteger(1)
--- End diff --

Maybe we should mention in the class doc that the `BlockData` will be disposed automatically via reference counting.
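The reference-counting behavior being asked about boils down to this pattern (a simplified standalone sketch; the real class also pins and releases the block's read lock):

```scala
import java.util.concurrent.atomic.AtomicInteger

class RefCountedData(disposeData: () => Unit) {
  private val refCount = new AtomicInteger(1)

  // Each retain() hands out another logical reference to the same data.
  def retain(): this.type = {
    refCount.incrementAndGet()
    this
  }

  // When the last reference is released, the underlying data is disposed.
  def release(): Unit = {
    if (refCount.decrementAndGet() == 0) {
      disposeData()
    }
  }
}
```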
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107324269

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---
@@ -17,11 +17,15 @@
 package org.apache.spark.storage

-import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
+import java.io.InputStream
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.util.io.ChunkedByteBuffer

 /**
- * This [[ManagedBuffer]] wraps a [[ChunkedByteBuffer]] retrieved from the [[BlockManager]]
+ * This [[ManagedBuffer]] wraps a ManagedBuffer retrieved from the [[BlockManager]]
--- End diff --

`wraps a [[BlockData]]`
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107323983

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -56,6 +57,44 @@ private[spark] class BlockResult(
     val bytes: Long)

 /**
+ * Abstracts away how blocks are stored and provides different ways to read the underlying block
+ * data. Callers should call [[dispose()]] when they're done with the block.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  def toNetty(): Object
+
+  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
+
+  def toByteBuffer(): ByteBuffer
--- End diff --

It would be great to add some documentation for these 4 methods about when they will be called.
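A possible shape for that documentation, sketched against a simplified version of the trait (the wording is a suggestion, and `ChunkedByteBuffer` is stubbed with a plain `Seq[ByteBuffer]` so the snippet stands alone):

```scala
import java.io.InputStream
import java.nio.ByteBuffer

trait BlockDataDocSketch {
  /** Streams the block's bytes; used when deserializing values from the block. */
  def toInputStream(): InputStream

  /**
   * Netty-friendly view of the data, used when sending the block over the network;
   * see ManagedBuffer#convertToNetty() for the expected return types.
   */
  def toNetty(): Object

  /** Copies the data into new chunks obtained from `allocator` (at most 2 GB each). */
  def toChunkedByteBuffer(allocator: Int => ByteBuffer): Seq[ByteBuffer]

  /** Single contiguous view of the data; only usable for blocks smaller than 2 GB. */
  def toByteBuffer(): ByteBuffer
}
```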
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107323833

--- Diff: core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala ---
@@ -167,30 +167,26 @@ private[spark] class SerializerManager(
     val byteStream = new BufferedOutputStream(outputStream)
     val autoPick = !blockId.isInstanceOf[StreamBlockId]
     val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
-    ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
--- End diff --

The `wrapStream` and `wrapForEncryption` methods can be removed from this class.
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107323552

--- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -48,12 +51,30 @@ private[spark] object CryptoStreamUtils extends Logging {
       os: OutputStream,
       sparkConf: SparkConf,
       key: Array[Byte]): OutputStream = {
-    val properties = toCryptoConf(sparkConf)
-    val iv = createInitializationVector(properties)
+    val params = new CryptoParams(key, sparkConf)
+    val iv = createInitializationVector(params.conf)
     os.write(iv)
-    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-    new CryptoOutputStream(transformationStr, properties, os,
-      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+    new CryptoOutputStream(params.transformation, params.conf, os, params.keySpec,
+      new IvParameterSpec(iv))
   }

+  /**
+   * Wrap a `WritableByteChannel` for encryption.
+   */
+  def createWritableChannel(
+      channel: WritableByteChannel,
+      sparkConf: SparkConf,
+      key: Array[Byte]): WritableByteChannel = {
+    val params = new CryptoParams(key, sparkConf)
+    val iv = createInitializationVector(params.conf)
+    val buf = ByteBuffer.wrap(iv)
+    while (buf.hasRemaining()) {
--- End diff --

Actually this logic is the same as `CryptoHelperChannel`'s. Shall we create the `CryptoHelperChannel` first and simply call `helper.write(buf)` here?
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107323246

--- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -63,12 +84,27 @@ private[spark] object CryptoStreamUtils extends Logging {
       is: InputStream,
       sparkConf: SparkConf,
       key: Array[Byte]): InputStream = {
-    val properties = toCryptoConf(sparkConf)
     val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
-    is.read(iv, 0, iv.length)
-    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-    new CryptoInputStream(transformationStr, properties, is,
-      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+    ByteStreams.readFully(is, iv)
+    val params = new CryptoParams(key, sparkConf)
+    new CryptoInputStream(params.transformation, params.conf, is, params.keySpec,
+      new IvParameterSpec(iv))
   }

+  /**
+   * Wrap a `ReadableByteChannel` for decryption.
+   */
+  def createReadableChannel(
+      channel: ReadableByteChannel,
+      sparkConf: SparkConf,
+      key: Array[Byte]): ReadableByteChannel = {
+    val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
+    val buf = ByteBuffer.wrap(iv)
+    JavaUtils.readFully(channel, buf)
--- End diff --

Why not use `ByteStreams.readFully`? The `buf` is not used elsewhere.
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107323085

--- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -48,12 +51,30 @@ private[spark] object CryptoStreamUtils extends Logging {
       os: OutputStream,
       sparkConf: SparkConf,
       key: Array[Byte]): OutputStream = {
-    val properties = toCryptoConf(sparkConf)
-    val iv = createInitializationVector(properties)
+    val params = new CryptoParams(key, sparkConf)
+    val iv = createInitializationVector(params.conf)
     os.write(iv)
-    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-    new CryptoOutputStream(transformationStr, properties, os,
-      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+    new CryptoOutputStream(params.transformation, params.conf, os, params.keySpec,
+      new IvParameterSpec(iv))
   }

+  /**
+   * Wrap a `WritableByteChannel` for encryption.
+   */
+  def createWritableChannel(
+      channel: WritableByteChannel,
+      sparkConf: SparkConf,
+      key: Array[Byte]): WritableByteChannel = {
+    val params = new CryptoParams(key, sparkConf)
+    val iv = createInitializationVector(params.conf)
+    val buf = ByteBuffer.wrap(iv)
+    while (buf.hasRemaining()) {
--- End diff --

Is there any possibility this may be an infinite loop?
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107322905

--- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
@@ -219,18 +219,22 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
       case None =>
         logInfo("Started reading broadcast variable " + id)
         val startTimeMs = System.currentTimeMillis()
-        val blocks = readBlocks().flatMap(_.getChunks())
+        val blocks = readBlocks()
         logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))

-        val obj = TorrentBroadcast.unBlockifyObject[T](
-          blocks, SparkEnv.get.serializer, compressionCodec)
-        // Store the merged copy in BlockManager so other tasks on this executor don't
-        // need to re-fetch it.
-        val storageLevel = StorageLevel.MEMORY_AND_DISK
-        if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
-          throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+        try {
+          val obj = TorrentBroadcast.unBlockifyObject[T](
+            blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
+          // Store the merged copy in BlockManager so other tasks on this executor don't
+          // need to re-fetch it.
+          val storageLevel = StorageLevel.MEMORY_AND_DISK
+          if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
+            throw new SparkException(s"Failed to store $broadcastId in BlockManager")
+          }
+          obj
+        } finally {
+          blocks.foreach(_.dispose())
--- End diff --

Ah, good catch! We should dispose the blocks here.
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r107007132

--- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
@@ -56,6 +57,43 @@ private[spark] class BlockResult(
     val bytes: Long)

 /**
+ * Abstracts away how blocks are stored and provides different ways to read the underlying block
+ * data. The data for a BlockData instance can only be read once, since it may be backed by open
+ * file descriptors that change state as data is read.
+ */
+private[spark] trait BlockData {
+
+  def toInputStream(): InputStream
+
+  def toManagedBuffer(): ManagedBuffer
+
+  def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
+
+  def size: Long
+
+  def dispose(): Unit
+
+}
+
+private[spark] class ByteBufferBlockData(
+    val buffer: ChunkedByteBuffer,
+    autoDispose: Boolean = true) extends BlockData {
+
+  override def toInputStream(): InputStream = buffer.toInputStream(dispose = autoDispose)
+
+  override def toManagedBuffer(): ManagedBuffer = new NettyManagedBuffer(buffer.toNetty)
+
+  override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
+    buffer.copy(allocator)
+  }
--- End diff --

So I had traced through that stuff 2 or 3 times, and now I did it again and I think I finally understood all that's going on. Basically, the old code was really bad at explicitly disposing of the buffers, meaning a bunch of paths (like the ones that used managed buffers) didn't bother to do it and just left the work to the GC. I changed the code a bit to make the disposal more explicit and added comments in a few key places.
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106963546

--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
   }

   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
-    put(blockId) { fileOutputStream =>
-      val channel = fileOutputStream.getChannel
-      Utils.tryWithSafeFinally {
-        bytes.writeFully(channel)
-      } {
-        channel.close()
-      }
+    put(blockId) { channel =>
+      bytes.writeFully(channel)
--- End diff --

The channel is owned by the code in the `put` method, which does that.
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106965268

--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
   }

   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
-    put(blockId) { fileOutputStream =>
-      val channel = fileOutputStream.getChannel
-      Utils.tryWithSafeFinally {
-        bytes.writeFully(channel)
-      } {
-        channel.close()
-      }
+    put(blockId) { channel =>
+      bytes.writeFully(channel)
     }
   }

-  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+  def getBytes(blockId: BlockId): BlockData = {
     val file = diskManager.getFile(blockId.name)
-    val channel = new RandomAccessFile(file, "r").getChannel
-    Utils.tryWithSafeFinally {
-      // For small files, directly read rather than memory map
-      if (file.length < minMemoryMapBytes) {
-        val buf = ByteBuffer.allocate(file.length.toInt)
-        channel.position(0)
-        while (buf.remaining() != 0) {
-          if (channel.read(buf) == -1) {
-            throw new IOException("Reached EOF before filling buffer\n" +
-              s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+    val blockSize = getSize(blockId)
+
+    securityManager.getIOEncryptionKey() match {
+      case Some(key) =>
+        // Encrypted blocks cannot be memory mapped; return a special object that does decryption
+        // and provides InputStream / FileRegion implementations for reading the data.
+        new EncryptedBlockData(file, blockSize, conf, key)
+
+      case _ =>
+        val channel = new FileInputStream(file).getChannel()
+        if (blockSize < minMemoryMapBytes) {
+          // For small files, directly read rather than memory map.
+          Utils.tryWithSafeFinally {
+            val buf = ByteBuffer.allocate(blockSize.toInt)
+            while (buf.remaining() > 0) {
+              channel.read(buf)
+            }
+            buf.flip()
+            new ByteBufferBlockData(new ChunkedByteBuffer(buf))
+          } {
+            channel.close()
+          }
+        } else {
+          Utils.tryWithSafeFinally {
+            new ByteBufferBlockData(
+              new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)))
+          } {
+            channel.close()
           }
         }
-        buf.flip()
-        new ChunkedByteBuffer(buf)
-      } else {
-        new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
-      }
-    } {
-      channel.close()
     }
   }

   def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
-    if (file.exists()) {
-      val ret = file.delete()
-      if (!ret) {
-        logWarning(s"Error deleting ${file.getPath()}")
+    val meta = diskManager.getMetadataFile(blockId)
+
+    def delete(f: File): Boolean = {
+      if (f.exists()) {
+        val ret = f.delete()
+        if (!ret) {
+          logWarning(s"Error deleting ${file.getPath()}")
+        }
+        ret
+      } else {
+        false
       }
-      ret
-    } else {
-      false
     }
+
+    delete(file) & delete(meta)
   }

   def contains(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     file.exists()
   }
+
+  private def openForWrite(file: File): WritableByteChannel = {
+    val out = new FileOutputStream(file).getChannel()
+    try {
+      securityManager.getIOEncryptionKey().map { key =>
+        CryptoStreamUtils.createWritableChannel(out, conf, key)
+      }.getOrElse(out)
+    } catch {
+      case e: Exception =>
+        out.close()
+        throw e
+    }
+  }
+
+}
+
+private class EncryptedBlockData(
+    file: File,
+    blockSize: Long,
+    conf: SparkConf,
+    key: Array[Byte]) extends BlockData {
+
+  override def toInputStream(): InputStream = Channels.newInputStream(open())
+
+  override def toManagedBuffer(): ManagedBuffer = new EncryptedManagedBuffer()
+
+  override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
+    val source = open()
+    try {
+      var remaining = blockSize
+      val chunks = new ListBuffer[ByteBuffer]()
+      while (remaining > 0) {
+        val chunkSize = math.min(remaining, Int.MaxValue)
+        val chunk =
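The hunk above is cut off mid-statement in the archive; the loop it starts — draining the decrypting channel into chunks of at most `Int.MaxValue` bytes — follows a pattern like this standalone sketch (the `allocator` plumbing and the EOF handling are assumptions, not the merged code):

```scala
import java.io.EOFException
import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel
import scala.collection.mutable.ListBuffer

object ChunkedReadSketch {
  def readToChunks(
      source: ReadableByteChannel,
      blockSize: Long,
      allocator: Int => ByteBuffer): Seq[ByteBuffer] = {
    var remaining = blockSize
    val chunks = new ListBuffer[ByteBuffer]()
    while (remaining > 0) {
      // A single ByteBuffer cannot exceed Int.MaxValue bytes, hence the chunking.
      val chunkSize = math.min(remaining, Int.MaxValue).toInt
      val chunk = allocator(chunkSize)
      while (chunk.hasRemaining) {
        if (source.read(chunk) < 0) {
          throw new EOFException("Unexpected end of data while decrypting block")
        }
      }
      chunk.flip()
      chunks += chunk
      remaining -= chunkSize
    }
    chunks.toList
  }
}
```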
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106964049

--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
   }

   def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
-    put(blockId) { fileOutputStream =>
-      val channel = fileOutputStream.getChannel
-      Utils.tryWithSafeFinally {
-        bytes.writeFully(channel)
-      } {
-        channel.close()
-      }
+    put(blockId) { channel =>
+      bytes.writeFully(channel)
     }
   }

-  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
+  def getBytes(blockId: BlockId): BlockData = {
     val file = diskManager.getFile(blockId.name)
-    val channel = new RandomAccessFile(file, "r").getChannel
-    Utils.tryWithSafeFinally {
-      // For small files, directly read rather than memory map
-      if (file.length < minMemoryMapBytes) {
-        val buf = ByteBuffer.allocate(file.length.toInt)
-        channel.position(0)
-        while (buf.remaining() != 0) {
-          if (channel.read(buf) == -1) {
-            throw new IOException("Reached EOF before filling buffer\n" +
-              s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
+    val blockSize = getSize(blockId)
+
+    securityManager.getIOEncryptionKey() match {
+      case Some(key) =>
+        // Encrypted blocks cannot be memory mapped; return a special object that does decryption
+        // and provides InputStream / FileRegion implementations for reading the data.
+        new EncryptedBlockData(file, blockSize, conf, key)
+
+      case _ =>
+        val channel = new FileInputStream(file).getChannel()
+        if (blockSize < minMemoryMapBytes) {
+          // For small files, directly read rather than memory map.
+          Utils.tryWithSafeFinally {
+            val buf = ByteBuffer.allocate(blockSize.toInt)
+            while (buf.remaining() > 0) {
+              channel.read(buf)
+            }
+            buf.flip()
+            new ByteBufferBlockData(new ChunkedByteBuffer(buf))
+          } {
+            channel.close()
+          }
+        } else {
+          Utils.tryWithSafeFinally {
+            new ByteBufferBlockData(
+              new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)))
+          } {
+            channel.close()
           }
         }
-        buf.flip()
-        new ChunkedByteBuffer(buf)
-      } else {
-        new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
-      }
-    } {
-      channel.close()
     }
   }

   def remove(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
-    if (file.exists()) {
-      val ret = file.delete()
-      if (!ret) {
-        logWarning(s"Error deleting ${file.getPath()}")
+    val meta = diskManager.getMetadataFile(blockId)
+
+    def delete(f: File): Boolean = {
+      if (f.exists()) {
+        val ret = f.delete()
+        if (!ret) {
+          logWarning(s"Error deleting ${file.getPath()}")
+        }
+        ret
+      } else {
+        false
      }
-      ret
-    } else {
-      false
     }
+
+    delete(file) & delete(meta)
   }

   def contains(blockId: BlockId): Boolean = {
     val file = diskManager.getFile(blockId.name)
     file.exists()
   }
+
+  private def openForWrite(file: File): WritableByteChannel = {
+    val out = new FileOutputStream(file).getChannel()
+    try {
+      securityManager.getIOEncryptionKey().map { key =>
+        CryptoStreamUtils.createWritableChannel(out, conf, key)
+      }.getOrElse(out)
+    } catch {
+      case e: Exception =>
+        out.close()
--- End diff --

There might be exceptions specific to the commons-crypto library being thrown.
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106962403

--- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
@@ -17,48 +17,61 @@
 package org.apache.spark.storage

-import java.io.{FileOutputStream, IOException, RandomAccessFile}
+import java.io._
 import java.nio.ByteBuffer
+import java.nio.channels.{Channels, ReadableByteChannel, WritableByteChannel}
 import java.nio.channels.FileChannel.MapMode
+import java.nio.charset.StandardCharsets.UTF_8

-import com.google.common.io.Closeables
+import scala.collection.mutable.ListBuffer

-import org.apache.spark.SparkConf
+import com.google.common.io.{ByteStreams, Closeables, Files}
+import io.netty.channel.FileRegion
+import io.netty.util.AbstractReferenceCounted
+
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
+import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.security.CryptoStreamUtils
+import org.apache.spark.util.{ByteBufferInputStream, Utils}
 import org.apache.spark.util.io.ChunkedByteBuffer

 /**
  * Stores BlockManager blocks on disk.
  */
-private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
+private[spark] class DiskStore(
+    conf: SparkConf,
+    diskManager: DiskBlockManager,
+    securityManager: SecurityManager) extends Logging {

   private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")

   def getSize(blockId: BlockId): Long = {
-    diskManager.getFile(blockId.name).length
+    val file = diskManager.getMetadataFile(blockId)
+    Files.toString(file, UTF_8).toLong
   }

   /**
    * Invokes the provided callback function to write the specific block.
    *
    * @throws IllegalStateException if the block already exists in the disk store.
    */
-  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
+  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
     if (contains(blockId)) {
       throw new IllegalStateException(s"Block $blockId is already present in the disk store")
     }
     logDebug(s"Attempting to put block $blockId")
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
-    val fileOutputStream = new FileOutputStream(file)
+    val out = new CountingWritableChannel(openForWrite(file))
     var threwException: Boolean = true
     try {
-      writeFunc(fileOutputStream)
+      writeFunc(out)
+      Files.write(out.getCount().toString(), diskManager.getMetadataFile(blockId), UTF_8)
       threwException = false
     } finally {
       try {
-        Closeables.close(fileOutputStream, threwException)
+        Closeables.close(out, threwException)
--- End diff --

This was the previous behavior, but well, doesn't hurt to fix it.
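For context on why that line is subtle: Guava's `Closeables.close(closeable, swallowIOException)` only swallows the close failure when the second argument is true, i.e. when the write path has already thrown. A sketch of the old pattern (the `Closeable` here is a stand-in for the output channel):

```scala
import java.io.Closeable
import com.google.common.io.Closeables

object OldClosePattern {
  def finish(out: Closeable, threwException: Boolean): Unit = {
    // If the write failed (threwException = true), an IOException from close() is
    // logged and swallowed, preserving the original failure. If the write succeeded,
    // a close() failure propagates -- but the old code never removed the block then.
    Closeables.close(out, threwException)
  }
}
```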
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106962007 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala --- @@ -34,6 +34,8 @@ import org.apache.spark.util.{ShutdownHookManager, Utils} */ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging { + private val METADATA_FILE_SUFFIX = ".meta" --- End diff -- Hmm, good point... there's currently no metadata kept in the `DiskStore` class, but then this shouldn't be a lot of data. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106961480 --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala --- @@ -102,4 +150,34 @@ private[spark] object CryptoStreamUtils extends Logging { } iv } + + /** + * This class is a workaround for CRYPTO-125, that forces all bytes to be written to the --- End diff -- There's a pretty nasty workaround for it in the network library... (the non-blocking workaround is a lot worse than this.) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106063310 --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala --- @@ -48,12 +50,30 @@ private[spark] object CryptoStreamUtils extends Logging { os: OutputStream, sparkConf: SparkConf, key: Array[Byte]): OutputStream = { -val properties = toCryptoConf(sparkConf) -val iv = createInitializationVector(properties) +val params = new CryptoParams(key, sparkConf) +val iv = createInitializationVector(params.conf) os.write(iv) -val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION) -new CryptoOutputStream(transformationStr, properties, os, - new SecretKeySpec(key, "AES"), new IvParameterSpec(iv)) +new CryptoOutputStream(params.transformation, params.conf, os, params.keySpec, + new IvParameterSpec(iv)) + } + + /** + * Wrap a `WritableByteChannel` for encryption. + */ + def createWritableChannel( + channel: WritableByteChannel, + sparkConf: SparkConf, + key: Array[Byte]): WritableByteChannel = { +val params = new CryptoParams(key, sparkConf) +val iv = createInitializationVector(params.conf) +val buf = ByteBuffer.wrap(iv) +while (buf.remaining() > 0) { --- End diff -- nit: buf.hasRemaining for this pattern of use --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
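[Editor's note] The nit above, applied to the IV-writing snippet, would read roughly like this (a sketch only):

  import java.nio.ByteBuffer
  import java.nio.channels.WritableByteChannel

  // Write the IV fully to the channel; hasRemaining() reads more naturally
  // than remaining() > 0 for this drain-the-buffer pattern.
  def writeIv(channel: WritableByteChannel, iv: Array[Byte]): Unit = {
    val buf = ByteBuffer.wrap(iv)
    while (buf.hasRemaining()) {
      channel.write(buf)
    }
  }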
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106779317 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e } def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = { -put(blockId) { fileOutputStream => - val channel = fileOutputStream.getChannel - Utils.tryWithSafeFinally { -bytes.writeFully(channel) - } { -channel.close() - } +put(blockId) { channel => + bytes.writeFully(channel) } } - def getBytes(blockId: BlockId): ChunkedByteBuffer = { + def getBytes(blockId: BlockId): BlockData = { val file = diskManager.getFile(blockId.name) -val channel = new RandomAccessFile(file, "r").getChannel -Utils.tryWithSafeFinally { - // For small files, directly read rather than memory map - if (file.length < minMemoryMapBytes) { -val buf = ByteBuffer.allocate(file.length.toInt) -channel.position(0) -while (buf.remaining() != 0) { - if (channel.read(buf) == -1) { -throw new IOException("Reached EOF before filling buffer\n" + - s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}") +val blockSize = getSize(blockId) + +securityManager.getIOEncryptionKey() match { + case Some(key) => +// Encrypted blocks cannot be memory mapped; return a special object that does decryption +// and provides InputStream / FileRegion implementations for reading the data. +new EncryptedBlockData(file, blockSize, conf, key) + + case _ => +val channel = new FileInputStream(file).getChannel() +if (blockSize < minMemoryMapBytes) { + // For small files, directly read rather than memory map. + Utils.tryWithSafeFinally { +val buf = ByteBuffer.allocate(blockSize.toInt) +while (buf.remaining() > 0) { + channel.read(buf) --- End diff -- We need to handle case where read() returns EOF (-1) in case of data corruption, file removal from underneath, etc : we will end up in infinite loop otherwise. I might have missed more places where this pattern exists in this change. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
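[Editor's note] The EOF check being requested, as a self-contained sketch (names are illustrative):

  import java.io.IOException
  import java.nio.ByteBuffer
  import java.nio.channels.ReadableByteChannel

  // Fill `buf` completely, failing fast instead of spinning forever if the
  // channel hits EOF early (e.g. a truncated or corrupted file).
  def readFully(channel: ReadableByteChannel, buf: ByteBuffer): Unit = {
    while (buf.hasRemaining()) {
      if (channel.read(buf) == -1) {
        throw new IOException(s"Reached EOF with ${buf.remaining()} bytes left to read")
      }
    }
  }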
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106778005 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -56,6 +57,43 @@ private[spark] class BlockResult( val bytes: Long) /** + * Abstracts away how blocks are stored and provides different ways to read the underlying block + * data. The data for a BlockData instance can only be read once, since it may be backed by open + * file descriptors that change state as data is read. + */ +private[spark] trait BlockData { + + def toInputStream(): InputStream + + def toManagedBuffer(): ManagedBuffer + + def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer + + def size: Long + + def dispose(): Unit + +} + +private[spark] class ByteBufferBlockData( +val buffer: ChunkedByteBuffer, +autoDispose: Boolean = true) extends BlockData { + + override def toInputStream(): InputStream = buffer.toInputStream(dispose = autoDispose) + + override def toManagedBuffer(): ManagedBuffer = new NettyManagedBuffer(buffer.toNetty) + + override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = { +buffer.copy(allocator) + } --- End diff -- autoDispose is not honored for toManagedBuffer and toByteBuffer ? On first pass, it looks like it is not ... Also, is the expectation that invoker must manually invoke dispose when not using toInputStream ? Would be good to add a comment about this to BlockData trait detailing the expectation. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
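[Editor's note] One way the requested comment on the trait could read (a sketch; the exact contract is up to the patch author):

  private[spark] trait BlockData {
    // ... other methods elided ...

    /**
     * Releases any resources held by this block (e.g. open file handles or
     * off-heap buffers). Callers that consume the data via toManagedBuffer()
     * or toByteBuffer() must invoke this explicitly once done; toInputStream()
     * implementations may dispose automatically when the stream is closed.
     */
    def dispose(): Unit
  }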
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106778650 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala --- @@ -79,6 +81,11 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea def getFile(blockId: BlockId): File = getFile(blockId.name) + /** The path of the metadata file for the given block. */ + def getMetadataFile(blockId: BlockId): File = { +new File(getFile(blockId).getAbsolutePath() + METADATA_FILE_SUFFIX) --- End diff -- It would be good to add a note that the actual metadata filename does not match the directory hashing used (it depends on the source block instead). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106778688 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -17,48 +17,61 @@ package org.apache.spark.storage -import java.io.{FileOutputStream, IOException, RandomAccessFile} +import java.io._ import java.nio.ByteBuffer +import java.nio.channels.{Channels, ReadableByteChannel, WritableByteChannel} import java.nio.channels.FileChannel.MapMode +import java.nio.charset.StandardCharsets.UTF_8 -import com.google.common.io.Closeables +import scala.collection.mutable.ListBuffer -import org.apache.spark.SparkConf +import com.google.common.io.{ByteStreams, Closeables, Files} +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.security.CryptoStreamUtils +import org.apache.spark.util.{ByteBufferInputStream, Utils} import org.apache.spark.util.io.ChunkedByteBuffer /** * Stores BlockManager blocks on disk. */ -private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging { +private[spark] class DiskStore( +conf: SparkConf, +diskManager: DiskBlockManager, +securityManager: SecurityManager) extends Logging { private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m") def getSize(blockId: BlockId): Long = { -diskManager.getFile(blockId.name).length +val file = diskManager.getMetadataFile(blockId) +Files.toString(file, UTF_8).toLong --- End diff -- Metadata file should be used only when required - otherwise we should avoid their use. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106778760 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e } def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = { -put(blockId) { fileOutputStream => - val channel = fileOutputStream.getChannel - Utils.tryWithSafeFinally { -bytes.writeFully(channel) - } { -channel.close() - } +put(blockId) { channel => + bytes.writeFully(channel) } } - def getBytes(blockId: BlockId): ChunkedByteBuffer = { + def getBytes(blockId: BlockId): BlockData = { val file = diskManager.getFile(blockId.name) -val channel = new RandomAccessFile(file, "r").getChannel -Utils.tryWithSafeFinally { - // For small files, directly read rather than memory map - if (file.length < minMemoryMapBytes) { -val buf = ByteBuffer.allocate(file.length.toInt) -channel.position(0) -while (buf.remaining() != 0) { - if (channel.read(buf) == -1) { -throw new IOException("Reached EOF before filling buffer\n" + - s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}") +val blockSize = getSize(blockId) + +securityManager.getIOEncryptionKey() match { + case Some(key) => +// Encrypted blocks cannot be memory mapped; return a special object that does decryption +// and provides InputStream / FileRegion implementations for reading the data. +new EncryptedBlockData(file, blockSize, conf, key) + + case _ => +val channel = new FileInputStream(file).getChannel() +if (blockSize < minMemoryMapBytes) { + // For small files, directly read rather than memory map. + Utils.tryWithSafeFinally { +val buf = ByteBuffer.allocate(blockSize.toInt) +while (buf.remaining() > 0) { + channel.read(buf) +} +buf.flip() +new ByteBufferBlockData(new ChunkedByteBuffer(buf)) + } { +channel.close() + } +} else { + Utils.tryWithSafeFinally { +new ByteBufferBlockData( + new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))) + } { +channel.close() } } -buf.flip() -new ChunkedByteBuffer(buf) - } else { -new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)) - } -} { - channel.close() } } def remove(blockId: BlockId): Boolean = { val file = diskManager.getFile(blockId.name) -if (file.exists()) { - val ret = file.delete() - if (!ret) { -logWarning(s"Error deleting ${file.getPath()}") +val meta = diskManager.getMetadataFile(blockId) + +def delete(f: File): Boolean = { + if (f.exists()) { +val ret = f.delete() +if (!ret) { + logWarning(s"Error deleting ${file.getPath()}") +} + +ret + } else { +false } - ret -} else { - false } + +delete(file) & delete(meta) } def contains(blockId: BlockId): Boolean = { val file = diskManager.getFile(blockId.name) file.exists() } + + private def openForWrite(file: File): WritableByteChannel = { +val out = new FileOutputStream(file).getChannel() +try { + securityManager.getIOEncryptionKey().map { key => +CryptoStreamUtils.createWritableChannel(out, conf, key) + }.getOrElse(out) +} catch { + case e: Exception => +out.close() --- End diff -- Other than IOException, what other exceptions are being thrown here ? Would be better to catch specific exceptions. At the least, change it to NonFatal if which exceptions thrown are unknown. Also, we would need to delete file here in addition to closing the channel. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106268712 --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala --- @@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging { is: InputStream, sparkConf: SparkConf, key: Array[Byte]): InputStream = { -val properties = toCryptoConf(sparkConf) val iv = new Array[Byte](IV_LENGTH_IN_BYTES) -is.read(iv, 0, iv.length) -val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION) -new CryptoInputStream(transformationStr, properties, is, - new SecretKeySpec(key, "AES"), new IvParameterSpec(iv)) +var read = 0 +while (read < iv.length) { + val _read = is.read(iv, 0, iv.length) + if (_read < 0) { +throw new EOFException("Failed to read IV from stream.") + } + read += _read +} + +val params = new CryptoParams(key, sparkConf) +new CryptoInputStream(params.transformation, params.conf, is, params.keySpec, + new IvParameterSpec(iv)) + } + + /** + * Wrap a `ReadableByteChannel` for decryption. + */ + def createReadableChannel( + channel: ReadableByteChannel, + sparkConf: SparkConf, + key: Array[Byte]): ReadableByteChannel = { +val iv = new Array[Byte](IV_LENGTH_IN_BYTES) +val buf = ByteBuffer.wrap(iv) +buf.clear() --- End diff -- nit: The clear is not required. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106264689 --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala --- @@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging { is: InputStream, sparkConf: SparkConf, key: Array[Byte]): InputStream = { -val properties = toCryptoConf(sparkConf) val iv = new Array[Byte](IV_LENGTH_IN_BYTES) -is.read(iv, 0, iv.length) -val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION) -new CryptoInputStream(transformationStr, properties, is, - new SecretKeySpec(key, "AES"), new IvParameterSpec(iv)) +var read = 0 +while (read < iv.length) { + val _read = is.read(iv, 0, iv.length) + if (_read < 0) { +throw new EOFException("Failed to read IV from stream.") + } + read += _read +} --- End diff -- ByteStreams.readFully instead of the loop --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
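[Editor's note] The suggestion above, concretely; Guava's ByteStreams.readFully throws EOFException if the stream ends before the array is filled (sketch):

  import java.io.InputStream
  import com.google.common.io.ByteStreams

  def readIv(is: InputStream, ivLength: Int): Array[Byte] = {
    val iv = new Array[Byte](ivLength)
    ByteStreams.readFully(is, iv) // replaces the manual read loop
    iv
  }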
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106778914 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -17,48 +17,61 @@ package org.apache.spark.storage -import java.io.{FileOutputStream, IOException, RandomAccessFile} +import java.io._ import java.nio.ByteBuffer +import java.nio.channels.{Channels, ReadableByteChannel, WritableByteChannel} import java.nio.channels.FileChannel.MapMode +import java.nio.charset.StandardCharsets.UTF_8 -import com.google.common.io.Closeables +import scala.collection.mutable.ListBuffer -import org.apache.spark.SparkConf +import com.google.common.io.{ByteStreams, Closeables, Files} +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.security.CryptoStreamUtils +import org.apache.spark.util.{ByteBufferInputStream, Utils} import org.apache.spark.util.io.ChunkedByteBuffer /** * Stores BlockManager blocks on disk. */ -private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging { +private[spark] class DiskStore( +conf: SparkConf, +diskManager: DiskBlockManager, +securityManager: SecurityManager) extends Logging { private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m") def getSize(blockId: BlockId): Long = { -diskManager.getFile(blockId.name).length +val file = diskManager.getMetadataFile(blockId) +Files.toString(file, UTF_8).toLong } /** * Invokes the provided callback function to write the specific block. * * @throws IllegalStateException if the block already exists in the disk store. */ - def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = { + def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = { if (contains(blockId)) { throw new IllegalStateException(s"Block $blockId is already present in the disk store") } logDebug(s"Attempting to put block $blockId") val startTime = System.currentTimeMillis val file = diskManager.getFile(blockId) -val fileOutputStream = new FileOutputStream(file) +val out = new CountingWritableChannel(openForWrite(file)) var threwException: Boolean = true try { - writeFunc(fileOutputStream) + writeFunc(out) + Files.write(out.getCount().toString(), diskManager.getMetadataFile(blockId), UTF_8) --- End diff -- Is there any reason to keep this in string form ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
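[Editor's note] For comparison, a sketch of the binary alternative being hinted at (hypothetical; the patch as posted writes the size as a UTF-8 decimal string):

  import java.io.{DataOutputStream, File, FileOutputStream}

  def writeBlockSize(metaFile: File, size: Long): Unit = {
    val out = new DataOutputStream(new FileOutputStream(metaFile))
    try {
      out.writeLong(size) // always 8 bytes, no parsing needed on read
    } finally {
      out.close()
    }
  }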
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106778813 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e } def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = { -put(blockId) { fileOutputStream => - val channel = fileOutputStream.getChannel - Utils.tryWithSafeFinally { -bytes.writeFully(channel) - } { -channel.close() - } +put(blockId) { channel => + bytes.writeFully(channel) } } - def getBytes(blockId: BlockId): ChunkedByteBuffer = { + def getBytes(blockId: BlockId): BlockData = { val file = diskManager.getFile(blockId.name) -val channel = new RandomAccessFile(file, "r").getChannel -Utils.tryWithSafeFinally { - // For small files, directly read rather than memory map - if (file.length < minMemoryMapBytes) { -val buf = ByteBuffer.allocate(file.length.toInt) -channel.position(0) -while (buf.remaining() != 0) { - if (channel.read(buf) == -1) { -throw new IOException("Reached EOF before filling buffer\n" + - s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}") +val blockSize = getSize(blockId) + +securityManager.getIOEncryptionKey() match { + case Some(key) => +// Encrypted blocks cannot be memory mapped; return a special object that does decryption +// and provides InputStream / FileRegion implementations for reading the data. +new EncryptedBlockData(file, blockSize, conf, key) + + case _ => +val channel = new FileInputStream(file).getChannel() +if (blockSize < minMemoryMapBytes) { + // For small files, directly read rather than memory map. + Utils.tryWithSafeFinally { +val buf = ByteBuffer.allocate(blockSize.toInt) +while (buf.remaining() > 0) { + channel.read(buf) +} +buf.flip() +new ByteBufferBlockData(new ChunkedByteBuffer(buf)) + } { +channel.close() + } +} else { + Utils.tryWithSafeFinally { +new ByteBufferBlockData( + new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))) + } { +channel.close() } } -buf.flip() -new ChunkedByteBuffer(buf) - } else { -new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)) - } -} { - channel.close() } } def remove(blockId: BlockId): Boolean = { val file = diskManager.getFile(blockId.name) -if (file.exists()) { - val ret = file.delete() - if (!ret) { -logWarning(s"Error deleting ${file.getPath()}") +val meta = diskManager.getMetadataFile(blockId) + +def delete(f: File): Boolean = { + if (f.exists()) { +val ret = f.delete() +if (!ret) { + logWarning(s"Error deleting ${file.getPath()}") +} + +ret + } else { +false } - ret -} else { - false } + +delete(file) & delete(meta) } def contains(blockId: BlockId): Boolean = { val file = diskManager.getFile(blockId.name) file.exists() } + + private def openForWrite(file: File): WritableByteChannel = { +val out = new FileOutputStream(file).getChannel() +try { + securityManager.getIOEncryptionKey().map { key => +CryptoStreamUtils.createWritableChannel(out, conf, key) + }.getOrElse(out) +} catch { + case e: Exception => +out.close() +throw e +} + } + +} + +private class EncryptedBlockData( +file: File, +blockSize: Long, +conf: SparkConf, +key: Array[Byte]) extends BlockData { + + override def toInputStream(): InputStream = Channels.newInputStream(open()) + + override def toManagedBuffer(): ManagedBuffer = new EncryptedManagedBuffer() + + override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = { +val source = open() +try { + var remaining = 
blockSize + val chunks = new ListBuffer[ByteBuffer]() + while (remaining > 0) { +val chunkSize = math.min(remaining, Int.MaxValue) +val chunk =
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106778932 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e } def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = { -put(blockId) { fileOutputStream => - val channel = fileOutputStream.getChannel - Utils.tryWithSafeFinally { -bytes.writeFully(channel) - } { -channel.close() - } +put(blockId) { channel => + bytes.writeFully(channel) --- End diff -- Is Utils.tryWithSafeFinally with close still required here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106779457 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e } def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = { -put(blockId) { fileOutputStream => - val channel = fileOutputStream.getChannel - Utils.tryWithSafeFinally { -bytes.writeFully(channel) - } { -channel.close() - } +put(blockId) { channel => + bytes.writeFully(channel) } } - def getBytes(blockId: BlockId): ChunkedByteBuffer = { + def getBytes(blockId: BlockId): BlockData = { val file = diskManager.getFile(blockId.name) -val channel = new RandomAccessFile(file, "r").getChannel -Utils.tryWithSafeFinally { - // For small files, directly read rather than memory map - if (file.length < minMemoryMapBytes) { -val buf = ByteBuffer.allocate(file.length.toInt) -channel.position(0) -while (buf.remaining() != 0) { - if (channel.read(buf) == -1) { -throw new IOException("Reached EOF before filling buffer\n" + - s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}") +val blockSize = getSize(blockId) + +securityManager.getIOEncryptionKey() match { + case Some(key) => +// Encrypted blocks cannot be memory mapped; return a special object that does decryption +// and provides InputStream / FileRegion implementations for reading the data. +new EncryptedBlockData(file, blockSize, conf, key) + + case _ => +val channel = new FileInputStream(file).getChannel() +if (blockSize < minMemoryMapBytes) { + // For small files, directly read rather than memory map. + Utils.tryWithSafeFinally { +val buf = ByteBuffer.allocate(blockSize.toInt) +while (buf.remaining() > 0) { + channel.read(buf) +} +buf.flip() +new ByteBufferBlockData(new ChunkedByteBuffer(buf)) + } { +channel.close() + } +} else { + Utils.tryWithSafeFinally { +new ByteBufferBlockData( + new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))) + } { +channel.close() } } -buf.flip() -new ChunkedByteBuffer(buf) - } else { -new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)) - } -} { - channel.close() } } def remove(blockId: BlockId): Boolean = { val file = diskManager.getFile(blockId.name) -if (file.exists()) { - val ret = file.delete() - if (!ret) { -logWarning(s"Error deleting ${file.getPath()}") +val meta = diskManager.getMetadataFile(blockId) + +def delete(f: File): Boolean = { + if (f.exists()) { +val ret = f.delete() +if (!ret) { + logWarning(s"Error deleting ${file.getPath()}") +} + +ret + } else { +false } - ret -} else { - false } + +delete(file) & delete(meta) } def contains(blockId: BlockId): Boolean = { val file = diskManager.getFile(blockId.name) file.exists() } + + private def openForWrite(file: File): WritableByteChannel = { +val out = new FileOutputStream(file).getChannel() +try { + securityManager.getIOEncryptionKey().map { key => +CryptoStreamUtils.createWritableChannel(out, conf, key) + }.getOrElse(out) +} catch { + case e: Exception => +out.close() +throw e +} + } + +} + +private class EncryptedBlockData( +file: File, +blockSize: Long, +conf: SparkConf, +key: Array[Byte]) extends BlockData { + + override def toInputStream(): InputStream = Channels.newInputStream(open()) + + override def toManagedBuffer(): ManagedBuffer = new EncryptedManagedBuffer() + + override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = { +val source = open() +try { + var remaining = 
blockSize + val chunks = new ListBuffer[ByteBuffer]() + while (remaining > 0) { +val chunkSize = math.min(remaining, Int.MaxValue) +val chunk =
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106779213 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -17,48 +17,61 @@ package org.apache.spark.storage -import java.io.{FileOutputStream, IOException, RandomAccessFile} +import java.io._ import java.nio.ByteBuffer +import java.nio.channels.{Channels, ReadableByteChannel, WritableByteChannel} import java.nio.channels.FileChannel.MapMode +import java.nio.charset.StandardCharsets.UTF_8 -import com.google.common.io.Closeables +import scala.collection.mutable.ListBuffer -import org.apache.spark.SparkConf +import com.google.common.io.{ByteStreams, Closeables, Files} +import io.netty.channel.FileRegion +import io.netty.util.AbstractReferenceCounted + +import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.Logging -import org.apache.spark.util.Utils +import org.apache.spark.network.buffer.ManagedBuffer +import org.apache.spark.security.CryptoStreamUtils +import org.apache.spark.util.{ByteBufferInputStream, Utils} import org.apache.spark.util.io.ChunkedByteBuffer /** * Stores BlockManager blocks on disk. */ -private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging { +private[spark] class DiskStore( +conf: SparkConf, +diskManager: DiskBlockManager, +securityManager: SecurityManager) extends Logging { private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m") def getSize(blockId: BlockId): Long = { -diskManager.getFile(blockId.name).length +val file = diskManager.getMetadataFile(blockId) +Files.toString(file, UTF_8).toLong } /** * Invokes the provided callback function to write the specific block. * * @throws IllegalStateException if the block already exists in the disk store. */ - def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = { + def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = { if (contains(blockId)) { throw new IllegalStateException(s"Block $blockId is already present in the disk store") } logDebug(s"Attempting to put block $blockId") val startTime = System.currentTimeMillis val file = diskManager.getFile(blockId) -val fileOutputStream = new FileOutputStream(file) +val out = new CountingWritableChannel(openForWrite(file)) var threwException: Boolean = true try { - writeFunc(fileOutputStream) + writeFunc(out) + Files.write(out.getCount().toString(), diskManager.getMetadataFile(blockId), UTF_8) threwException = false } finally { try { -Closeables.close(fileOutputStream, threwException) +Closeables.close(out, threwException) --- End diff -- IOException can be thrown in close(), we will need to remove block (and meta) in that case as well. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
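[Editor's note] A sketch of the cleanup being asked for, reusing the names from the diff above (illustrative only, not the patch's code):

  var threwException: Boolean = true
  try {
    writeFunc(out)
    Files.write(out.getCount().toString(), diskManager.getMetadataFile(blockId), UTF_8)
    threwException = false
  } finally {
    try {
      Closeables.close(out, threwException)
    } catch {
      case e: IOException =>
        threwException = true // close() itself failed; the block is suspect
        throw e
    } finally {
      if (threwException) {
        remove(blockId) // drop both the data file and the metadata file
      }
    }
  }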
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106779546 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e } def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = { -put(blockId) { fileOutputStream => - val channel = fileOutputStream.getChannel - Utils.tryWithSafeFinally { -bytes.writeFully(channel) - } { -channel.close() - } +put(blockId) { channel => + bytes.writeFully(channel) } } - def getBytes(blockId: BlockId): ChunkedByteBuffer = { + def getBytes(blockId: BlockId): BlockData = { val file = diskManager.getFile(blockId.name) -val channel = new RandomAccessFile(file, "r").getChannel -Utils.tryWithSafeFinally { - // For small files, directly read rather than memory map - if (file.length < minMemoryMapBytes) { -val buf = ByteBuffer.allocate(file.length.toInt) -channel.position(0) -while (buf.remaining() != 0) { - if (channel.read(buf) == -1) { -throw new IOException("Reached EOF before filling buffer\n" + - s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}") +val blockSize = getSize(blockId) + +securityManager.getIOEncryptionKey() match { + case Some(key) => +// Encrypted blocks cannot be memory mapped; return a special object that does decryption +// and provides InputStream / FileRegion implementations for reading the data. +new EncryptedBlockData(file, blockSize, conf, key) + + case _ => +val channel = new FileInputStream(file).getChannel() +if (blockSize < minMemoryMapBytes) { + // For small files, directly read rather than memory map. + Utils.tryWithSafeFinally { +val buf = ByteBuffer.allocate(blockSize.toInt) +while (buf.remaining() > 0) { + channel.read(buf) +} +buf.flip() +new ByteBufferBlockData(new ChunkedByteBuffer(buf)) + } { +channel.close() + } +} else { + Utils.tryWithSafeFinally { +new ByteBufferBlockData( + new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))) + } { +channel.close() } } -buf.flip() -new ChunkedByteBuffer(buf) - } else { -new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)) - } -} { - channel.close() } } def remove(blockId: BlockId): Boolean = { val file = diskManager.getFile(blockId.name) -if (file.exists()) { - val ret = file.delete() - if (!ret) { -logWarning(s"Error deleting ${file.getPath()}") +val meta = diskManager.getMetadataFile(blockId) + +def delete(f: File): Boolean = { + if (f.exists()) { +val ret = f.delete() +if (!ret) { + logWarning(s"Error deleting ${file.getPath()}") +} + +ret + } else { +false } - ret -} else { - false } + +delete(file) & delete(meta) } def contains(blockId: BlockId): Boolean = { val file = diskManager.getFile(blockId.name) file.exists() } + + private def openForWrite(file: File): WritableByteChannel = { +val out = new FileOutputStream(file).getChannel() +try { + securityManager.getIOEncryptionKey().map { key => +CryptoStreamUtils.createWritableChannel(out, conf, key) + }.getOrElse(out) +} catch { + case e: Exception => +out.close() +throw e +} + } + +} + +private class EncryptedBlockData( +file: File, +blockSize: Long, +conf: SparkConf, +key: Array[Byte]) extends BlockData { + + override def toInputStream(): InputStream = Channels.newInputStream(open()) + + override def toManagedBuffer(): ManagedBuffer = new EncryptedManagedBuffer() + + override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = { +val source = open() +try { + var remaining = 
blockSize + val chunks = new ListBuffer[ByteBuffer]() + while (remaining > 0) { +val chunkSize = math.min(remaining, Int.MaxValue) +val chunk =
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106779004 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala --- @@ -34,6 +34,8 @@ import org.apache.spark.util.{ShutdownHookManager, Utils} */ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging { + private val METADATA_FILE_SUFFIX = ".meta" --- End diff -- Assuming I am not missing something, shuffle does not use (require) the block length from the meta file. If so, for all other blocks, why not simply keep the block size in memory? On executor failure, the on-disk block is lost anyway, and we already maintain block info for each block in the executor. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
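[Editor's note] A sketch of the in-memory alternative being floated (hypothetical; the PR as posted reads the size from the .meta file):

  import java.util.concurrent.ConcurrentHashMap

  // Hypothetical: track decrypted block sizes in memory instead of on disk.
  // Sizes are lost with the executor, but so are the on-disk blocks.
  private val blockSizes = new ConcurrentHashMap[String, java.lang.Long]()

  def putSize(blockId: BlockId, size: Long): Unit =
    blockSizes.put(blockId.name, size)

  def getSize(blockId: BlockId): Long =
    blockSizes.get(blockId.name)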
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106269093 --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala --- @@ -102,4 +150,34 @@ private[spark] object CryptoStreamUtils extends Logging { } iv } + + /** + * This class is a workaround for CRYPTO-125, that forces all bytes to be written to the --- End diff -- This is a lousy bug! Good thing that we don't seem to be hit by it (yet). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user mridulm commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r10677 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala --- @@ -94,7 +101,11 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea } }.filter(_ != null).flatMap { dir => val files = dir.listFiles() - if (files != null) files else Seq.empty + if (files != null) { +files.filter(!_.getName().endsWith(METADATA_FILE_SUFFIX)) + } else { +Seq.empty + } --- End diff -- getAllFiles should return all stored files. For example, test suites use getAllFiles to remove all files via this API. On the other hand, getAllBlocks needs to ensure metadata files are filtered out - this check should be moved there. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
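[Editor's note] Concretely, the suggested split might look like this (a sketch; it assumes the existing getAllFiles(): Seq[File] and the METADATA_FILE_SUFFIX constant from the diff):

  // getAllFiles() keeps returning every file on disk (test suites rely on
  // that to clean up); the metadata filter moves into getAllBlocks().
  def getAllBlocks(): Seq[BlockId] = {
    getAllFiles()
      .filterNot(_.getName().endsWith(METADATA_FILE_SUFFIX))
      .map(f => BlockId(f.getName()))
  }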
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106751157 --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala --- @@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging { is: InputStream, sparkConf: SparkConf, key: Array[Byte]): InputStream = { -val properties = toCryptoConf(sparkConf) val iv = new Array[Byte](IV_LENGTH_IN_BYTES) -is.read(iv, 0, iv.length) -val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION) -new CryptoInputStream(transformationStr, properties, is, - new SecretKeySpec(key, "AES"), new IvParameterSpec(iv)) +var read = 0 +while (read < iv.length) { --- End diff -- Ah, missed that one. +1 for shorter code. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106749569 --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala --- @@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging { is: InputStream, sparkConf: SparkConf, key: Array[Byte]): InputStream = { -val properties = toCryptoConf(sparkConf) val iv = new Array[Byte](IV_LENGTH_IN_BYTES) -is.read(iv, 0, iv.length) -val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION) -new CryptoInputStream(transformationStr, properties, is, - new SecretKeySpec(key, "AES"), new IvParameterSpec(iv)) +var read = 0 +while (read < iv.length) { --- End diff -- Yeah, you can just use `ByteStreams.readFully(is, iv)`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106691642 --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala --- @@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging { is: InputStream, sparkConf: SparkConf, key: Array[Byte]): InputStream = { -val properties = toCryptoConf(sparkConf) val iv = new Array[Byte](IV_LENGTH_IN_BYTES) -is.read(iv, 0, iv.length) -val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION) -new CryptoInputStream(transformationStr, properties, is, - new SecretKeySpec(key, "AES"), new IvParameterSpec(iv)) +var read = 0 +while (read < iv.length) { --- End diff -- It avoids issues with short reads. It's unlikely to happen but I always write read code like this to be safe. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106691384 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e } def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = { -put(blockId) { fileOutputStream => - val channel = fileOutputStream.getChannel - Utils.tryWithSafeFinally { -bytes.writeFully(channel) - } { -channel.close() - } +put(blockId) { channel => + bytes.writeFully(channel) } } - def getBytes(blockId: BlockId): ChunkedByteBuffer = { + def getBytes(blockId: BlockId): BlockData = { val file = diskManager.getFile(blockId.name) -val channel = new RandomAccessFile(file, "r").getChannel -Utils.tryWithSafeFinally { - // For small files, directly read rather than memory map - if (file.length < minMemoryMapBytes) { -val buf = ByteBuffer.allocate(file.length.toInt) -channel.position(0) -while (buf.remaining() != 0) { - if (channel.read(buf) == -1) { -throw new IOException("Reached EOF before filling buffer\n" + - s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}") +val blockSize = getSize(blockId) + +securityManager.getIOEncryptionKey() match { + case Some(key) => +// Encrypted blocks cannot be memory mapped; return a special object that does decryption +// and provides InputStream / FileRegion implementations for reading the data. +new EncryptedBlockData(file, blockSize, conf, key) + + case _ => +val channel = new FileInputStream(file).getChannel() +if (blockSize < minMemoryMapBytes) { + // For small files, directly read rather than memory map. + Utils.tryWithSafeFinally { +val buf = ByteBuffer.allocate(blockSize.toInt) +while (buf.remaining() > 0) { + channel.read(buf) +} +buf.flip() +new ByteBufferBlockData(new ChunkedByteBuffer(buf)) + } { +channel.close() + } +} else { + Utils.tryWithSafeFinally { +new ByteBufferBlockData( + new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))) + } { +channel.close() } } -buf.flip() -new ChunkedByteBuffer(buf) - } else { -new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)) - } -} { - channel.close() } } def remove(blockId: BlockId): Boolean = { val file = diskManager.getFile(blockId.name) -if (file.exists()) { - val ret = file.delete() - if (!ret) { -logWarning(s"Error deleting ${file.getPath()}") +val meta = diskManager.getMetadataFile(blockId) + +def delete(f: File): Boolean = { + if (f.exists()) { +val ret = f.delete() +if (!ret) { + logWarning(s"Error deleting ${file.getPath()}") +} + +ret + } else { +false } - ret -} else { - false } + +delete(file) & delete(meta) } def contains(blockId: BlockId): Boolean = { val file = diskManager.getFile(blockId.name) file.exists() } + + private def openForWrite(file: File): WritableByteChannel = { +val out = new FileOutputStream(file).getChannel() +try { + securityManager.getIOEncryptionKey().map { key => +CryptoStreamUtils.createWritableChannel(out, conf, key) + }.getOrElse(out) +} catch { + case e: Exception => +out.close() +throw e +} + } + +} + +private class EncryptedBlockData( +file: File, +blockSize: Long, +conf: SparkConf, +key: Array[Byte]) extends BlockData { + + override def toInputStream(): InputStream = Channels.newInputStream(open()) + + override def toManagedBuffer(): ManagedBuffer = new EncryptedManagedBuffer() + + override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = { +val source = open() +try { + var remaining = 
blockSize + val chunks = new ListBuffer[ByteBuffer]() + while (remaining > 0) { +val chunkSize = math.min(remaining, Int.MaxValue) +val chunk =
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106691863 --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala --- @@ -102,4 +150,34 @@ private[spark] object CryptoStreamUtils extends Logging { } iv } + + /** + * This class is a workaround for CRYPTO-125, that forces all bytes to be written to the + * underlying channel. Since the callers of this API are using blocking I/O, there are no + * concerns with regards to CPU usage here. --- End diff -- No. As the comment states, it's a workaround for a bug in the commons-crypto library, which would affect the code being added. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
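[Editor's note] For illustration, the general shape of such a workaround - a channel wrapper that keeps writing until the buffer is drained (a sketch, not the patch's exact code):

  import java.nio.ByteBuffer
  import java.nio.channels.WritableByteChannel

  // CRYPTO-125: CryptoOutputStream assumes a single write() consumes the whole
  // buffer. With blocking I/O underneath, looping here is safe and cheap.
  private class FullWriteChannel(sink: WritableByteChannel) extends WritableByteChannel {
    override def write(src: ByteBuffer): Int = {
      val expected = src.remaining()
      while (src.hasRemaining()) {
        sink.write(src)
      }
      expected
    }
    override def isOpen(): Boolean = sink.isOpen()
    override def close(): Unit = sink.close()
  }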
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106588563 --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala --- @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e } def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = { -put(blockId) { fileOutputStream => - val channel = fileOutputStream.getChannel - Utils.tryWithSafeFinally { -bytes.writeFully(channel) - } { -channel.close() - } +put(blockId) { channel => + bytes.writeFully(channel) } } - def getBytes(blockId: BlockId): ChunkedByteBuffer = { + def getBytes(blockId: BlockId): BlockData = { val file = diskManager.getFile(blockId.name) -val channel = new RandomAccessFile(file, "r").getChannel -Utils.tryWithSafeFinally { - // For small files, directly read rather than memory map - if (file.length < minMemoryMapBytes) { -val buf = ByteBuffer.allocate(file.length.toInt) -channel.position(0) -while (buf.remaining() != 0) { - if (channel.read(buf) == -1) { -throw new IOException("Reached EOF before filling buffer\n" + - s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}") +val blockSize = getSize(blockId) + +securityManager.getIOEncryptionKey() match { + case Some(key) => +// Encrypted blocks cannot be memory mapped; return a special object that does decryption +// and provides InputStream / FileRegion implementations for reading the data. +new EncryptedBlockData(file, blockSize, conf, key) + + case _ => +val channel = new FileInputStream(file).getChannel() +if (blockSize < minMemoryMapBytes) { + // For small files, directly read rather than memory map. + Utils.tryWithSafeFinally { +val buf = ByteBuffer.allocate(blockSize.toInt) +while (buf.remaining() > 0) { + channel.read(buf) +} +buf.flip() +new ByteBufferBlockData(new ChunkedByteBuffer(buf)) + } { +channel.close() + } +} else { + Utils.tryWithSafeFinally { +new ByteBufferBlockData( + new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))) + } { +channel.close() } } -buf.flip() -new ChunkedByteBuffer(buf) - } else { -new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)) - } -} { - channel.close() } } def remove(blockId: BlockId): Boolean = { val file = diskManager.getFile(blockId.name) -if (file.exists()) { - val ret = file.delete() - if (!ret) { -logWarning(s"Error deleting ${file.getPath()}") +val meta = diskManager.getMetadataFile(blockId) + +def delete(f: File): Boolean = { + if (f.exists()) { +val ret = f.delete() +if (!ret) { + logWarning(s"Error deleting ${file.getPath()}") +} + +ret + } else { +false } - ret -} else { - false } + +delete(file) & delete(meta) } def contains(blockId: BlockId): Boolean = { val file = diskManager.getFile(blockId.name) file.exists() } + + private def openForWrite(file: File): WritableByteChannel = { +val out = new FileOutputStream(file).getChannel() +try { + securityManager.getIOEncryptionKey().map { key => +CryptoStreamUtils.createWritableChannel(out, conf, key) + }.getOrElse(out) +} catch { + case e: Exception => +out.close() +throw e +} + } + +} + +private class EncryptedBlockData( +file: File, +blockSize: Long, +conf: SparkConf, +key: Array[Byte]) extends BlockData { + + override def toInputStream(): InputStream = Channels.newInputStream(open()) + + override def toManagedBuffer(): ManagedBuffer = new EncryptedManagedBuffer() + + override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = { +val source = open() +try { + var remaining 
= blockSize + val chunks = new ListBuffer[ByteBuffer]() + while (remaining > 0) { +val chunkSize = math.min(remaining, Int.MaxValue) +val chunk =
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106587687 --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala --- @@ -1235,7 +1251,7 @@ private[spark] class BlockManager( peer.port, peer.executorId, blockId, - new NettyManagedBuffer(data.toNetty), + new BlockManagerManagedBuffer(blockInfoManager, blockId, data.toManagedBuffer()), --- End diff -- why this change? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106587428 --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala --- @@ -102,4 +150,34 @@ private[spark] object CryptoStreamUtils extends Logging { } iv } + + /** + * This class is a workaround for CRYPTO-125, that forces all bytes to be written to the + * underlying channel. Since the callers of this API are using blocking I/O, there are no + * concerns with regards to CPU usage here. --- End diff -- Is it a separate bug fix? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/17295#discussion_r106587322

--- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
@@ -63,12 +83,40 @@
       is: InputStream,
       sparkConf: SparkConf,
       key: Array[Byte]): InputStream = {
-    val properties = toCryptoConf(sparkConf)
     val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
-    is.read(iv, 0, iv.length)
-    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
-    new CryptoInputStream(transformationStr, properties, is,
-      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
+    var read = 0
+    while (read < iv.length) {
--- End diff --

what does this while loop do?
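The likely answer, for readers: InputStream.read(buf, off, len) may return fewer than len bytes, so the single read() call in the old code could leave the IV only partially filled; the loop retries until the whole IV has arrived. A self-contained sketch of the pattern, with an invented helper name:

    import java.io.{EOFException, InputStream}

    object ReadFullySketch {
      // Illustrative helper: read exactly buf.length bytes, retrying on
      // short reads, since InputStream.read() may return fewer bytes than
      // requested.
      def readFully(in: InputStream, buf: Array[Byte]): Unit = {
        var read = 0
        while (read < buf.length) {
          val n = in.read(buf, read, buf.length - read)
          if (n < 0) {
            throw new EOFException("Stream ended before the IV was fully read")
          }
          read += n
        }
      }
    }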
[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...
GitHub user vanzin opened a pull request:

    https://github.com/apache/spark/pull/17295

[SPARK-19556][core] Do not encrypt block manager data in memory.

This change modifies the way block data is encrypted to make the more common cases faster, while penalizing an edge case. As a side effect of the change, all data that goes through the block manager is now encrypted only when needed, including the previous path (broadcast variables) where that did not happen.

The change works by not encrypting data that is stored in memory: if a serialized block is in memory, it will only be encrypted once it is evicted to disk. The penalty comes when transferring that encrypted data from disk. If the data ends up in memory again, it is as efficient as before; but if the evicted block needs to be transferred directly to a remote executor, there is now a performance penalty, since the code now uses a custom FileRegion implementation to decrypt the data before transferring it.

This also means that block data transferred between executors is no longer encrypted (and thus relies on the network library's encryption support for secrecy). Shuffle blocks are still transferred in encrypted form, since they're handled in a slightly different way by the code. This also keeps compatibility with existing external shuffle services, which transfer encrypted shuffle blocks, and avoids having to make the external service aware of encryption at all.

Another change in the disk store is that it now stores a tiny metadata file next to the file holding the block data; this is needed to accurately account for the decrypted block size, which may be significantly different from the size of the encrypted file on disk.

The serialization and deserialization APIs in the SerializerManager no longer do encryption automatically; callers need to explicitly wrap their streams with an appropriate crypto stream before using those APIs.

As a result of these changes, some of the workarounds added in SPARK-19520 are removed here.

Testing: a new trait ("EncryptionFunSuite") was added that provides an easy way to run a test twice, with encryption on and off; the broadcast, block manager and caching tests were modified to use this new trait so that the existing tests exercise both encrypted and non-encrypted paths. I also ran some applications with encryption turned on to verify that they still work, including streaming tests that failed without the fix for SPARK-19520.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vanzin/spark SPARK-19556

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17295.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #17295

commit 3aa752f9becdfe0e35a47d731736d942e3e5b3bf
Author: Marcelo Vanzin
Date: 2017-02-10T23:59:51Z

    [SPARK-19556][core] Do not encrypt block manager data in memory.
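To illustrate the SerializerManager point in the description: since encryption is no longer automatic, callers wrap their streams explicitly. The sketch below uses the createCryptoInputStream helper whose signature appears in this PR's diffs; createCryptoOutputStream is assumed to be its symmetric counterpart, so treat both wrappers as hedged examples rather than the PR's exact calling code.

    import java.io.{InputStream, OutputStream}

    import org.apache.spark.SparkConf
    import org.apache.spark.security.CryptoStreamUtils

    object ExplicitCryptoWrapSketch {
      // Wrap raw streams with crypto streams explicitly, instead of relying
      // on the SerializerManager to do it behind the scenes.
      def wrapForEncryption(out: OutputStream, conf: SparkConf, key: Array[Byte]): OutputStream =
        CryptoStreamUtils.createCryptoOutputStream(out, conf, key)

      def wrapForDecryption(in: InputStream, conf: SparkConf, key: Array[Byte]): InputStream =
        CryptoStreamUtils.createCryptoInputStream(in, conf, key)
    }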