This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 37a0ae3511c [SPARK-43138][CORE] Fix ClassNotFoundException during migration 37a0ae3511c is described below commit 37a0ae3511c9f153537d5928e9938f72763f5464 Author: Emil Ejbyfeldt <eejbyfe...@liveintent.com> AuthorDate: Thu May 11 08:25:45 2023 -0500 [SPARK-43138][CORE] Fix ClassNotFoundException during migration ### What changes were proposed in this pull request? This PR fixes an unhandled ClassNotFoundException during RDD block decommissions migrations. ``` 2023-04-08 04:15:11,791 ERROR server.TransportRequestHandler: Error while invoking RpcHandler#receive() on RPC id 6425687122551756860 java.lang.ClassNotFoundException: com.class.from.user.jar.ClassName at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) at java.base/java.lang.Class.forName0(Native Method) at java.base/java.lang.Class.forName(Class.java:398) at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:71) at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003) at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1870) at java.base/java.io.ObjectInputStream.readClass(ObjectInputStream.java:1833) at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1658) at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496) at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390) at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228) at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687) at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496) at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390) at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228) at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687) at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489) at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:123) at org.apache.spark.network.netty.NettyBlockRpcServer.deserializeMetadata(NettyBlockRpcServer.scala:180) at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:119) at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:163) at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:109) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140) at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:829) ``` The exception occurs if RDD block contains user defined during the serialization of a `ClassTag` for the user defined class. The problem for serialization of the `ClassTag` a instance of `JavaSerializer`(https://github.com/apache/spark/blob/ca2ddf3c2079dda93053e64070ebda1610aa1968/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala#L62) is used, but it never configured to use a class loader including user defined classes. This PR solves the issue by inst [...] The reason is this does not occur during normal block replication and only during decommission is that there is a workaround/hack in `BlockManager.doPutIterator` that replaces the `ClassTag` with a `ClassTag[Any]` when replicating that block https://github.com/apache/spark/blob/ca2ddf3c2079dda93053e64070ebda1610aa1968/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1657-L1664 But during RDD migration (and probably pro-active replication) it will use a different codepa [...] ### Why are the changes needed? The unhandled exception means that block replication does not work properly. Specifically cases where the block contains a user class and it not replicated at creation then the block will never successfully be migrated during decommission. ### Does this PR introduce _any_ user-facing change? It fixes the bug. But also since it changes from a fixed `JavaSerializer` to instead use the `SerializerManager` the `NettyBlockTransferService` might now instead use `KryoSerializer` or some other user configured serializer for the metadata. ### How was this patch tested? This modifies an existing spec to correctly check that replication happens for repl defined classes while removing the hack that erases the `ClassTag`. Additionally I tested this manually on a hadoop cluster to check that it also solves the decommission migration issue. If some can point me to some better way to add a spec using user defined classes I would also like to add a unittest for it. Closes #40808 from eejbyfeldt/SPARK-43138-class-not-found-exception. Authored-by: Emil Ejbyfeldt <eejbyfe...@liveintent.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../src/main/scala/org/apache/spark/SparkEnv.scala | 4 ++-- .../network/netty/NettyBlockTransferService.scala | 5 +++-- .../org/apache/spark/storage/BlockManager.scala | 10 +-------- .../netty/NettyBlockTransferSecuritySuite.scala | 10 ++++++--- .../netty/NettyBlockTransferServiceSuite.scala | 6 ++++-- .../storage/BlockManagerReplicationSuite.scala | 5 +++-- .../apache/spark/storage/BlockManagerSuite.scala | 25 ++++++++++++---------- .../org/apache/spark/repl/SingletonReplSuite.scala | 14 ++++++++---- .../streaming/ReceivedBlockHandlerSuite.scala | 3 ++- 9 files changed, 46 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 28ab9dc0742..272a0a6332b 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -363,8 +363,8 @@ object SparkEnv extends Logging { isDriver) val blockTransferService = - new NettyBlockTransferService(conf, securityManager, bindAddress, advertiseAddress, - blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint) + new NettyBlockTransferService(conf, securityManager, serializerManager, bindAddress, + advertiseAddress, blockManagerPort, numUsableCores, blockManagerMaster.driverEndpoint) // NB: blockManager is not valid until initialize() is called later. val blockManager = new BlockManager( diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index a418cb2bf44..d04d2eeef0b 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -40,7 +40,7 @@ import org.apache.spark.network.shuffle.{BlockFetchingListener, BlockTransferLis import org.apache.spark.network.shuffle.protocol.{UploadBlock, UploadBlockStream} import org.apache.spark.network.util.JavaUtils import org.apache.spark.rpc.RpcEndpointRef -import org.apache.spark.serializer.JavaSerializer +import org.apache.spark.serializer.SerializerManager import org.apache.spark.storage.{BlockId, StorageLevel} import org.apache.spark.storage.BlockManagerMessages.IsExecutorAlive import org.apache.spark.util.Utils @@ -51,6 +51,7 @@ import org.apache.spark.util.Utils private[spark] class NettyBlockTransferService( conf: SparkConf, securityManager: SecurityManager, + serializerManager: SerializerManager, bindAddress: String, override val hostName: String, _port: Int, @@ -59,7 +60,7 @@ private[spark] class NettyBlockTransferService( extends BlockTransferService { // TODO: Don't use Java serialization, use a more cross-version compatible serialization format. - private val serializer = new JavaSerializer(conf) + private val serializer = serializerManager.getSerializer(scala.reflect.classTag[Any], false) private val authEnabled = securityManager.isAuthenticationEnabled() private[this] var transportContext: TransportContext = _ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index a8f74ef179b..b4453b4d35e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1654,16 +1654,8 @@ private[spark] class BlockManager( if (level.replication > 1) { val remoteStartTimeNs = System.nanoTime() val bytesToReplicate = doGetLocalBytes(blockId, info) - // [SPARK-16550] Erase the typed classTag when using default serialization, since - // NettyBlockRpcServer crashes when deserializing repl-defined classes. - // TODO(ekl) remove this once the classloader issue on the remote end is fixed. - val remoteClassTag = if (!serializerManager.canUseKryo(classTag)) { - scala.reflect.classTag[Any] - } else { - classTag - } try { - replicate(blockId, bytesToReplicate, level, remoteClassTag) + replicate(blockId, bytesToReplicate, level, classTag) } finally { bytesToReplicate.dispose() } diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala index 13bb811b840..85b05cd5f98 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala @@ -38,6 +38,7 @@ import org.apache.spark.internal.config.Network import org.apache.spark.network.{BlockDataManager, BlockTransferService} import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer} import org.apache.spark.network.shuffle.BlockFetchingListener +import org.apache.spark.serializer.{JavaSerializer, SerializerManager} import org.apache.spark.storage.{BlockId, ShuffleBlockId} import org.apache.spark.util.ThreadUtils @@ -126,13 +127,16 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi when(blockManager.getLocalBlockData(blockId)).thenReturn(blockBuffer) val securityManager0 = new SecurityManager(conf0) - val exec0 = new NettyBlockTransferService(conf0, securityManager0, "localhost", "localhost", 0, + val serializerManager0 = new SerializerManager(new JavaSerializer(conf0), conf0) + val exec0 = new NettyBlockTransferService( + conf0, securityManager0, serializerManager0, "localhost", "localhost", 0, 1) exec0.init(blockManager) val securityManager1 = new SecurityManager(conf1) - val exec1 = new NettyBlockTransferService(conf1, securityManager1, "localhost", "localhost", 0, - 1) + val serializerManager1 = new SerializerManager(new JavaSerializer(conf1), conf1) + val exec1 = new NettyBlockTransferService( + conf1, securityManager1, serializerManager1, "localhost", "localhost", 0, 1) exec1.init(blockManager) val result = fetchBlock(exec0, exec1, "1", blockId) match { diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala index 3a6bc47257f..62105f1d514 100644 --- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala +++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala @@ -34,6 +34,7 @@ import org.apache.spark.network.BlockDataManager import org.apache.spark.network.client.{TransportClient, TransportClientFactory} import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager} import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout} +import org.apache.spark.serializer.{JavaSerializer, SerializerManager} class NettyBlockTransferServiceSuite extends SparkFunSuite @@ -142,10 +143,11 @@ class NettyBlockTransferServiceSuite rpcEndpointRef: RpcEndpointRef = null): NettyBlockTransferService = { val conf = new SparkConf() .set("spark.app.id", s"test-${getClass.getName}") + val serializerManager = new SerializerManager(new JavaSerializer(conf), conf) val securityManager = new SecurityManager(conf) val blockDataManager = mock(classOf[BlockDataManager]) - val service = new NettyBlockTransferService(conf, securityManager, "localhost", "localhost", - port, 1, rpcEndpointRef) + val service = new NettyBlockTransferService( + conf, securityManager, serializerManager, "localhost", "localhost", port, 1, rpcEndpointRef) service.init(blockDataManager) service } diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 14e1ee5b09d..8729ae1edfb 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -75,9 +75,10 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite memoryManager: Option[UnifiedMemoryManager] = None): BlockManager = { conf.set(TEST_MEMORY, maxMem) conf.set(MEMORY_OFFHEAP_SIZE, maxMem) - val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1) - val memManager = memoryManager.getOrElse(UnifiedMemoryManager(conf, numCores = 1)) val serializerManager = new SerializerManager(serializer, conf) + val transfer = new NettyBlockTransferService( + conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1) + val memManager = memoryManager.getOrElse(UnifiedMemoryManager(conf, numCores = 1)) val store = new BlockManager(name, rpcEnv, master, serializerManager, conf, memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, None) memManager.setMemoryStore(store.memoryStore) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index cc1c01d80cb..29592434765 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -131,10 +131,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe None } val bmSecurityMgr = new SecurityManager(bmConf, encryptionKey) - val transfer = transferService - .getOrElse(new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)) - val memManager = UnifiedMemoryManager(bmConf, numCores = 1) val serializerManager = new SerializerManager(serializer, bmConf) + val transfer = transferService.getOrElse(new NettyBlockTransferService( + conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)) + val memManager = UnifiedMemoryManager(bmConf, numCores = 1) val externalShuffleClient = if (conf.get(config.SHUFFLE_SERVICE_ENABLED)) { val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", 0) Some(new ExternalBlockStoreClient(transConf, bmSecurityMgr, @@ -1308,9 +1308,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe test("block store put failure") { // Use Java serializer so we can create an unserializable error. conf.set(TEST_MEMORY, 1200L) - val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1) - val memoryManager = UnifiedMemoryManager(conf, numCores = 1) val serializerManager = new SerializerManager(new JavaSerializer(conf), conf) + val transfer = new NettyBlockTransferService( + conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1) + val memoryManager = UnifiedMemoryManager(conf, numCores = 1) val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master, serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, transfer, securityMgr, None) @@ -1357,8 +1358,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe if (conf.get(IO_ENCRYPTION_ENABLED)) Some(CryptoStreamUtils.createKey(conf)) else None val securityMgr = new SecurityManager(conf, ioEncryptionKey) val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey) - val transfer = - new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1) + val transfer = new NettyBlockTransferService( + conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1) val memoryManager = UnifiedMemoryManager(conf, numCores = 1) val blockManager = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master, serializerManager, conf, memoryManager, mapOutputTracker, @@ -2193,9 +2194,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe case class User(id: Long, name: String) conf.set(TEST_MEMORY, 1200L) - val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1) - val memoryManager = UnifiedMemoryManager(conf, numCores = 1) val serializerManager = new SerializerManager(kryoSerializerWithDiskCorruptedInputStream, conf) + val transfer = new NettyBlockTransferService( + conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1) + val memoryManager = UnifiedMemoryManager(conf, numCores = 1) val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master, serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, transfer, securityMgr, None) @@ -2216,9 +2218,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe = createKryoSerializerWithDiskCorruptedInputStream() conf.set(TEST_MEMORY, 1200L) - val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1) - val memoryManager = UnifiedMemoryManager(conf, numCores = 1) val serializerManager = new SerializerManager(kryoSerializerWithDiskCorruptedInputStream, conf) + val transfer = new NettyBlockTransferService( + conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1) + val memoryManager = UnifiedMemoryManager(conf, numCores = 1) val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master, serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, transfer, securityMgr, None) diff --git a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala index 4795306692f..0e3bfcfa89d 100644 --- a/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala @@ -344,13 +344,19 @@ class SingletonReplSuite extends SparkFunSuite { |} |import org.apache.spark.storage.StorageLevel._ |case class Foo(i: Int) - |val ret = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_AND_DISK_2) - |ret.count() - |val res = sc.getRDDStorageInfo.filter(_.id == ret.id).map(_.numCachedPartitions).sum + |val rdd1 = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_ONLY) + |val rdd2 = sc.parallelize((1 to 100).map(Foo), 10).persist(MEMORY_ONLY_2) + |rdd1.count() + |rdd2.count() + |val cached1 = sc.getRDDStorageInfo.filter(_.id == rdd1.id).map(_.numCachedPartitions).sum + |val size1 = sc.getRDDStorageInfo.filter(_.id == rdd1.id).map(_.memSize).sum + |val size2 = sc.getRDDStorageInfo.filter(_.id == rdd2.id).map(_.memSize).sum + |assert(size2 == size1 * 2, s"Blocks not replicated properly size1=$size1, size2=$size2") """.stripMargin) assertDoesNotContain("error:", output) assertDoesNotContain("Exception", output) - assertContains("res: Int = 10", output) + assertContains("cached1: Int = 10", output) + assertDoesNotContain("AssertionError", output) } test("should clone and clean line object in ClosureCleaner") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index dcf82d5e2c2..1913552ceed 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -288,7 +288,8 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean) conf: SparkConf, name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { val memManager = new UnifiedMemoryManager(conf, maxMem, maxMem / 2, 1) - val transfer = new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1) + val transfer = new NettyBlockTransferService( + conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1) val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializerManager, conf, memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, None) memManager.setMemoryStore(blockManager.memoryStore) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org