DuLe created FLUME-3077:
---------------------------
Summary: Unexpected exception from downstream with Avro
sink+source compression (deflate)
Key: FLUME-3077
URL: https://issues.apache.org/jira/browse/FLUME-3077
Project: Flume
Issue Type: Question
Components: Sinks+Sources
Affects Versions: 1.7.0
Reporter: DuLe
I'm trying to setup a basic 2-tier Flume using the Avro source/sink to
communicate between tiers. I set the same compression-type on both side as the
following config:
# on the sink side:
agent.sinks.k11.type = avro
agent.sinks.k11.channel = c1
agent.sinks.k11.batch-size = 100
agent.sinks.k11.connect-timeout = 60000
agent.sinks.k11.request-timeout = 60000
agent.sinks.k11.reset-connection-interval = 3
agent.sinks.k11.hostname = <ip_source_host>
agent.sinks.k11.port = 4145
agent.sinks.k11.compression-type=deflate
agent.sinks.k11.compression-level = 6
agent.sinks.k11.maxIoWorkers = 8
# And the same on the source side:
collector.sources=r1
collector.sources.r1.bind=0.0.0.0
collector.sources.r1.channels=c1
collector.sources.r1.compression-level=6
collector.sources.r1.compression-type=deflate
collector.sources.r1.port=4145
collector.sources.r1.type=avro
# But I was getting this error:
id: 0x09405401, /10.199.6.225:32780 => /10.199.6.232:4145] BOUND:
/10.199.6.232:4145
24 Mar 2017 10:39:46,891 INFO [New I/O worker #6]
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) -
[id: 0x09405401, /10.199.6.225:32780 => /10.199.6.232:4145] CONNECTED:
/10.199.6.225:32780
24 Mar 2017 10:39:46,913 INFO [New I/O worker #5]
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) -
[id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] DISCONNECTED
24 Mar 2017 10:39:46,913 INFO [New I/O worker #5]
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) -
[id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] UNBOUND
24 Mar 2017 10:39:46,913 INFO [New I/O worker #5]
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream:171) -
[id: 0xebef3dda, /10.199.6.225:60953 :> /10.199.6.232:4145] CLOSED
24 Mar 2017 10:39:46,913 INFO [New I/O worker #5]
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed:209) -
Connection to /10.199.6.225:60953 disconnected.
24 Mar 2017 10:39:46,913 WARN [New I/O worker #5]
(org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught:201) -
Unexpected exception from downstream.
java.nio.channels.ClosedChannelException
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:433)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:128)
at
org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:99)
at
org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:36)
at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779)
at org.jboss.netty.channel.Channels.write(Channels.java:725)
at org.jboss.netty.channel.Channels.write(Channels.java:686)
at
org.jboss.netty.handler.codec.compression.ZlibEncoder.finishEncode(ZlibEncoder.java:380)
at
org.jboss.netty.handler.codec.compression.ZlibEncoder.handleDownstream(ZlibEncoder.java:316)
at
org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:784)
at
org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:54)
at
org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591)
at
org.jboss.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582)
at org.jboss.netty.channel.Channels.close(Channels.java:812)
at
org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:197)
at
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:212)
at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88)
at
org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at
org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:493)
at
org.jboss.netty.handler.codec.frame.FrameDecoder.channelClosed(FrameDecoder.java:371)
at
org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:88)
at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at
org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at
org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:60)
at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at
org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.jboss.netty.channel.Channels.fireChannelClosed(Channels.java:468)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.close(AbstractNioWorker.java:375)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:93)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at
org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at
org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at
org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
This warning appear after the channel closed the connection and occur
continuously. It only appear when config compression with "deflate"
Despite there are many continuously warnings, the data can decompress ok. Does
this warning effect to my flow ? I'm using Flume v1.7
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)