This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new eff292bd5f8 HADOOP-18383. Codecs with @DoNotPool annotation are not closed causing memory leak (#4739)
eff292bd5f8 is described below

commit eff292bd5f8a707b9402a3df2523a92cb7b24e70
Author: kevins-29 <100220899+kevins...@users.noreply.github.com>
AuthorDate: Mon Aug 15 19:14:02 2022 +0200

    HADOOP-18383. Codecs with @DoNotPool annotation are not closed causing memory leak (#4739)
---
 .../hadoop/io/compress/AlreadyClosedException.java | 33 +++++++++++++++++++
 .../org/apache/hadoop/io/compress/CodecPool.java   |  2 ++
 .../io/compress/zlib/BuiltInGzipDecompressor.java  | 13 +++++++-
 .../apache/hadoop/io/compress/TestCodecPool.java   | 38 ++++++++++++++++++++++
 4 files changed, 85 insertions(+), 1 deletion(-)
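
Background on the fix: CodecPool.returnCompressor() and returnDecompressor()
return early for codecs annotated with @DoNotPool, and before this patch they
did so without calling end(). For BuiltInGzipDecompressor that leaves the
wrapped java.util.zip.Inflater holding its native zlib state until
finalization, which can surface as a memory leak under heavy gzip read churn.
A minimal sketch of the leaking usage pattern (illustrative only, not part of
the commit; the class name and loop bound are invented):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.Decompressor;
    import org.apache.hadoop.io.compress.GzipCodec;

    public class GzipDecompressorChurnSketch {
      public static void main(String[] args) {
        GzipCodec codec = new GzipCodec();
        codec.setConf(new Configuration());
        for (int i = 0; i < 100_000; i++) {
          // Without native zlib loaded, GzipCodec hands out a new
          // BuiltInGzipDecompressor (annotated @DoNotPool) on every call.
          Decompressor decompressor = CodecPool.getDecompressor(codec);
          // ... decompress some data ...
          // Before this patch the return below was a no-op for @DoNotPool
          // instances; after it, the pool calls decompressor.end() here.
          CodecPool.returnDecompressor(decompressor);
        }
      }
    }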

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java
new file mode 100644
index 00000000000..104ad24577f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/AlreadyClosedException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+
+/**
+ * An exception class for when a closed compressor/decompressor is being used.
+ * {@link org.apache.hadoop.io.compress.Compressor}
+ * {@link org.apache.hadoop.io.compress.Decompressor}
+ */
+public class AlreadyClosedException extends IOException {
+
+  public AlreadyClosedException(String message) {
+    super(message);
+  }
+}
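
The new exception extends IOException, so existing error handling continues
to work; callers that want to distinguish "used after being returned" from
other I/O failures can catch it explicitly. A hedged snippet (the surrounding
variables decompressor, buffer and codec are illustrative):

    try {
      int bytesRead = decompressor.decompress(buffer, 0, buffer.length);
    } catch (AlreadyClosedException e) {
      // This instance was already ended (for example, returned to
      // CodecPool); fetch a fresh decompressor rather than retrying on it.
      decompressor = CodecPool.getDecompressor(codec);
    }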
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
index 69e8c99a1f4..9a18f9c52c7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
@@ -205,6 +205,7 @@ public class CodecPool {
     }
     // if the compressor can't be reused, don't pool it.
     if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      compressor.end();
       return;
     }
     compressor.reset();
@@ -225,6 +226,7 @@ public class CodecPool {
     }
     // if the decompressor can't be reused, don't pool it.
     if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      decompressor.end();
       return;
     }
     decompressor.reset();
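
For CodecPool callers the contract is unchanged: keep returning
(de)compressors in a finally block, and the pool now ends non-poolable
instances instead of silently dropping them. A hedged sketch of that caller
pattern (the helper class, method name and buffer size are invented):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionInputStream;
    import org.apache.hadoop.io.compress.Decompressor;

    public final class CodecPoolUsageSketch {
      static void copyDecompressed(CompressionCodec codec, InputStream in,
          OutputStream out) throws IOException {
        Decompressor decompressor = CodecPool.getDecompressor(codec);
        try (CompressionInputStream cin = codec.createInputStream(in, decompressor)) {
          IOUtils.copyBytes(cin, out, 4096, false);
        } finally {
          // For @DoNotPool decompressors this now calls end(), releasing the
          // native zlib state immediately instead of waiting for the GC.
          CodecPool.returnDecompressor(decompressor);
        }
      }
    }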
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java
index 896d35eb180..24b8c392f76 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/BuiltInGzipDecompressor.java
@@ -23,6 +23,7 @@ import java.util.zip.Checksum;
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;
 
+import org.apache.hadoop.io.compress.AlreadyClosedException;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DoNotPool;
 import org.apache.hadoop.util.DataChecksum;
@@ -105,7 +106,11 @@ public class BuiltInGzipDecompressor implements Decompressor {
      * Immediately after the trailer (and potentially prior to the next gzip
      * member/substream header), without reset() having been called.
      */
-    FINISHED;
+    FINISHED,
+    /**
+     * Immediately after end() has been called.
+     */
+    ENDED;
   }
 
   /**
@@ -182,6 +187,10 @@ public class BuiltInGzipDecompressor implements Decompressor {
   throws IOException {
     int numAvailBytes = 0;
 
+    if (state == GzipStateLabel.ENDED) {
+      throw new AlreadyClosedException("decompress called on closed decompressor");
+    }
+
     if (state != GzipStateLabel.DEFLATE_STREAM) {
       executeHeaderState();
 
@@ -472,6 +481,8 @@ public class BuiltInGzipDecompressor implements Decompressor {
   @Override
   public synchronized void end() {
     inflater.end();
+
+    state = GzipStateLabel.ENDED;
   }
 
   /**
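
With the new ENDED state, decompress() fails fast on an instance whose end()
has already been called (directly or via CodecPool.returnDecompressor),
rather than driving the already-ended Inflater. A hedged sketch of the
resulting behaviour, which the new unit test below exercises through a full
compression stream (class name invented):

    import org.apache.hadoop.io.compress.CodecPool;
    import org.apache.hadoop.io.compress.Decompressor;
    import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;

    public class EndedDecompressorSketch {
      public static void main(String[] args) throws Exception {
        Decompressor decompressor = new BuiltInGzipDecompressor();
        // @DoNotPool, so the pool calls end() instead of caching it.
        CodecPool.returnDecompressor(decompressor);
        byte[] buf = new byte[64];
        // Post-patch this throws AlreadyClosedException("decompress called
        // on closed decompressor").
        decompressor.decompress(buf, 0, buf.length);
      }
    }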
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
index 1fb25cb9087..367b85862e8 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodecPool.java
@@ -19,6 +19,10 @@ package org.apache.hadoop.io.compress;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.OutputStream;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -26,6 +30,8 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.zlib.BuiltInGzipDecompressor;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -189,4 +195,36 @@ public class TestCodecPool {
       CodecPool.returnDecompressor(decompressor);
     }
   }
+
+  @Test(timeout = 10000)
+  public void testDoNotPoolDecompressorNotUseableAfterReturn() throws Exception {
+
+    final GzipCodec gzipCodec = new GzipCodec();
+    gzipCodec.setConf(new Configuration());
+
+    final Random random = new Random();
+    final byte[] bytes = new byte[1024];
+    random.nextBytes(bytes);
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (OutputStream outputStream = gzipCodec.createOutputStream(baos)) {
+      outputStream.write(bytes);
+    }
+
+    final byte[] gzipBytes = baos.toByteArray();
+    final ByteArrayInputStream bais = new ByteArrayInputStream(gzipBytes);
+
+    // BuiltInGzipDecompressor is an explicit example of a Decompressor
+    // with the @DoNotPool annotation
+    final Decompressor decompressor = new BuiltInGzipDecompressor();
+    CodecPool.returnDecompressor(decompressor);
+
+    final CompressionInputStream inputStream = gzipCodec.createInputStream(bais, decompressor);
+    LambdaTestUtils.intercept(
+            AlreadyClosedException.class,
+            "decompress called on closed decompressor",
+            "Decompressor from Codec with @DoNotPool should not be " +
+                    "useable after returning to CodecPool",
+        () -> inputStream.read());
+  }
 }

