Dear Apache Flume Team,

I am Yusuke Wakuta. I hope this message finds you well. I am writing to report a potential bug I have encountered in Apache Flume. I appreciate the excellent work you have done, and I hope the information below will help improve Apache Flume.
## AvroSource/AvroSink: Failed to connect with `ssl = true` and `compression-type = deflate` between Flume v1.9.0 and Flume v1.10.1

In Flume v1.10.1, the position at which the SslHandler is added to Netty's pipeline differs from v1.9.0. As a result, communication fails between an AvroSink (Flume 1.9.0) and an AvroSource (Flume 1.10.1) when both compression and SSL are enabled.

### Detail

When `compression-type = deflate` and `ssl = true` are set on both the AvroSink in Flume v1.9.0 and the AvroSource in Flume v1.10.1, an exception is thrown and communication fails.
Hereafter, the Flume instance running the AvroSink is referred to as current_step_flume, and the one running the AvroSource as next_step_flume.

* AvroSink setting in current_step_flume

```
agent.sinks.avro-sink1.type = avro
agent.sinks.avro-sink1.channel = ch1
agent.sinks.avro-sink1.hostname = {{flume.hostname.next_step_flume}}
agent.sinks.avro-sink1.port = 61414
agent.sinks.avro-sink1.ssl = true
agent.sinks.avro-sink1.compression-type = deflate
```

* AvroSource setting in next_step_flume

```
agent.sources.avro-source1.bind = 0.0.0.0
agent.sources.avro-source1.channels = ch2
agent.sources.avro-source1.port = 61414
agent.sources.avro-source1.ssl = true
agent.sources.avro-source1.compression-type = deflate
agent.sources.avro-source1.type = avro
```

* error log on current_step_flume

```log
03 Aug 2023 19:24:57,830 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
	at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:398)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: ${NEXT_STEP_FLUME_HOSTNAME}, port: 61414 }: Failed to send batch
	at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:310)
	at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:380)
	... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: ${NEXT_STEP_FLUME_HOSTNAME}, port: 61414 }: Exception thrown from remote handler
	at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:392)
	at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:369)
	at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:298)
	... 4 more
Caused by: java.util.concurrent.ExecutionException: org.jboss.netty.handler.ssl.NotSslRecordException: not an SSL/TLS record: 789c030000000001
	at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
	at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:384)
	... 6 more
Caused by: org.jboss.netty.handler.ssl.NotSslRecordException: not an SSL/TLS record: 789c030000000001
	at org.jboss.netty.handler.ssl.SslHandler.decode(SslHandler.java:857)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
	at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
	at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
	at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
```

* error log on next_step_flume

```log
03 Aug 2023 19:24:57,830 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
	at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:398)
	at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
	at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: ${NEXT_STEP_FLUME_HOSTNAME}, port: 61414 }: Failed to send batch
	at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:310)
	at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:380)
	... 3 more
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: ${NEXT_STEP_FLUME_HOSTNAME}, port: 61414 }: Exception thrown from remote handler
	at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:392)
	at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:369)
	at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:298)
	... 4 more
Caused by: java.util.concurrent.ExecutionException: org.jboss.netty.handler.ssl.NotSslRecordException: not an SSL/TLS record: 789c030000000001
	at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128)
	at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:384)
	... 6 more
Caused by: org.jboss.netty.handler.ssl.NotSslRecordException: not an SSL/TLS record: 789c030000000001
	at org.jboss.netty.handler.ssl.SslHandler.decode(SslHandler.java:857)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:425)
	at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
	at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
	at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
	at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
	at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
	at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
```

I inspected the packet capture and found that the TLS handshake fails right after the client sends `Client Hello` to the server.

### The cause of this behavior

The issue is caused by the SslHandler being added at different positions in the Netty pipeline in Flume v1.9.0 and Flume v1.10.1.
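To make the effect of the handler position easier to see, here is a toy model in plain Java; it does not use Netty. It replays the `addFirst()`/`addLast()` calls from the EchoServer/EchoClient repro below on an `ArrayDeque`, treating the first element as the pipeline head (the socket side), and reports which handler is nearest the socket on the read path and on the write path. The labels (`inflater`, `deflater`, `ssl`, `logging`, `handler`) and the inbound/outbound classification are assumptions made for this sketch, based on Netty's rule that inbound events flow from head to tail and outbound events from tail to head.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Set;

/**
 * Toy model (not Netty) of the handler order produced by the repro's
 * initChannel(). Index 0 is the pipeline head, i.e. the side nearest the socket.
 */
public final class PipelineOrderModel {

    // Assumed classification: handlers that process data read from the socket.
    static final Set<String> INBOUND = Set.of("inflater", "ssl", "logging", "handler");
    // Assumed classification: handlers that process data written to the socket.
    static final Set<String> OUTBOUND = Set.of("deflater", "ssl", "logging");

    /** Replays the repro's add calls; sslFirst selects addFirst() vs addLast() for "ssl". */
    static List<String> build(boolean sslFirst) {
        Deque<String> pipeline = new ArrayDeque<>();
        pipeline.addFirst("deflater");
        pipeline.addFirst("inflater");
        if (sslFirst) {
            pipeline.addFirst("ssl");
        } else {
            pipeline.addLast("ssl");
        }
        pipeline.addLast("logging");
        pipeline.addLast("handler");
        return new ArrayList<>(pipeline); // iteration order is head -> tail
    }

    /** Inbound events start at the head, so the first inbound handler sees raw socket bytes. */
    static String firstOnRead(List<String> pipeline) {
        for (String name : pipeline) {
            if (INBOUND.contains(name)) {
                return name;
            }
        }
        throw new IllegalStateException("no inbound handler");
    }

    /** Outbound events end at the head, so the outbound handler nearest the head transforms bytes last. */
    static String lastOnWrite(List<String> pipeline) {
        for (String name : pipeline) {
            if (OUTBOUND.contains(name)) {
                return name;
            }
        }
        throw new IllegalStateException("no outbound handler");
    }

    public static void main(String[] args) {
        for (boolean sslFirst : new boolean[] {true, false}) {
            List<String> pipeline = build(sslFirst);
            System.out.println((sslFirst ? "addFirst: " : "addLast:  ") + pipeline
                    + " first on read = " + firstOnRead(pipeline)
                    + ", last on write = " + lastOnWrite(pipeline));
        }
        // Prints:
        // addFirst: [ssl, inflater, deflater, logging, handler] first on read = ssl, last on write = ssl
        // addLast:  [inflater, deflater, ssl, logging, handler] first on read = inflater, last on write = deflater
    }
}
```

In this model, with `addFirst()` the SslHandler is the first to see incoming bytes and the last to touch outgoing bytes, so TLS records reach the wire unchanged. With `addLast()`, TLS records from a v1.9.0 peer reach the inflater first, and outgoing TLS records pass through the deflater, so a v1.9.0 peer would receive zlib data instead of TLS records. That is consistent with the `NotSslRecordException` above: `789c030000000001` begins with the zlib header `78 9c` and looks like an empty zlib stream (empty final block `03 00`, Adler-32 `00000001`). It would also explain why no TLS handshake is visible when both sides use `addLast()`: the TLS records are compressed before they reach the socket.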
In v1.9.0, the SslHandler was added at the very beginning of the pipeline with addFirst() <https://github.com/apache/flume/blob/d4fcab4f501d41597bc616921329a4339f73585e/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java#L443>. In v1.10.1, however, the commit for FLUME-3363 (Upgrade Netty to version 4. Update Avro to 1.11.0. Update PMD plugin) <https://github.com/apache/flume/commit/2810811f66da74342d19854a23fcd498b12aea9a> changed this so that the SslHandler is added with addLast(). I have not been able to find the reason for this change from addFirst() to addLast() in that commit.

To determine which is appropriate, adding the SslHandler with addFirst() or with addLast(), I created a sample case based on Netty's EchoServer example <https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example/echo>.

### Minimal code example to reproduce the behavior

I wrote a minimal Netty server and client and captured the packets exchanged between them.
The server consists of two classes, EchoServer and EchoServerHandler.

* EchoServer

```java
/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.example.echo;

import javax.net.ssl.SSLEngine;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.compression.JZlibDecoder;
import io.netty.handler.codec.compression.JZlibEncoder;
import io.netty.handler.codec.compression.ZlibEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.util.SelfSignedCertificate;

/**
 * Echoes back any received data from a client.
 */
public final class EchoServer {

    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     ZlibEncoder encoder = new JZlibEncoder(6);
                     p.addFirst("deflater", encoder);
                     p.addFirst("inflater", new JZlibDecoder());

                     SelfSignedCertificate ssc = new SelfSignedCertificate();
                     SslContext context = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey())
                             .build();
                     SSLEngine engine = context.newEngine(ByteBufAllocator.DEFAULT);
                     engine.setUseClientMode(false);

                     // checking the behavior with changing addLast() and addFirst()
                     // Resulting order from head (socket side) to tail:
                     //   addLast():  inflater, deflater, ssl, logging, serverHandler
                     //   addFirst(): ssl, inflater, deflater, logging, serverHandler
                     p.addLast(new SslHandler(engine));
                     // p.addFirst(new SslHandler(engine));
                     p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(serverHandler);
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
```

* EchoServerHandler

```java
/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.example.echo;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handler implementation for the echo server.
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
```

The client consists of two classes, EchoClient and EchoClientHandler.

* EchoClient

```java
/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.example.echo;

import javax.net.ssl.SSLEngine;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.compression.JZlibDecoder;
import io.netty.handler.codec.compression.JZlibEncoder;
import io.netty.handler.codec.compression.ZlibEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

/**
 * Sends one message when a connection is open and echoes back any received
 * data to the server.  Simply put, the echo client initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public final class EchoClient {

    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "8"));

    public static void main(String[] args) throws Exception {
        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY, true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
                     ZlibEncoder encoder = new JZlibEncoder(6);
                     p.addFirst("deflater", encoder);
                     p.addFirst("inflater", new JZlibDecoder());

                     SslContext context = SslContextBuilder.forClient()
                             .trustManager(InsecureTrustManagerFactory.INSTANCE)
                             .build();
                     SSLEngine engine = context.newEngine(ByteBufAllocator.DEFAULT);
                     engine.setUseClientMode(true);

                     // checking the behavior with changing addLast() and addFirst()
                     // Resulting order from head (socket side) to tail:
                     //   addLast():  inflater, deflater, ssl, logging, EchoClientHandler
                     //   addFirst(): ssl, inflater, deflater, logging, EchoClientHandler
                     p.addLast(new SslHandler(engine));
                     // p.addFirst(new SslHandler(engine));
                     p.addLast(new LoggingHandler(LogLevel.INFO));
                     p.addLast(new EchoClientHandler());
                 }
             });

            // Start the client.
            ChannelFuture f = b.connect(HOST, PORT).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}
```

* EchoClientHandler

```java
/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package org.example.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handler implementation for the echo client.  It initiates the ping-pong
 * traffic between the echo client and server by sending the first message to
 * the server.
 */
public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf firstMessage;

    /**
     * Creates a client-side handler.
     */
    public EchoClientHandler() {
        firstMessage = Unpooled.buffer(EchoClient.SIZE);
        for (int i = 0; i < firstMessage.capacity(); i ++) {
            firstMessage.writeByte((byte) i);
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
```

* As implemented in Flume v1.9.0, when the SslHandler is added with addFirst() in both EchoClient and EchoServer, the packet capture shows that a TLS handshake takes place and communication succeeds.
* When the SslHandler is added with addFirst() in EchoClient and with addLast() in EchoServer, the packet capture shows that the TLS handshake fails. This closely mirrors the packet capture observed with Flume, and the execution logs contained entries similar to those seen on the connection between Flume 1.9.0 and 1.10.1. I suspect that in this case the server tries to decompress the `Client Hello` and fails.
* As implemented in Flume v1.10.1, when the SslHandler is added with addLast() in both EchoClient and EchoServer, the communication itself succeeds. However, no TLS handshake could be seen in the packet capture, and it appeared that only plain TCP packets were being exchanged.

### Possible solution

When SSL is enabled, the traffic should actually be carried over TLS. I therefore propose the patch below for Flume v1.10.1, which, as in Flume 1.9.0, adds the SslHandler with addFirst() in both AvroSource and NettyAvroRpcClient.

```diff
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
index 4dd07261..11c9292b 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
@@ -196,7 +196,7 @@ public class AvroSource extends SslContextAwareAbstractSource implements EventDr
           pipeline.addFirst("inflater", new JZlibDecoder());
         }
         Optional<SSLEngine> engine = getSslEngine(false);
-        engine.ifPresent(sslEngine -> pipeline.addLast("ssl", new SslHandler(sslEngine)));
+        engine.ifPresent(sslEngine -> pipeline.addFirst("ssl", new SslHandler(sslEngine)));
         if (enableIpFilter) {
           logger.info("Setting up ipFilter with the following rule definition: " +
               patternRuleConfigDefinition);
diff --git a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
index cc495695..f2458e3b 100644
--- a/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
+++ b/flume-ng-sdk/src/main/java/org/apache/flume/api/NettyAvroRpcClient.java
@@ -132,7 +132,7 @@ public class NettyAvroRpcClient extends SSLContextAwareAbstractRpcClient {
           SSLEngine engine = createSSLEngine();
           if (engine != null) {
             engine.setUseClientMode(true);
-            pipeline.addLast("ssl", new SslHandler(engine));
+            pipeline.addFirst("ssl", new SslHandler(engine));
           }
         });
         avroClient = SpecificRequestor.getClient(AvroSourceProtocol.Callback.class, transceiver);
```

I hope this information is helpful and that the proposed solution will be considered viable. I look forward to your feedback on this matter.

Thank you for your time and your continued efforts to maintain and improve Apache Flume.

Kind regards,
Yusuke Wakuta