Repository: flume
Updated Branches:
  refs/heads/trunk d0d00c371 -> 9940dcbfe


FLUME-2416: Use CodecPool in compressed stream to prevent leak of direct buffers

(Hari Shreedharan via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/9940dcbf
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/9940dcbf
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/9940dcbf

Branch: refs/heads/trunk
Commit: 9940dcbfefbe1248f65aa83f2f84e352ce022041
Parents: d0d00c3
Author: Jarek Jarcec Cecho <[email protected]>
Authored: Thu Jun 26 20:57:31 2014 -0700
Committer: Jarek Jarcec Cecho <[email protected]>
Committed: Thu Jun 26 20:57:31 2014 -0700

----------------------------------------------------------------------
 .../flume/sink/hdfs/HDFSCompressedDataStream.java      | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/9940dcbf/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
 
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
index fe857c3..dc93e4f 100644
--- 
a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
+++ 
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java
@@ -29,8 +29,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,6 +50,7 @@ public class HDFSCompressedDataStream extends 
AbstractHDFSWriter {
   private Context serializerContext;
   private EventSerializer serializer;
   private boolean useRawLocalFileSystem;
+  private Compressor compressor;
 
   @Override
   public void configure(Context context) {
@@ -83,7 +86,6 @@ public class HDFSCompressedDataStream extends 
AbstractHDFSWriter {
             "is not of type LocalFileSystem: " + hdfs.getClass().getName());
       }
     }
-
     boolean appending = false;
     if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
     (dstPath)) {
@@ -92,7 +94,10 @@ public class HDFSCompressedDataStream extends 
AbstractHDFSWriter {
     } else {
       fsOut = hdfs.create(dstPath);
     }
-    cmpOut = codec.createOutputStream(fsOut);
+    if(compressor == null) {
+      compressor = CodecPool.getCompressor(codec, conf);
+    }
+    cmpOut = codec.createOutputStream(fsOut, compressor);
     serializer = EventSerializerFactory.getInstance(serializerType,
         serializerContext, cmpOut);
     if (appending && !serializer.supportsReopen()) {
@@ -148,6 +153,10 @@ public class HDFSCompressedDataStream extends 
AbstractHDFSWriter {
     fsOut.flush();
     fsOut.sync();
     cmpOut.close();
+    if (compressor != null) {
+      CodecPool.returnCompressor(compressor);
+      compressor = null;
+    }
     unregisterCurrentStream();
   }
 

Reply via email to