Dear Apache Flume Team,

I am Yusuke Wakuta. I hope this message finds you well.
I am writing to report a potential bug that I've encountered in Apache
Flume.
I appreciate the excellent work you have produced, and I hope the
information I provide below on the issue will contribute to enhancing
Apache Flume's functionality.

## AvroSource/AvroSink: Failed to connect with `ssl = true`,
`compression-type = deflate` between flume v1.9.0 and flume v1.10.1

In flume v1.10.1, the location of adding SslHandler to netty's pipeline has
changed from the method in v1.9.0, so AvroSink (flume 1.9.0) and AvroSource
(flume 1.10) with both compression and ssl enabled, there was a behavior
where communication failed.

### Detail

When using the settings `compression-type = deflate` and `ssl = true`
respectively with AvroSink in Flume (version 1.9.0) and AvroSource in Flume
(version 1.10.1), an exception occurs, resulting in a failed communication.
Hereafter, the AvroSink is referred to as current_step_flume, while the
AvroSource is referred to as next_step_flume.

* AvroSink setting in current_step_flume

    ```
    agent.sinks.avro-sink1.type = avro
    agent.sinks.avro-sink1.channel = ch1
    agent.sinks.avro-sink1.hostname = {{flume.hostname.next_step_flume}}
    agent.sinks.avro-sink1.port = 61414
    agent.sinks.avro-sink1.ssl = true
    agent.sinks.avro-sink1.compression-type = deflate
    ```

* AvroSource setting in next_step_flume

    ```
    agent.sources.avro-source1.bind = 0.0.0.0
    agent.sources.avro-source1.channels = ch2
    agent.sources.avro-source1.port = 61414
    agent.sources.avro-source1.ssl = true
    agent.sources.avro-source1.compression-type = deflate
    agent.sources.avro-source1.type = avro
    ```

* error log on current_step_flume

    ```log
    03 Aug 2023 19:24:57,830 ERROR
[SinkRunner-PollingRunner-DefaultSinkProcessor]
(org.apache.flume.SinkRunner$PollingRunner.run:158)  - Unable to deliver
event. Exception follows.
    org.apache.flume.EventDeliveryException: Failed to send events
            at
org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:398)
            at
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
            at
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
            at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient
{ host: ${NEXT_STEP_FLUME_HOSTNAME}, port: 61414 }: Failed to send batch
            at
org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:310)
            at
org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:380)
            ... 3 more
    Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient
{ host: ${NEXT_STEP_FLUME_HOSTNAME}, port: 61414 }: Exception thrown from
remote handler
            at
org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:392)
            at
org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:369)
            at
org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:298)
            ... 4 more
    Caused by: java.util.concurrent.ExecutionException:
org.jboss.netty.handler.ssl.NotSslRecordException: not an SSL/TLS record:
789c030000000001
            at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
            at
org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:384)
            ... 6 more
    Caused by: org.jboss.netty.handler.ssl.NotSslRecordException: not an
SSL/TLS record: 789c030000000001
            at
org.jboss.netty.handler.ssl.SslHandler.decode(SslHandler.java:857)
            at
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
            at
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
            at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
            at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
            at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
            at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
            at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
            at
org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
            at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
            at
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
            at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
            at
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
            at
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
            at
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
            at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
            at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
            ... 1 more
    ```

* error log on next_step_flume

    ```log
    03 Aug 2023 19:24:57,830 ERROR
[SinkRunner-PollingRunner-DefaultSinkProcessor]
(org.apache.flume.SinkRunner$PollingRunner.run:158)  - Unable to deliver
event. Exception follows.
    org.apache.flume.EventDeliveryException: Failed to send events
            at
org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:398)
            at
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
            at
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
            at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient
{ host: ${NEXT_STEP_FLUME_HOSTNAME}, port: 61414 }: Failed to send batch
            at
org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:310)
            at
org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:380)
            ... 3 more
    Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient
{ host: ${NEXT_STEP_FLUME_HOSTNAME}, port: 61414 }: Exception thrown from
remote handler
            at
org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:392)
            at
org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:369)
            at
org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:298)
            ... 4 more
    Caused by: java.util.concurrent.ExecutionException:
org.jboss.netty.handler.ssl.NotSslRecordException: not an SSL/TLS record:
789c030000000001
            at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
            at
org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:384)
            ... 6 more
    Caused by: org.jboss.netty.handler.ssl.NotSslRecordException: not an
SSL/TLS record: 789c030000000001
            at
org.jboss.netty.handler.ssl.SslHandler.decode(SslHandler.java:857)
            at
org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
            at
org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
            at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
            at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
            at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
            at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
            at
org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
            at
org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
            at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
            at
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
            at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
            at
org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
            at
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
            at
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
            at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
            at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
            ... 1 more
    ```

I inspected the packet capture and found the TLS handshake fails after the
client sends `Client Hello` to the server.

### The cause of this behavior

The issue was due to the different positions where the SslHandler was added
to the netty pipeline in flume v1.9.0 and flume v1.10.1.
In v1.9.0, SslHandler was added at the very beginning of the pipeline with
addFirst()
<https://github.com/apache/flume/blob/d4fcab4f501d41597bc616921329a4339f73585e/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java#L443>.
On the other hand, as part of the changes made in FLUME-3363 - Upgrade
Netty to version 4. Update Avro to 1.11.0. Update PMD plugin
<https://github.com/apache/flume/commit/2810811f66da74342d19854a23fcd498b12aea9a>
to
accommodate FLUME-3363, it was altered to add the SslHandler with addLast().
I have not been able to find out the exact reason for this change from
addFirst() to addLast() in this commit.

To determine which is appropriate between adding the sslHandler with
addFirst() and adding it with addLast(), I have created a sample case based
on netty's EchoServer
<https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example/echo>
.

### the minimum code example to reproduce the behavior

I made the minimum code example with netty server and client. And I took
packet capture between the client and the server.
The server consists of two classes,, EchoServer and Echo ServerHandler.

* EchoServer

    ```java
    /*
     * Copyright 2012 The Netty Project
     *
     * The Netty Project licenses this file to you under the Apache License,
     * version 2.0 (the "License"); you may not use this file except in
compliance
     * with the License. You may obtain a copy of the License at:
     *
     *   <https://www.apache.org/licenses/LICENSE-2.0>
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
the
     * License for the specific language governing permissions and
limitations
     * under the License.
     */
    package org.example.echo;

    import javax.net.ssl.SSLEngine;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBufAllocator;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.compression.JZlibDecoder;
    import io.netty.handler.codec.compression.JZlibEncoder;
    import io.netty.handler.codec.compression.ZlibEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.ssl.SslContext;
    import io.netty.handler.ssl.SslContextBuilder;
    import io.netty.handler.ssl.SslHandler;
    import io.netty.handler.ssl.util.SelfSignedCertificate;

    /**
     * Echoes back any received data from a client.
     */
    public final class EchoServer {

        static final int PORT = Integer.parseInt(System.getProperty("port",
"8007"));

        public static void main(String[] args) throws Exception {

            // Configure the server.
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            final EchoServerHandler serverHandler = new EchoServerHandler();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 100)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        .childHandler(new
ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch)
throws Exception {
                                ChannelPipeline p = ch.pipeline();
                                ZlibEncoder encoder = new JZlibEncoder(6);
                                p.addFirst("deflater", encoder);
                                p.addFirst("inflater", new JZlibDecoder());

                                SelfSignedCertificate ssc = new
SelfSignedCertificate();
                                SslContext context =
SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
                                        .build();
                                SSLEngine engine =
context.newEngine(ByteBufAllocator.DEFAULT);
                                engine.setUseClientMode(false);

                                // checking the behavior with changing
addLast() and addFirst()
                                p.addLast(new SslHandler(engine));
                                // p.addFirst(new SslHandler(engine));

                                p.addLast(new
LoggingHandler(LogLevel.INFO));
                                p.addLast(serverHandler);
                            }
                        });

                // Start the server.
                ChannelFuture f = b.bind(PORT).sync();

                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shut down all event loops to terminate all threads.
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    ```

- EchoServerHandler

    ```java
    /*
     * Copyright 2012 The Netty Project
     *
     * The Netty Project licenses this file to you under the Apache License,
     * version 2.0 (the "License"); you may not use this file except in
compliance
     * with the License. You may obtain a copy of the License at:
     *
     *   <https://www.apache.org/licenses/LICENSE-2.0>
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
the
     * License for the specific language governing permissions and
limitations
     * under the License.
     */
    package org.example.echo;

    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;

    /**
     * Handler implementation for the echo server.
     */
    @Sharable
    public class EchoServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ctx.write(msg);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) {
            // Close the connection when an exception is raised.
            cause.printStackTrace();
            ctx.close();
        }
    }
    ```

And the client consists of two classes, EchoClient and EchoClientHandler

- EchoClient

    ```java
    /*
     * Copyright 2012 The Netty Project
     *
     * The Netty Project licenses this file to you under the Apache License,
     * version 2.0 (the "License"); you may not use this file except in
compliance
     * with the License. You may obtain a copy of the License at:
     *
     *   <https://www.apache.org/licenses/LICENSE-2.0>
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
the
     * License for the specific language governing permissions and
limitations
     * under the License.
     */
    package org.example.echo;

    import javax.net.ssl.SSLEngine;

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBufAllocator;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;

    import io.netty.handler.codec.compression.JZlibDecoder;
    import io.netty.handler.codec.compression.JZlibEncoder;
    import io.netty.handler.codec.compression.ZlibEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.ssl.SslContext;
    import io.netty.handler.ssl.SslContextBuilder;
    import io.netty.handler.ssl.SslHandler;
    import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

    /**
     * Sends one message when a connection is open and echoes back any
received
     * data to the server.  Simply put, the echo client initiates the
ping-pong
     * traffic between the echo client and server by sending the first
message to
     * the server.
     */
    public final class EchoClient {

        static final String HOST = System.getProperty("host", "127.0.0.1");
        static final int PORT = Integer.parseInt(System.getProperty("port",
"8007"));
        static final int SIZE = Integer.parseInt(System.getProperty("size",
"8"));

        public static void main(String[] args) throws Exception {

            // Configure the client.
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
                 .handler(new ChannelInitializer<SocketChannel>() {
                     @Override
                     public void initChannel(SocketChannel ch) throws
Exception {
                         ChannelPipeline p = ch.pipeline();
                         ZlibEncoder encoder = new JZlibEncoder(6);
                         p.addFirst("deflater", encoder);
                         p.addFirst("inflater", new JZlibDecoder());

                         SslContext context = SslContextBuilder.forClient()

 .trustManager(InsecureTrustManagerFactory.INSTANCE)
                                 .build();
                         SSLEngine engine =
context.newEngine(ByteBufAllocator.DEFAULT);
                         engine.setUseClientMode(true);

                          // checking the behavior with changing addLast()
and addFirst()
                         p.addLast(new SslHandler(engine));
                         // p.addFirst(new SslHandler(engine));

                         p.addLast(new LoggingHandler(LogLevel.INFO));
                         p.addLast(new EchoClientHandler());
                     }
                 });

                // Start the client.
                ChannelFuture f = b.connect(HOST, PORT).sync();

                // Wait until the connection is closed.
                f.channel().closeFuture().sync();
            } finally {
                // Shut down the event loop to terminate all threads.
                group.shutdownGracefully();
            }
        }
    }
    ```

- EchoClientHandler

    ```log
    /*
     * Copyright 2012 The Netty Project
     *
     * The Netty Project licenses this file to you under the Apache License,
     * version 2.0 (the "License"); you may not use this file except in
compliance
     * with the License. You may obtain a copy of the License at:
     *
     *   <https://www.apache.org/licenses/LICENSE-2.0>
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT
     * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
the
     * License for the specific language governing permissions and
limitations
     * under the License.
     */
    package org.example.echo;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;

    /**
     * Handler implementation for the echo client.  It initiates the
ping-pong
     * traffic between the echo client and server by sending the first
message to
     * the server.
     */
    public class EchoClientHandler extends ChannelInboundHandlerAdapter {

        private final ByteBuf firstMessage;

        /**
         * Creates a client-side handler.
         */
        public EchoClientHandler() {
            firstMessage = Unpooled.buffer(EchoClient.SIZE);
            for (int i = 0; i < firstMessage.capacity(); i ++) {
                firstMessage.writeByte((byte) i);
            }
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            ctx.writeAndFlush(firstMessage);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ctx.write(msg);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
           ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) {
            // Close the connection when an exception is raised.
            cause.printStackTrace();
            ctx.close();
        }
    }
    ```

* As implemented in flume v1.9.0, when the SslHandler is used at addFirst()
for both EchoClient and EchoServer, packet capture verified that a TLS
handshake is being carried out and communication is occurring successfully.
* In cases where the SslHandler was used with addFirst() in EchoClient and
addLast() in EchoServer, it was confirmed through packet capture that a
failure in the TLS handshake occurred. This closely mirrored the packet
captures observed in Flume, and the execution logs also noted similar
entries as the connection between flume 1.9.0 and 1.10.1. I guess, in this
case, the server tries to decompress the `Client Hello` and fails.
* As implemented in flume v1.10.1, when the SslHandler is used with
addLast() for both EchoClient and EchoServer, the communication itself is
successful. However, a TLS handshake could not be confirmed, and it
appeared that only TCP packets were being exchanged.

### possible solution

Ideally, it would be preferable to communicate using the TLS protocol when
SSL is enabled. Therefore, I propose applying a patch below to Flume
v1.10.1 that, similar to Flume 1.9.0, utilises SslHandler with addFirst().

```
diff --git
a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index 4dd07261..11c9292b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -196,7 +196,7 @@ public class AvroSource extends
SslContextAwareAbstractSource implements EventDr
                   pipeline.addFirst("inflater", new JZlibDecoder());
                 }
                 Optional<SSLEngine> engine = getSslEngine(false);
-                engine.ifPresent(sslEngine -> pipeline.addLast("ssl", new
SslHandler(sslEngine)));
+                engine.ifPresent(sslEngine -> pipeline.addFirst("ssl", new
SslHandler(sslEngine)));
                 if (enableIpFilter) {
                   logger.info("Setting up ipFilter with the following rule
definition: " +
                           patternRuleConfigDefinition);
diff --git
a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index cc495695..f2458e3b 100644
---
a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++
b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -132,7 +132,7 @@ public class NettyAvroRpcClient extends
SSLContextAwareAbstractRpcClient {
                 SSLEngine engine = createSSLEngine();
                 if (engine != null) {
                   engine.setUseClientMode(true);
-                  pipeline.addLast("ssl", new SslHandler(engine));
+                  pipeline.addFirst("ssl", new SslHandler(engine));
                 }
               });
       avroClient =
SpecificRequestor.getClient(AvroSourceProtocol.Callback.class, transceiver);
```

I hope that this information is helpful and that the proposed solution is
considered viable.
Looking forward to your feedback on this matter.

Thank you for your time and continued efforts to maintain and improve
Apache Flume.

Kind Regards,
Yusuke Wakuta

Reply via email to