Repository: flume
Updated Branches:
  refs/heads/flume-1.6 4c5b602c5 -> d56feccfb

FLUME-2416: Use CodecPool in compressed stream to prevent leak of direct buffers (Hari Shreedharan via Jarek Jarcec Cecho)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d56feccf
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d56feccf
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d56feccf

Branch: refs/heads/flume-1.6
Commit: d56feccfb922165ab35a856f3d2cc65649093571
Parents: 4c5b602
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Thu Jun 26 20:57:31 2014 -0700
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Thu Jun 26 20:59:01 2014 -0700

----------------------------------------------------------------------
 .../flume/sink/hdfs/HDFSCompressedDataStream.java | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/d56feccf/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index fe857c3..dc93e4f 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -29,8 +29,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +50,7 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
   private Context serializerContext;
   private EventSerializer serializer;
   private boolean useRawLocalFileSystem;
+  private Compressor compressor;
 
   @Override
   public void configure(Context context) {
@@ -83,7 +86,6 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
             "is not of type LocalFileSystem: " + hdfs.getClass().getName());
       }
     }
-
     boolean appending = false;
     if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
     (dstPath)) {
@@ -92,7 +94,10 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
     } else {
       fsOut = hdfs.create(dstPath);
     }
-    cmpOut = codec.createOutputStream(fsOut);
+    if(compressor == null) {
+      compressor = CodecPool.getCompressor(codec, conf);
+    }
+    cmpOut = codec.createOutputStream(fsOut, compressor);
     serializer = EventSerializerFactory.getInstance(serializerType,
         serializerContext, cmpOut);
     if (appending && !serializer.supportsReopen()) {
@@ -148,6 +153,10 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter {
     fsOut.flush();
     fsOut.sync();
     cmpOut.close();
+    if (compressor != null) {
+      CodecPool.returnCompressor(compressor);
+      compressor = null;
+    }
     unregisterCurrentStream();
   }
 
