Repository: spark Updated Branches: refs/heads/branch-2.3 9ac9f36c4 -> 575fea120
[CORE] Updates to remote cache reads Covered by tests in DistributedSuite Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/575fea12 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/575fea12 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/575fea12 Branch: refs/heads/branch-2.3 Commit: 575fea120e25249716e3f680396580c5f9e26b5b Parents: 6d742d1 Author: Imran Rashid <iras...@cloudera.com> Authored: Wed Aug 22 16:38:28 2018 -0500 Committer: Imran Rashid <iras...@cloudera.com> Committed: Thu Sep 13 09:19:56 2018 -0500 ---------------------------------------------------------------------- .../spark/network/buffer/ManagedBuffer.java | 5 +- .../spark/network/shuffle/DownloadFile.java | 47 ++++++++++ .../network/shuffle/DownloadFileManager.java | 36 ++++++++ .../shuffle/DownloadFileWritableChannel.java | 31 +++++++ .../network/shuffle/ExternalShuffleClient.java | 4 +- .../network/shuffle/OneForOneBlockFetcher.java | 28 +++--- .../spark/network/shuffle/ShuffleClient.java | 4 +- .../network/shuffle/SimpleDownloadFile.java | 91 ++++++++++++++++++++ .../spark/network/shuffle/TempFileManager.java | 36 -------- .../spark/network/BlockTransferService.scala | 6 +- .../netty/NettyBlockTransferService.scala | 4 +- .../org/apache/spark/storage/BlockManager.scala | 78 ++++++++++++++--- .../org/apache/spark/storage/DiskStore.scala | 16 ++++ .../storage/ShuffleBlockFetcherIterator.scala | 21 +++-- .../spark/storage/BlockManagerSuite.scala | 8 +- .../ShuffleBlockFetcherIteratorSuite.scala | 6 +- 16 files changed, 329 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java ---------------------------------------------------------------------- diff --git a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java index 1861f8d..2d573f5 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java +++ b/common/network-common/src/main/java/org/apache/spark/network/buffer/ManagedBuffer.java @@ -36,7 +36,10 @@ import java.nio.ByteBuffer; */ public abstract class ManagedBuffer { - /** Number of bytes of the data. */ + /** + * Number of bytes of the data. If this buffer decrypts the data for all of the views into it, + * this is the size of the decrypted data. + */ public abstract long size(); /** http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFile.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFile.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFile.java new file mode 100644 index 0000000..76c2d02 --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFile.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import java.io.IOException; + +/** + * A handle on the file used when fetching remote data to disk. Used to ensure the lifecycle of + * writing the data, reading it back, and then cleaning it up is followed. Specific implementations + * may also handle encryption. The data can be read only via DownloadFileWritableChannel, + * which ensures data is not read until after the writer is closed. + */ +public interface DownloadFile { + /** + * Delete the file. + * + * @return <code>true</code> if and only if the file or directory is + * successfully deleted; <code>false</code> otherwise + */ + public boolean delete(); + + /** + * A channel for writing data to the file. This special channel allows access to the data for + * reading, after the channel is closed, via {@link DownloadFileWritableChannel#closeAndRead()}. + */ + public DownloadFileWritableChannel openForWriting() throws IOException; + + /** + * The path of the file, intended only for debug purposes. + */ + public String path(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileManager.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileManager.java new file mode 100644 index 0000000..c335a17 --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileManager.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import org.apache.spark.network.util.TransportConf; + +/** + * A manager to create temp block files used when fetching remote data to reduce the memory usage. + * It will clean files when they won't be used any more. + */ +public interface DownloadFileManager { + + /** Create a temp block file. */ + DownloadFile createTempFile(TransportConf transportConf); + + /** + * Register a temp file to clean up when it won't be used any more. 
Return whether the + * file is registered successfully. If `false`, the caller should clean up the file by itself. + */ + boolean registerTempFileToClean(DownloadFile file); +} http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileWritableChannel.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileWritableChannel.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileWritableChannel.java new file mode 100644 index 0000000..acf194c --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/DownloadFileWritableChannel.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import org.apache.spark.network.buffer.ManagedBuffer; + +import java.io.OutputStream; +import java.nio.channels.WritableByteChannel; + +/** + * A channel for writing data which is fetched to disk, which allows access to the written data only + * after the writer has been closed. Used with DownloadFile and DownloadFileManager. 
+ */ +public interface DownloadFileWritableChannel extends WritableByteChannel { + public ManagedBuffer closeAndRead(); +} http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java index 7ed0b6e..9a2cf0f 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java @@ -91,7 +91,7 @@ public class ExternalShuffleClient extends ShuffleClient { String execId, String[] blockIds, BlockFetchingListener listener, - TempFileManager tempFileManager) { + DownloadFileManager downloadFileManager) { checkInit(); logger.debug("External shuffle fetch from {}:{} (executor id {})", host, port, execId); try { @@ -99,7 +99,7 @@ public class ExternalShuffleClient extends ShuffleClient { (blockIds1, listener1) -> { TransportClient client = clientFactory.createClient(host, port); new OneForOneBlockFetcher(client, appId, execId, - blockIds1, listener1, conf, tempFileManager).start(); + blockIds1, listener1, conf, downloadFileManager).start(); }; int maxRetries = conf.maxIORetries(); http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java index 0bc5718..3058702 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockFetcher.java @@ -17,18 +17,13 @@ package org.apache.spark.network.shuffle; -import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.WritableByteChannel; import java.util.Arrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.spark.network.buffer.FileSegmentManagedBuffer; import org.apache.spark.network.buffer.ManagedBuffer; import org.apache.spark.network.client.ChunkReceivedCallback; import org.apache.spark.network.client.RpcResponseCallback; @@ -58,7 +53,7 @@ public class OneForOneBlockFetcher { private final BlockFetchingListener listener; private final ChunkReceivedCallback chunkCallback; private final TransportConf transportConf; - private final TempFileManager tempFileManager; + private final DownloadFileManager downloadFileManager; private StreamHandle streamHandle = null; @@ -79,14 +74,14 @@ public class OneForOneBlockFetcher { String[] blockIds, BlockFetchingListener listener, TransportConf transportConf, - TempFileManager tempFileManager) { + DownloadFileManager downloadFileManager) { this.client = client; this.openMessage = new OpenBlocks(appId, execId, blockIds); this.blockIds = blockIds; this.listener = listener; this.chunkCallback = new ChunkCallback(); this.transportConf = transportConf; - this.tempFileManager = 
tempFileManager; + this.downloadFileManager = downloadFileManager; } /** Callback invoked on receipt of each chunk. We equate a single chunk to a single block. */ @@ -125,7 +120,7 @@ public class OneForOneBlockFetcher { // Immediately request all chunks -- we expect that the total size of the request is // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]]. for (int i = 0; i < streamHandle.numChunks; i++) { - if (tempFileManager != null) { + if (downloadFileManager != null) { client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i), new DownloadCallback(i)); } else { @@ -159,13 +154,13 @@ public class OneForOneBlockFetcher { private class DownloadCallback implements StreamCallback { - private WritableByteChannel channel = null; - private File targetFile = null; + private DownloadFileWritableChannel channel = null; + private DownloadFile targetFile = null; private int chunkIndex; DownloadCallback(int chunkIndex) throws IOException { - this.targetFile = tempFileManager.createTempFile(); - this.channel = Channels.newChannel(new FileOutputStream(targetFile)); + this.targetFile = downloadFileManager.createTempFile(transportConf); + this.channel = targetFile.openForWriting(); this.chunkIndex = chunkIndex; } @@ -178,11 +173,8 @@ public class OneForOneBlockFetcher { @Override public void onComplete(String streamId) throws IOException { - channel.close(); - ManagedBuffer buffer = new FileSegmentManagedBuffer(transportConf, targetFile, 0, - targetFile.length()); - listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer); - if (!tempFileManager.registerTempFileToClean(targetFile)) { + listener.onBlockFetchSuccess(blockIds[chunkIndex], channel.closeAndRead()); + if (!downloadFileManager.registerTempFileToClean(targetFile)) { targetFile.delete(); } } http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java index 18b04fe..62b99c4 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java @@ -43,7 +43,7 @@ public abstract class ShuffleClient implements Closeable { * @param execId the executor id. * @param blockIds block ids to fetch. * @param listener the listener to receive block fetching status. - * @param tempFileManager TempFileManager to create and clean temp files. + * @param downloadFileManager DownloadFileManager to create and clean temp files. * If it's not <code>null</code>, the remote blocks will be streamed * into temp shuffle files to reduce the memory usage, otherwise, * they will be kept in memory. 
@@ -54,7 +54,7 @@ public abstract class ShuffleClient implements Closeable { String execId, String[] blockIds, BlockFetchingListener listener, - TempFileManager tempFileManager); + DownloadFileManager downloadFileManager); /** * Get the shuffle MetricsSet from ShuffleClient, this will be used in MetricsSystem to http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java new file mode 100644 index 0000000..670612f --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/SimpleDownloadFile.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.network.shuffle; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; + +import org.apache.spark.network.buffer.FileSegmentManagedBuffer; +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.util.TransportConf; + +/** + * A DownloadFile that does not take any encryption settings into account for reading and + * writing data. + * + * This does *not* mean the data in the file is un-encrypted -- it could be that the data is + * already encrypted when it's written, and a subsequent layer is responsible for decrypting.
+ */ +public class SimpleDownloadFile implements DownloadFile { + + private final File file; + private final TransportConf transportConf; + + public SimpleDownloadFile(File file, TransportConf transportConf) { + this.file = file; + this.transportConf = transportConf; + } + + @Override + public boolean delete() { + return file.delete(); + } + + @Override + public DownloadFileWritableChannel openForWriting() throws IOException { + return new SimpleDownloadWritableChannel(); + } + + @Override + public String path() { + return file.getAbsolutePath(); + } + + private class SimpleDownloadWritableChannel implements DownloadFileWritableChannel { + + private final WritableByteChannel channel; + + SimpleDownloadWritableChannel() throws FileNotFoundException { + channel = Channels.newChannel(new FileOutputStream(file)); + } + + @Override + public ManagedBuffer closeAndRead() { + return new FileSegmentManagedBuffer(transportConf, file, 0, file.length()); + } + + @Override + public int write(ByteBuffer src) throws IOException { + return channel.write(src); + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + @Override + public void close() throws IOException { + channel.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java ---------------------------------------------------------------------- diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java deleted file mode 100644 index 552364d..0000000 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/TempFileManager.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.network.shuffle; - -import java.io.File; - -/** - * A manager to create temp block files to reduce the memory usage and also clean temp - * files when they won't be used any more. - */ -public interface TempFileManager { - - /** Create a temp block file. */ - File createTempFile(); - - /** - * Register a temp file to clean up when it won't be used any more. Return whether the - * file is registered successfully. If `false`, the caller should clean up the file by itself. 
- */ - boolean registerTempFileToClean(File file); -} http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala index 1d8a266..eef8c31 100644 --- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala @@ -26,7 +26,7 @@ import scala.reflect.ClassTag import org.apache.spark.internal.Logging import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer} -import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager} +import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, ShuffleClient} import org.apache.spark.storage.{BlockId, StorageLevel} import org.apache.spark.util.ThreadUtils @@ -68,7 +68,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo execId: String, blockIds: Array[String], listener: BlockFetchingListener, - tempFileManager: TempFileManager): Unit + tempFileManager: DownloadFileManager): Unit /** * Upload a single block to a remote node, available only after [[init]] is invoked. @@ -92,7 +92,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo port: Int, execId: String, blockId: String, - tempFileManager: TempFileManager): ManagedBuffer = { + tempFileManager: DownloadFileManager): ManagedBuffer = { // A monitor for the thread to wait on. val result = Promise[ManagedBuffer]() fetchBlocks(host, port, execId, Array(blockId), http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index b7d8c35..1c48ad6 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -32,7 +32,7 @@ import org.apache.spark.network.buffer.ManagedBuffer import org.apache.spark.network.client.{RpcResponseCallback, TransportClientBootstrap, TransportClientFactory} import org.apache.spark.network.crypto.{AuthClientBootstrap, AuthServerBootstrap} import org.apache.spark.network.server._ -import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher, RetryingBlockFetcher, TempFileManager} +import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager, OneForOneBlockFetcher, RetryingBlockFetcher} import org.apache.spark.network.shuffle.protocol.UploadBlock import org.apache.spark.network.util.JavaUtils import org.apache.spark.serializer.JavaSerializer @@ -105,7 +105,7 @@ private[spark] class NettyBlockTransferService( execId: String, blockIds: Array[String], listener: BlockFetchingListener, - tempFileManager: TempFileManager): Unit = { + tempFileManager: DownloadFileManager): Unit = { logTrace(s"Fetch blocks from $host:$port (executor id $execId)") try { val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter { 
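----------------------------------------------------------------------
The new interfaces above -- DownloadFile, DownloadFileManager and DownloadFileWritableChannel -- compose into a single write-then-read lifecycle, the one the reworked DownloadCallback in OneForOneBlockFetcher follows: create a temp file, write the fetched bytes through the file's channel, obtain a readable ManagedBuffer via closeAndRead(), then either hand the file to the manager for cleanup or delete it. Below is a minimal sketch of that lifecycle; the fetchToDisk helper and its arguments are illustrative, not part of this patch.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.shuffle.DownloadFile;
import org.apache.spark.network.shuffle.DownloadFileManager;
import org.apache.spark.network.shuffle.DownloadFileWritableChannel;
import org.apache.spark.network.util.TransportConf;

// Illustrative sketch of the DownloadFile write-then-read lifecycle.
class DownloadLifecycleSketch {
  // Writes one fetched block to disk and returns a readable view of it.
  static ManagedBuffer fetchToDisk(
      DownloadFileManager manager,
      TransportConf conf,
      ByteBuffer fetched) throws IOException {
    DownloadFile file = manager.createTempFile(conf);
    DownloadFileWritableChannel channel = file.openForWriting();
    try {
      while (fetched.hasRemaining()) {
        channel.write(fetched);
      }
    } catch (IOException e) {
      channel.close();
      file.delete();
      throw e;
    }
    // Per the interface contract, the data is readable only after the
    // writer is closed; closeAndRead() finishes the write and returns a
    // ManagedBuffer over the file.
    ManagedBuffer buffer = channel.closeAndRead();
    // Hand ownership to the manager. If registration fails (e.g. the
    // owning iterator is already a zombie), the caller deletes the file.
    if (!manager.registerTempFileToClean(file)) {
      file.delete();
    }
    return buffer;
  }
}

SimpleDownloadFile backs this lifecycle with a plain FileOutputStream; the encrypted variant added to BlockManager below swaps in an encrypting channel without changing any call sites.
----------------------------------------------------------------------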
http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index df1a4be..027c31f 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -33,6 +33,7 @@ import scala.util.Random import scala.util.control.NonFatal import com.codahale.metrics.{MetricRegistry, MetricSet} +import com.google.common.io.CountingOutputStream import org.apache.spark._ import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics} @@ -42,8 +43,9 @@ import org.apache.spark.metrics.source.Source import org.apache.spark.network._ import org.apache.spark.network.buffer.ManagedBuffer import org.apache.spark.network.netty.SparkTransportConf -import org.apache.spark.network.shuffle.{ExternalShuffleClient, TempFileManager} +import org.apache.spark.network.shuffle._ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo +import org.apache.spark.network.util.TransportConf import org.apache.spark.rpc.RpcEnv import org.apache.spark.serializer.{SerializerInstance, SerializerManager} import org.apache.spark.shuffle.ShuffleManager @@ -206,11 +208,11 @@ private[spark] class BlockManager( private var blockReplicationPolicy: BlockReplicationPolicy = _ - // A TempFileManager used to track all the files of remote blocks which above the + // A DownloadFileManager used to track all the files of remote blocks which are above the // specified memory threshold. Files will be deleted automatically based on weak reference. 
// Exposed for test private[storage] val remoteBlockTempFileManager = - new BlockManager.RemoteBlockTempFileManager(this) + new BlockManager.RemoteBlockDownloadFileManager(this) private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM) /** @@ -1582,23 +1584,28 @@ private[spark] object BlockManager { metricRegistry.registerAll(metricSet) } - class RemoteBlockTempFileManager(blockManager: BlockManager) - extends TempFileManager with Logging { + class RemoteBlockDownloadFileManager(blockManager: BlockManager) + extends DownloadFileManager with Logging { + // lazy because SparkEnv is set after this + lazy val encryptionKey = SparkEnv.get.securityManager.getIOEncryptionKey() - private class ReferenceWithCleanup(file: File, referenceQueue: JReferenceQueue[File]) - extends WeakReference[File](file, referenceQueue) { - private val filePath = file.getAbsolutePath + private class ReferenceWithCleanup( + file: DownloadFile, + referenceQueue: JReferenceQueue[DownloadFile] + ) extends WeakReference[DownloadFile](file, referenceQueue) { + + val filePath = file.path() def cleanUp(): Unit = { logDebug(s"Clean up file $filePath") - if (!new File(filePath).delete()) { + if (!file.delete()) { logDebug(s"Fail to delete file $filePath") } } } - private val referenceQueue = new JReferenceQueue[File] + private val referenceQueue = new JReferenceQueue[DownloadFile] private val referenceBuffer = Collections.newSetFromMap[ReferenceWithCleanup]( new ConcurrentHashMap) @@ -1610,11 +1617,21 @@ private[spark] object BlockManager { cleaningThread.setName("RemoteBlock-temp-file-clean-thread") cleaningThread.start() - override def createTempFile(): File = { - blockManager.diskBlockManager.createTempLocalBlock()._2 + override def createTempFile(transportConf: TransportConf): DownloadFile = { + val file = blockManager.diskBlockManager.createTempLocalBlock()._2 + encryptionKey match { + case Some(key) => + // encryption is enabled, so when we read the decrypted data off the network, we need to + // encrypt it when writing to disk. Note that the data may have been encrypted when it + // was cached on disk on the remote side, but it was already decrypted by now (see + // EncryptedBlockData). + new EncryptedDownloadFile(file, key) + case None => + new SimpleDownloadFile(file, transportConf) + } } - override def registerTempFileToClean(file: File): Boolean = { + override def registerTempFileToClean(file: DownloadFile): Boolean = { referenceBuffer.add(new ReferenceWithCleanup(file, referenceQueue)) } @@ -1642,4 +1659,39 @@ private[spark] object BlockManager { } } } + + /** + * A DownloadFile that encrypts data when it is written, and decrypts when it's read. 
+ */ + private class EncryptedDownloadFile( + file: File, + key: Array[Byte]) extends DownloadFile { + + private val env = SparkEnv.get + + override def delete(): Boolean = file.delete() + + override def openForWriting(): DownloadFileWritableChannel = { + new EncryptedDownloadWritableChannel() + } + + override def path(): String = file.getAbsolutePath + + private class EncryptedDownloadWritableChannel extends DownloadFileWritableChannel { + private val countingOutput: CountingWritableChannel = new CountingWritableChannel( + Channels.newChannel(env.serializerManager.wrapForEncryption(new FileOutputStream(file)))) + + override def closeAndRead(): ManagedBuffer = { + countingOutput.close() + val size = countingOutput.getCount + new EncryptedManagedBuffer(new EncryptedBlockData(file, size, env.conf, key)) + } + + override def write(src: ByteBuffer): Int = countingOutput.write(src) + + override def isOpen: Boolean = countingOutput.isOpen() + + override def close(): Unit = countingOutput.close() + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 39249d4..e61cfe8 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -30,6 +30,7 @@ import io.netty.channel.DefaultFileRegion import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.internal.Logging +import org.apache.spark.network.buffer.ManagedBuffer import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils} import org.apache.spark.security.CryptoStreamUtils import org.apache.spark.util.Utils @@ -261,7 +262,22 @@ private class EncryptedBlockData( throw e } } +} + +private class EncryptedManagedBuffer(val blockData: EncryptedBlockData) extends ManagedBuffer { + + // This is the size of the decrypted data + override def size(): Long = blockData.size + + override def nioByteBuffer(): ByteBuffer = blockData.toByteBuffer() + + override def convertToNetty(): AnyRef = blockData.toNetty() + + override def createInputStream(): InputStream = blockData.toInputStream() + + override def retain(): ManagedBuffer = this + override def release(): ManagedBuffer = this } private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize: Long) http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala index dd9df74..94def4d 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.io.{File, InputStream, IOException} +import java.io.{InputStream, IOException} import java.nio.ByteBuffer import java.util.concurrent.LinkedBlockingQueue import javax.annotation.concurrent.GuardedBy @@ -28,7 +28,8 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue} import org.apache.spark.{SparkException, TaskContext} import 
org.apache.spark.internal.Logging import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} -import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager} +import org.apache.spark.network.shuffle._ +import org.apache.spark.network.util.TransportConf import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.util.Utils import org.apache.spark.util.io.ChunkedByteBufferOutputStream @@ -69,7 +70,7 @@ final class ShuffleBlockFetcherIterator( maxBlocksInFlightPerAddress: Int, maxReqSizeShuffleToMem: Long, detectCorrupt: Boolean) - extends Iterator[(BlockId, InputStream)] with TempFileManager with Logging { + extends Iterator[(BlockId, InputStream)] with DownloadFileManager with Logging { import ShuffleBlockFetcherIterator._ @@ -148,7 +149,7 @@ final class ShuffleBlockFetcherIterator( * deleted when cleanup. This is a layer of defensiveness against disk file leaks. */ @GuardedBy("this") - private[this] val shuffleFilesSet = mutable.HashSet[File]() + private[this] val shuffleFilesSet = mutable.HashSet[DownloadFile]() initialize() @@ -162,11 +163,15 @@ final class ShuffleBlockFetcherIterator( currentResult = null } - override def createTempFile(): File = { - blockManager.diskBlockManager.createTempLocalBlock()._2 + override def createTempFile(transportConf: TransportConf): DownloadFile = { + // we never need to do any encryption or decryption here, regardless of configs, because that + // is handled at another layer in the code. When encryption is enabled, shuffle data is written + // to disk encrypted in the first place, and sent over the network still encrypted. + new SimpleDownloadFile( + blockManager.diskBlockManager.createTempLocalBlock()._2, transportConf) } - override def registerTempFileToClean(file: File): Boolean = synchronized { + override def registerTempFileToClean(file: DownloadFile): Boolean = synchronized { if (isZombie) { false } else { @@ -202,7 +207,7 @@ final class ShuffleBlockFetcherIterator( } shuffleFilesSet.foreach { file => if (!file.delete()) { - logWarning("Failed to cleanup shuffle fetch temp file " + file.getAbsolutePath()) + logWarning("Failed to cleanup shuffle fetch temp file " + file.path()) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 629eed4..4d2168f 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -44,7 +44,7 @@ import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf} import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap} -import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager} +import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager} import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor} import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus @@ -1403,7 +1403,7 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with BeforeAndAfterE class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService { var numCalls = 0 - var tempFileManager: TempFileManager = null + var tempFileManager: DownloadFileManager = null override def init(blockDataManager: BlockDataManager): Unit = {} @@ -1413,7 +1413,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE execId: String, blockIds: Array[String], listener: BlockFetchingListener, - tempFileManager: TempFileManager): Unit = { + tempFileManager: DownloadFileManager): Unit = { listener.onBlockFetchSuccess("mockBlockId", new NioManagedBuffer(ByteBuffer.allocate(1))) } @@ -1440,7 +1440,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE port: Int, execId: String, blockId: String, - tempFileManager: TempFileManager): ManagedBuffer = { + tempFileManager: DownloadFileManager): ManagedBuffer = { numCalls += 1 this.tempFileManager = tempFileManager if (numCalls <= maxFailures) { http://git-wip-us.apache.org/repos/asf/spark/blob/575fea12/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index 692ae3b..24244f9 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -33,7 +33,7 @@ import org.scalatest.PrivateMethodTester import org.apache.spark.{SparkFunSuite, TaskContext} import org.apache.spark.network._ import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} -import org.apache.spark.network.shuffle.{BlockFetchingListener, TempFileManager} +import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager} import org.apache.spark.network.util.LimitedInputStream import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.util.Utils @@ -482,12 +482,12 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT val remoteBlocks = Map[BlockId, ManagedBuffer]( ShuffleBlockId(0, 0, 0) -> createMockManagedBuffer()) val transfer = mock(classOf[BlockTransferService]) - var tempFileManager: TempFileManager = null + var tempFileManager: DownloadFileManager = null when(transfer.fetchBlocks(any(), any(), any(), any(), any(), any())) .thenAnswer(new Answer[Unit] { override def answer(invocation: InvocationOnMock): Unit = { val listener = invocation.getArguments()(4).asInstanceOf[BlockFetchingListener] - tempFileManager = invocation.getArguments()(5).asInstanceOf[TempFileManager] + tempFileManager = invocation.getArguments()(5).asInstanceOf[DownloadFileManager] Future { listener.onBlockFetchSuccess( ShuffleBlockId(0, 0, 0).toString, remoteBlocks(ShuffleBlockId(0, 0, 0)))
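----------------------------------------------------------------------
For callers of ShuffleClient.fetchBlocks outside Spark core, the DownloadFileManager contract is small enough to sketch in full. ShuffleBlockFetcherIterator itself is the canonical implementation (it returns false from registerTempFileToClean once it is a zombie); the hypothetical class below mirrors that contract with plain SimpleDownloadFiles in a scratch directory. Its name, directory handling and cleanUp() method are illustrative assumptions, not part of this commit.

import java.io.File;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.spark.network.shuffle.DownloadFile;
import org.apache.spark.network.shuffle.DownloadFileManager;
import org.apache.spark.network.shuffle.SimpleDownloadFile;
import org.apache.spark.network.util.TransportConf;

// Hypothetical manager: non-encrypting download files in one directory.
class ScratchDirDownloadFileManager implements DownloadFileManager {
  private final File dir;
  private final Set<DownloadFile> files = ConcurrentHashMap.newKeySet();
  private volatile boolean closed = false;

  ScratchDirDownloadFileManager(File dir) {
    this.dir = dir;
  }

  @Override
  public DownloadFile createTempFile(TransportConf transportConf) {
    try {
      return new SimpleDownloadFile(
          File.createTempFile("fetch", ".tmp", dir), transportConf);
    } catch (IOException e) {
      // The interface does not declare IOException, so wrap it.
      throw new RuntimeException("Could not create download file", e);
    }
  }

  @Override
  public boolean registerTempFileToClean(DownloadFile file) {
    // Per the interface contract, returning false means the caller must
    // delete the file itself.
    return !closed && files.add(file);
  }

  // Deletes every registered file; later registrations return false.
  void cleanUp() {
    closed = true;
    files.forEach(DownloadFile::delete);
  }
}
----------------------------------------------------------------------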