hasnain-db commented on code in PR #43541:
URL: https://github.com/apache/spark/pull/43541#discussion_r1374880822


##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ShuffleTransportContextSuite.java:
##########
@@ -90,15 +94,22 @@ private ByteBuf getDecodableMessageBuf(Message req) throws Exception {
   public void testInitializePipeline() throws IOException {
     // SPARK-43987: test that the FinalizedHandler is added to the pipeline only when configured
     for (boolean enabled : new boolean[]{true, false}) {
-      ShuffleTransportContext ctx = createShuffleTransportContext(enabled);
-      SocketChannel channel = new NioSocketChannel();
-      RpcHandler rpcHandler = mock(RpcHandler.class);
-      ctx.initializePipeline(channel, rpcHandler);
-      String handlerName = ShuffleTransportContext.FinalizedHandler.HANDLER_NAME;
-      if (enabled) {
-        Assertions.assertNotNull(channel.pipeline().get(handlerName));
-      } else {
-        Assertions.assertNull(channel.pipeline().get(handlerName));
+      for (boolean isClient: new boolean[]{true, false}) {
+        // Since the decoder is not Sharable, reset it between test runs to avoid
+        // errors, since it's used both across ShuffleTransportContextSuite and
+        // SslShuffleTransportContextSuite, and across servers/clients

Review Comment:
   If I don't do this, the test fails with:
   
   ```
   [error] Test org.apache.spark.network.shuffle.SslShuffleTransportContextSuite.testInitializePipeline failed: io.netty.channel.ChannelPipelineException: org.apache.spark.network.shuffle.ShuffleTransportContext$ShuffleMessageDecoder is not a @Sharable handler, so can't be added or removed multiple times., took 0.868s
   [error]     at io.netty.channel.DefaultChannelPipeline.checkMultiplicity(DefaultChannelPipeline.java:600)
   [error]     at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:202)
   [error]     at io.netty.channel.DefaultChannelPipeline.addLast(DefaultChannelPipeline.java:195)
   [error]     at org.apache.spark.network.TransportContext.initializePipeline(TransportContext.java:232)
   [error]     at org.apache.spark.network.shuffle.ShuffleTransportContext.initializePipeline(ShuffleTransportContext.java:94)
   [error]     at org.apache.spark.network.shuffle.ShuffleTransportContextSuite.testInitializePipeline(ShuffleTransportContextSuite.java:105)
   [error]     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   [error]     at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
   [error]     at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   [error]     at java.lang.reflect.Method.invoke(Method.java:568)
   ```
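
   For illustration, here is a minimal standalone sketch of the `@Sharable` constraint that triggers this failure, using Netty's `EmbeddedChannel` and a placeholder handler (not the real `ShuffleMessageDecoder`):

   ```java
   import io.netty.channel.ChannelInboundHandlerAdapter;
   import io.netty.channel.embedded.EmbeddedChannel;

   public class SharableSketch {
     // Placeholder for a handler that, like ShuffleMessageDecoder, is not
     // annotated with @ChannelHandler.Sharable
     static class NonSharableHandler extends ChannelInboundHandlerAdapter {}

     public static void main(String[] args) {
       NonSharableHandler handler = new NonSharableHandler();
       new EmbeddedChannel().pipeline().addLast("decoder", handler);
       try {
         // Reusing the same non-@Sharable instance in a second pipeline fails
         new EmbeddedChannel().pipeline().addLast("decoder", handler);
       } catch (Exception e) {
         System.out.println(e); // ChannelPipelineException: ... not a @Sharable handler ...
       }
       // A fresh instance per pipeline works, which is what the reset achieves
       new EmbeddedChannel().pipeline().addLast("decoder", new NonSharableHandler());
     }
   }
   ```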



##########
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java:
##########
@@ -189,15 +204,32 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) {
    */
   public TransportChannelHandler initializePipeline(
       SocketChannel channel,
-      RpcHandler channelRpcHandler) {
+      RpcHandler channelRpcHandler,
+      boolean isClient) {
     try {
      TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
       ChannelPipeline pipeline = channel.pipeline();
+
       if (nettyLogger.getLoggingHandler() != null) {
         pipeline.addLast("loggingHandler", nettyLogger.getLoggingHandler());
       }
+
+      if (sslEncryptionEnabled()) {
+        SslHandler sslHandler;
+        try {
+          sslHandler = new SslHandler(
+            sslFactory.createSSLEngine(isClient, pipeline.channel().alloc()));
+        } catch (Exception e) {
+          throw new RuntimeException("Error creating Netty SslHandler", e);

Review Comment:
   @HyukjinKwon I mostly wanted to rethrow this with more context. Is there a better exception type I should use, or should I just remove this catch?
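
   For reference, a sketch of the two options being weighed (the enclosing try/catch is the one already present in `initializePipeline`):

   ```java
   SslHandler sslHandler;
   try {
     sslHandler = new SslHandler(
       sslFactory.createSSLEngine(isClient, pipeline.channel().alloc()));
   } catch (Exception e) {
     // Option 1 (current): wrap and rethrow with added context
     throw new RuntimeException("Error creating Netty SslHandler", e);
     // Option 2: drop this catch entirely and let the failure propagate to
     // the enclosing try/catch in initializePipeline
   }
   ```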



##########
common/network-common/src/main/java/org/apache/spark/network/TransportContext.java:
##########
@@ -189,15 +204,32 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) {
    */
   public TransportChannelHandler initializePipeline(
       SocketChannel channel,
-      RpcHandler channelRpcHandler) {
+      RpcHandler channelRpcHandler,
+      boolean isClient) {
     try {
      TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
       ChannelPipeline pipeline = channel.pipeline();
+
       if (nettyLogger.getLoggingHandler() != null) {
         pipeline.addLast("loggingHandler", nettyLogger.getLoggingHandler());
       }
+
+      if (sslEncryptionEnabled()) {
+        SslHandler sslHandler;
+        try {
+          sslHandler = new SslHandler(
+            sslFactory.createSSLEngine(isClient, pipeline.channel().alloc()));
+        } catch (Exception e) {
+          throw new RuntimeException("Error creating Netty SslHandler", e);
+        }
+        pipeline.addFirst("NettySslEncryptionHandler", sslHandler);
+        // Cannot use zero-copy with HTTPS, so we add in our ChunkedWriteHandler
+        // just before the MessageEncoder
+        pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());

Review Comment:
   @mridulm I am not sure that's correct, given that we are adding a logging handler above.
   
   My understanding is that after line 228 the order of handlers will be:
   
   * NettySslEncryptionHandler
   * LoggingHandler
   * ChunkedWriteHandler
   
   This means that we log the packets after encryption (for debugging) and then chunk them.
   
   If we do `addLast` for the encryption handler, then it comes after logging (which we may want, but for now I found it helpful for debugging to keep it as is).
   
   If we do `addAfter` for the chunked write handler, then the logging would log chunked packets, which would make them a little harder to read for debugging.
   
   Am I missing something?
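
   To make the ordering concrete, here is a minimal sketch using Netty's `EmbeddedChannel` with placeholder handlers (not the real Spark ones) that mirrors the `addLast`/`addFirst` calls in this diff:

   ```java
   import io.netty.channel.ChannelDuplexHandler;
   import io.netty.channel.embedded.EmbeddedChannel;

   public class PipelineOrderSketch {
     public static void main(String[] args) {
       EmbeddedChannel channel = new EmbeddedChannel();
       // Same insertion order as in initializePipeline above
       channel.pipeline().addLast("loggingHandler", new ChannelDuplexHandler());
       channel.pipeline().addFirst("NettySslEncryptionHandler", new ChannelDuplexHandler());
       channel.pipeline().addLast("chunkedWriter", new ChannelDuplexHandler());
       // Prints [NettySslEncryptionHandler, loggingHandler, chunkedWriter, ...]
       System.out.println(channel.pipeline().names());
     }
   }
   ```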



##########
common/network-common/src/test/java/org/apache/spark/network/SslChunkFetchIntegrationSuite.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import com.google.common.io.Closeables;
+import org.junit.jupiter.api.BeforeAll;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.RpcHandler;
+import org.apache.spark.network.server.StreamManager;
+import org.apache.spark.network.util.TransportConf;
+import org.apache.spark.network.ssl.SslSampleConfigs;
+
+
+public class SslChunkFetchIntegrationSuite extends ChunkFetchIntegrationSuite {
+
+  @BeforeAll
+  public static void setUp() throws Exception {
+    int bufSize = 100000;
+    final ByteBuffer buf = ByteBuffer.allocate(bufSize);
+    for (int i = 0; i < bufSize; i++) {
+      buf.put((byte) i);
+    }
+    buf.flip();
+    bufferChunk = new NioManagedBuffer(buf);
+
+    testFile = File.createTempFile("shuffle-test-file", "txt");
+    testFile.deleteOnExit();
+    RandomAccessFile fp = new RandomAccessFile(testFile, "rw");
+    boolean shouldSuppressIOException = true;
+    try {
+      byte[] fileContent = new byte[1024];
+      new Random().nextBytes(fileContent);
+      fp.write(fileContent);
+      shouldSuppressIOException = false;
+    } finally {
+      Closeables.close(fp, shouldSuppressIOException);
+    }
+
+    final TransportConf conf = new TransportConf(
+      "shuffle", SslSampleConfigs.createDefaultConfigProviderForRpcNamespace());
+    fileChunk = new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
+
+    streamManager = new StreamManager() {
+      @Override
+      public ManagedBuffer getChunk(long streamId, int chunkIndex) {
+        assertEquals(STREAM_ID, streamId);
+        if (chunkIndex == BUFFER_CHUNK_INDEX) {
+          return new NioManagedBuffer(buf);
+        } else if (chunkIndex == FILE_CHUNK_INDEX) {
+          return new FileSegmentManagedBuffer(conf, testFile, 10, testFile.length() - 25);
+        } else {
+          throw new IllegalArgumentException("Invalid chunk index: " + chunkIndex);
+        }
+      }
+    };
+    RpcHandler handler = new RpcHandler() {
+      @Override
+      public void receive(
+          TransportClient client,
+          ByteBuffer message,
+          RpcResponseCallback callback) {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public StreamManager getStreamManager() {
+        return streamManager;
+      }
+    };
+    context = new TransportContext(conf, handler);
+    server = context.createServer();
+    clientFactory = context.createClientFactory();
+  }

Review Comment:
   That's correct. I had run into issues since it's called in a `BeforeAll` block, which didn't seem to work properly with inheritance - let me look at what you suggested.
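
   For context, a minimal sketch (hypothetical class names) of the JUnit 5 behavior in play: a static `@BeforeAll` method in a subclass with the same signature hides the superclass one, so only the subclass's setup runs for that suite:

   ```java
   import org.junit.jupiter.api.BeforeAll;
   import org.junit.jupiter.api.Test;

   class BaseSuite {
     @BeforeAll
     static void setUp() { System.out.println("base setUp"); }

     @Test
     void smoke() { /* relies on state initialized in setUp() */ }
   }

   class SslSuite extends BaseSuite {
     // Hides BaseSuite.setUp(); JUnit 5 runs only this method for SslSuite,
     // which is where the SSL-specific configuration can be initialized.
     @BeforeAll
     static void setUp() { System.out.println("ssl setUp"); }
   }
   ```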


