This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b01dce2b2a5 [SPARK-45378][CORE] Add convertToNettyForSsl to ManagedBuffer b01dce2b2a5 is described below commit b01dce2b2a57b933283d6fd350aa917d3cd76d83 Author: Hasnain Lakhani <hasnain.lakh...@databricks.com> AuthorDate: Mon Oct 2 22:56:03 2023 -0500 [SPARK-45378][CORE] Add convertToNettyForSsl to ManagedBuffer ### What changes were proposed in this pull request? As the title suggests. In addition to that API, add a config to the `TransportConf` to configure the default block size if desired. ### Why are the changes needed? Netty's SSL implementation does not support zero-copy transfers. In order to support SSL using Netty we need to add another API to the `ManagedBuffer` which lets buffers return a different data type. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? CI. This will have tests added later - it's tested as part of https://github.com/apache/spark/pull/42685 from which this is split out. ### Was this patch authored or co-authored using generative AI tooling? No Closes #43166 from hasnain-db/spark-tls-buffers. 
Authored-by: Hasnain Lakhani <hasnain.lakh...@databricks.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../network/buffer/FileSegmentManagedBuffer.java | 7 +++++++ .../apache/spark/network/buffer/ManagedBuffer.java | 14 ++++++++++++++ .../spark/network/buffer/NettyManagedBuffer.java | 5 +++++ .../spark/network/buffer/NioManagedBuffer.java | 5 +++++ .../org/apache/spark/network/util/TransportConf.java | 8 ++++++++ .../org/apache/spark/network/TestManagedBuffer.java | 5 +++++ .../org/apache/spark/storage/BlockManager.scala | 9 +++++++++ .../spark/storage/BlockManagerManagedBuffer.scala | 2 ++ .../scala/org/apache/spark/storage/DiskStore.scala | 13 +++++++++++++ .../org/apache/spark/util/io/ChunkedByteBuffer.scala | 20 ++++++++++++++++++++ .../spark/network/BlockTransferServiceSuite.scala | 2 ++ .../spark/shuffle/BlockStoreShuffleReaderSuite.scala | 1 + 12 files changed, 91 insertions(+) diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java index 66566b67870..dd7c2061ec9 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java @@ -28,6 +28,7 @@ import java.nio.file.StandardOpenOption; import com.google.common.io.ByteStreams; import io.netty.channel.DefaultFileRegion; +import io.netty.handler.stream.ChunkedStream; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -137,6 +138,12 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer { } } + @Override + public Object convertToNettyForSsl() throws IOException { + // Cannot use zero-copy with HTTPS + return new ChunkedStream(createInputStream(), conf.sslShuffleChunkSize()); + } + public File getFile() { return file; 
} public long getOffset() { return offset; } diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java index 4dd8cec2900..893aa106a3f 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java @@ -75,4 +75,18 @@ public abstract class ManagedBuffer { * the caller will be responsible for releasing this new reference. */ public abstract Object convertToNetty() throws IOException; + + /** + * Convert the buffer into a Netty object, used to write the data out with SSL encryption, + * which cannot use {@link io.netty.channel.FileRegion}. + * The return value is either a {@link io.netty.buffer.ByteBuf}, + * a {@link io.netty.handler.stream.ChunkedStream}, or a {@link java.io.InputStream}. + * + * If this method returns a ByteBuf, then that buffer's reference count will be incremented and + * the caller will be responsible for releasing this new reference. + * + * Once `kernel.ssl.sendfile` and OpenSSL's `ssl_sendfile` are more widely adopted (and supported + * in Netty), we can potentially deprecate these APIs and just use `convertToNetty`. 
+ */ + public abstract Object convertToNettyForSsl() throws IOException; } diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java index b42977c7cb7..a40cfc8bc04 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/NettyManagedBuffer.java @@ -68,6 +68,11 @@ public class NettyManagedBuffer extends ManagedBuffer { return buf.duplicate().retain(); } + @Override + public Object convertToNettyForSsl() throws IOException { + return buf.duplicate().retain(); + } + @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java index 084f89d2611..6eb8d4e2c73 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/NioManagedBuffer.java @@ -66,6 +66,11 @@ public class NioManagedBuffer extends ManagedBuffer { return Unpooled.wrappedBuffer(buf); } + @Override + public Object convertToNettyForSsl() throws IOException { + return Unpooled.wrappedBuffer(buf); + } + @Override public String toString() { return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java index 2794883f3cf..b8d8f6b85a4 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ 
-249,6 +249,14 @@ public class TransportConf { return conf.getBoolean("spark.network.sasl.serverAlwaysEncrypt", false); } + /** + * When Secure (SSL/TLS) Shuffle is enabled, the Chunk size to use for shuffling files. + */ + public int sslShuffleChunkSize() { + return Ints.checkedCast(JavaUtils.byteStringAsBytes( + conf.get("spark.network.ssl.maxEncryptedBlockSize", "64k"))); + } + /** * Flag indicating whether to share the pooled ByteBuf allocators between the different Netty * channels. If enabled then only two pooled ByteBuf allocators are created: one where caching diff --git a/common/network-common/src/test/java/org/apache/spark/network/TestManagedBuffer.java b/common/network-common/src/test/java/org/apache/spark/network/TestManagedBuffer.java index 83c90f9eff2..1814634fb92 100644 --- a/common/network-common/src/test/java/org/apache/spark/network/TestManagedBuffer.java +++ b/common/network-common/src/test/java/org/apache/spark/network/TestManagedBuffer.java @@ -80,6 +80,11 @@ public class TestManagedBuffer extends ManagedBuffer { return underlying.convertToNetty(); } + @Override + public Object convertToNettyForSsl() throws IOException { + return underlying.convertToNettyForSsl(); + } + @Override public int hashCode() { return underlying.hashCode(); diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index cccee78aee1..a6962c46243 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -85,6 +85,13 @@ private[spark] trait BlockData { */ def toNetty(): Object + /** + * Returns a Netty-friendly wrapper for the block's data. + * + * Please see `ManagedBuffer.convertToNettyForSsl()` for more details. 
+ */ + def toNettyForSsl(): Object + def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer def toByteBuffer(): ByteBuffer @@ -103,6 +110,8 @@ private[spark] class ByteBufferBlockData( override def toNetty(): Object = buffer.toNetty + override def toNettyForSsl(): AnyRef = buffer.toNettyForSsl + override def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = { buffer.copy(allocator) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala index 5c12b5cee4d..cab11536e14 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala @@ -51,6 +51,8 @@ private[storage] class BlockManagerManagedBuffer( override def convertToNetty(): Object = data.toNetty() + override def convertToNettyForSsl(): Object = data.toNettyForSsl() + override def retain(): ManagedBuffer = { refCount.incrementAndGet() val locked = blockInfoManager.lockForReading(blockId, blocking = false) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 1cb5adef5f4..54c5d0b2dce 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -184,6 +184,14 @@ private class DiskBlockData( */ override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size) + /** + * Returns a Netty-friendly wrapper for the block's data. + * + * Please see `ManagedBuffer.convertToNettyForSsl()` for more details. 
+ */ + override def toNettyForSsl(): AnyRef = + toChunkedByteBuffer(ByteBuffer.allocate).toNettyForSsl + override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = { Utils.tryWithResource(open()) { channel => var remaining = blockSize @@ -234,6 +242,9 @@ private[spark] class EncryptedBlockData( override def toNetty(): Object = new ReadableChannelFileRegion(open(), blockSize) + override def toNettyForSsl(): AnyRef = + toChunkedByteBuffer(ByteBuffer.allocate).toNettyForSsl + override def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = { val source = open() try { @@ -297,6 +308,8 @@ private[spark] class EncryptedManagedBuffer( override def convertToNetty(): AnyRef = blockData.toNetty() + override def convertToNettyForSsl(): AnyRef = blockData.toNettyForSsl() + override def createInputStream(): InputStream = blockData.toInputStream() override def retain(): ManagedBuffer = this diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 73e4e72cc5b..88bd117ba22 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -23,6 +23,7 @@ import java.nio.channels.WritableByteChannel import com.google.common.io.ByteStreams import com.google.common.primitives.UnsignedBytes +import io.netty.handler.stream.ChunkedStream import org.apache.commons.io.IOUtils import org.apache.spark.SparkEnv @@ -131,6 +132,14 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) extends Ex new ChunkedByteBufferFileRegion(this, bufferWriteChunkSize) } + /** + * Wrap this in a ChunkedStream which allows us to provide the data in a manner + * compatible with SSL encryption + */ + def toNettyForSsl: ChunkedStream = { + new ChunkedStream(toInputStream(), bufferWriteChunkSize) + } + /** * Copy this buffer into a new byte array. 
* @@ -284,6 +293,17 @@ private[spark] class ChunkedByteBufferInputStream( } } + override def available(): Int = { + if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) { + currentChunk = chunks.next() + } + if (currentChunk != null && currentChunk.hasRemaining) { + currentChunk.remaining + } else { + 0 + } + } + override def read(): Int = { if (currentChunk != null && !currentChunk.hasRemaining && chunks.hasNext) { currentChunk = chunks.next() diff --git a/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala index d7e4b9166fa..f9a1b778b4e 100644 --- a/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/BlockTransferServiceSuite.scala @@ -74,6 +74,8 @@ class BlockTransferServiceSuite extends SparkFunSuite with TimeLimits { override def release(): ManagedBuffer = this override def convertToNetty(): AnyRef = null + + override def convertToNettyForSsl(): AnyRef = null } listener.onBlockFetchSuccess("block-id-unused", badBuffer) } diff --git a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala index 56b8e0b6df3..9638558e3c9 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala @@ -43,6 +43,7 @@ class RecordingManagedBuffer(underlyingBuffer: NioManagedBuffer) extends Managed override def nioByteBuffer(): ByteBuffer = underlyingBuffer.nioByteBuffer() override def createInputStream(): InputStream = underlyingBuffer.createInputStream() override def convertToNetty(): AnyRef = underlyingBuffer.convertToNetty() + override def convertToNettyForSsl(): AnyRef = underlyingBuffer.convertToNettyForSsl() override def retain(): ManagedBuffer = { 
callsToRetain += 1 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org