Repository: hadoop
Updated Branches:
  refs/heads/branch-2 6a016faf5 -> 806a8dea3


MAPREDUCE-6628. Potential memory leak in CryptoOutputStream. Contributed by Mariappan Asokan

(cherry picked from commit 9f192cc5ac4a6145e2eeaecba0a754d31e601898)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/806a8dea
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/806a8dea
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/806a8dea

Branch: refs/heads/branch-2
Commit: 806a8dea3894c22940a640b539782dd2c04df1f3
Parents: 6a016fa
Author: Chris Douglas <cdoug...@apache.org>
Authored: Fri Sep 9 11:12:44 2016 -0700
Committer: Chris Douglas <cdoug...@apache.org>
Committed: Fri Sep 9 11:14:30 2016 -0700

----------------------------------------------------------------------
 .../hadoop/crypto/CryptoOutputStream.java       | 22 +++++++-
 .../fs/crypto/CryptoFSDataOutputStream.java     |  8 ++-
 .../crypto/TestCryptoOutputStreamClosing.java   | 57 ++++++++++++++++++++
 .../java/org/apache/hadoop/mapred/MapTask.java  | 40 ++++++++++++--
 .../apache/hadoop/mapreduce/CryptoUtils.java    | 56 +++++++++++++------
 5 files changed, 159 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/806a8dea/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
index 1753019..ab51bfe 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
@@ -76,6 +76,7 @@ public class CryptoOutputStream extends FilterOutputStream implements
   private final byte[] key;
   private final byte[] initIV;
   private byte[] iv;
+  private boolean closeOutputStream;
   
   public CryptoOutputStream(OutputStream out, CryptoCodec codec, 
       int bufferSize, byte[] key, byte[] iv) throws IOException {
@@ -85,6 +86,13 @@ public class CryptoOutputStream extends FilterOutputStream implements
   public CryptoOutputStream(OutputStream out, CryptoCodec codec, 
       int bufferSize, byte[] key, byte[] iv, long streamOffset) 
       throws IOException {
+    this(out, codec, bufferSize, key, iv, streamOffset, true);
+  }
+
+  public CryptoOutputStream(OutputStream out, CryptoCodec codec,
+      int bufferSize, byte[] key, byte[] iv, long streamOffset,
+      boolean closeOutputStream)
+      throws IOException {
     super(out);
     CryptoStreamUtils.checkCodec(codec);
     this.bufferSize = CryptoStreamUtils.checkBufferSize(codec, bufferSize);
@@ -95,6 +103,7 @@ public class CryptoOutputStream extends FilterOutputStream implements
     inBuffer = ByteBuffer.allocateDirect(this.bufferSize);
     outBuffer = ByteBuffer.allocateDirect(this.bufferSize);
     this.streamOffset = streamOffset;
+    this.closeOutputStream = closeOutputStream;
     try {
       encryptor = codec.createEncryptor();
     } catch (GeneralSecurityException e) {
@@ -110,8 +119,14 @@ public class CryptoOutputStream extends FilterOutputStream implements
   
   public CryptoOutputStream(OutputStream out, CryptoCodec codec, 
       byte[] key, byte[] iv, long streamOffset) throws IOException {
+    this(out, codec, key, iv, streamOffset, true);
+  }
+
+  public CryptoOutputStream(OutputStream out, CryptoCodec codec,
+      byte[] key, byte[] iv, long streamOffset, boolean closeOutputStream)
+      throws IOException {
     this(out, codec, CryptoStreamUtils.getBufferSize(codec.getConf()), 
-        key, iv, streamOffset);
+        key, iv, streamOffset, closeOutputStream);
   }
   
   public OutputStream getWrappedStream() {
@@ -221,7 +236,10 @@ public class CryptoOutputStream extends FilterOutputStream implements
       return;
     }
     try {
-      super.close();
+      flush();
+      if (closeOutputStream) {
+        super.close();
+      }
       freeBuffers();
     } finally {
       closed = true;

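For context: each CryptoOutputStream allocates two direct ByteBuffers, and
before this change close() unconditionally closed the wrapped stream as well.
Callers sharing one underlying stream across several wrappers therefore could
not close the wrappers without closing the shared stream, so the native
buffers lingered until finalization, which is the leak this issue describes.
With the new flag, close() still flushes and frees the buffers but leaves the
wrapped stream open. A minimal usage sketch (the file name, zeroed key, and
IV here are illustrative, not part of the patch):

    import java.io.FileOutputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.crypto.CryptoCodec;
    import org.apache.hadoop.crypto.CryptoOutputStream;

    public class CryptoStreamOwnershipSketch {
      public static void main(String[] args) throws Exception {
        CryptoCodec codec = CryptoCodec.getInstance(new Configuration());
        OutputStream underlying = new FileOutputStream("spill0.enc");
        // closeOutputStream = false: closing the wrapper flushes it and
        // frees its direct buffers, but leaves `underlying` open.
        CryptoOutputStream cos = new CryptoOutputStream(underlying, codec,
            new byte[16], new byte[16], 0L, false);
        cos.write("example".getBytes());
        cos.close();        // wrapper resources released here
        underlying.close(); // the caller still owns the raw stream
      }
    }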
http://git-wip-us.apache.org/repos/asf/hadoop/blob/806a8dea/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java
index 3beb361..6450bc3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/crypto/CryptoFSDataOutputStream.java
@@ -28,8 +28,14 @@ public class CryptoFSDataOutputStream extends FSDataOutputStream {
   
   public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec,
       int bufferSize, byte[] key, byte[] iv) throws IOException {
+    this(out, codec, bufferSize, key, iv, true);
+  }
+
+  public CryptoFSDataOutputStream(FSDataOutputStream out, CryptoCodec codec,
+      int bufferSize, byte[] key, byte[] iv, boolean closeOutputStream)
+      throws IOException {
     super(new CryptoOutputStream(out, codec, bufferSize, key, iv, 
-        out.getPos()), null, out.getPos()); 
+        out.getPos(), closeOutputStream), null, out.getPos());
     this.fsOut = out;
   }
   

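The same flag is threaded through CryptoFSDataOutputStream, the wrapper that
CryptoUtils actually constructs. A hedged fragment (assumes an existing
Configuration `conf` and an open FSDataOutputStream `raw`; the 64 KB buffer
size and zeroed key/IV are illustrative):

    CryptoCodec codec = CryptoCodec.getInstance(conf);
    CryptoFSDataOutputStream wrapped = new CryptoFSDataOutputStream(
        raw, codec, 64 * 1024, new byte[16], new byte[16], false);
    wrapped.write(1);
    wrapped.close(); // frees crypto buffers; `raw` stays open
    raw.close();     // closing the raw stream remains the caller's job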
http://git-wip-us.apache.org/repos/asf/hadoop/blob/806a8dea/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoOutputStreamClosing.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoOutputStreamClosing.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoOutputStreamClosing.java
new file mode 100644
index 0000000..39e4bb8
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoOutputStreamClosing.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto;
+
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+/**
+ * To test proper closing of underlying stream of CryptoOutputStream.
+ */
+public class TestCryptoOutputStreamClosing {
+  private static CryptoCodec codec;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    codec = CryptoCodec.getInstance(new Configuration());
+  }
+
+  @Test
+  public void testOutputStreamClosing() throws Exception {
+    OutputStream outputStream = mock(OutputStream.class);
+    CryptoOutputStream cos = new CryptoOutputStream(outputStream, codec,
+        new byte[16], new byte[16], 0L, true);
+    cos.close();
+    verify(outputStream).close();
+  }
+
+  @Test
+  public void testOutputStreamNotClosing() throws Exception {
+    OutputStream outputStream = mock(OutputStream.class);
+    CryptoOutputStream cos = new CryptoOutputStream(outputStream, codec,
+        new byte[16], new byte[16], 0L, false);
+    cos.close();
+    verify(outputStream, never()).close();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/806a8dea/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
index c4957b7..82b8254 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
@@ -1587,6 +1587,7 @@ public class MapTask extends Task {
       final long size = distanceTo(bufstart, bufend, bufvoid) +
                   partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
+      FSDataOutputStream partitionOut = null;
       try {
         // create spill file
         final SpillRecord spillRec = new SpillRecord(partitions);
@@ -1607,7 +1608,7 @@ public class MapTask extends Task {
           IFile.Writer<K, V> writer = null;
           try {
             long segmentStart = out.getPos();
-            FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
+            partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
             writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
                                       spilledRecordsCounter);
             if (combinerRunner == null) {
@@ -1642,6 +1643,10 @@ public class MapTask extends Task {
 
             // close the writer
             writer.close();
+            if (partitionOut != out) {
+              partitionOut.close();
+              partitionOut = null;
+            }
 
             // record offsets
             rec.startOffset = segmentStart;
@@ -1670,6 +1675,9 @@ public class MapTask extends Task {
         ++numSpills;
       } finally {
         if (out != null) out.close();
+        if (partitionOut != null) {
+          partitionOut.close();
+        }
       }
     }
 
@@ -1682,6 +1690,7 @@ public class MapTask extends Task {
                                    int partition) throws IOException {
       long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
       FSDataOutputStream out = null;
+      FSDataOutputStream partitionOut = null;
       try {
         // create spill file
         final SpillRecord spillRec = new SpillRecord(partitions);
@@ -1696,7 +1705,7 @@ public class MapTask extends Task {
           try {
             long segmentStart = out.getPos();
             // Create a new codec, don't care!
-            FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
+            partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
             writer = new IFile.Writer<K,V>(job, partitionOut, keyClass, valClass, codec,
                                             spilledRecordsCounter);
 
@@ -1708,6 +1717,10 @@ public class MapTask extends Task {
               mapOutputByteCounter.increment(out.getPos() - recordStart);
             }
             writer.close();
+            if (partitionOut != out) {
+              partitionOut.close();
+              partitionOut = null;
+            }
 
             // record offsets
             rec.startOffset = segmentStart;
@@ -1735,6 +1748,9 @@ public class MapTask extends Task {
         ++numSpills;
       } finally {
         if (out != null) out.close();
+        if (partitionOut != null) {
+          partitionOut.close();
+        }
       }
     }
 
@@ -1846,6 +1862,7 @@ public class MapTask extends Task {
 
       //The output stream for the final single output file
       FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
+      FSDataOutputStream finalPartitionOut = null;
 
       if (numSpills == 0) {
         //create dummy files
@@ -1854,10 +1871,15 @@ public class MapTask extends Task {
         try {
           for (int i = 0; i < partitions; i++) {
             long segmentStart = finalOut.getPos();
-            FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
+            finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut,
+                false);
             Writer<K, V> writer =
              new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);
             writer.close();
+            if (finalPartitionOut != finalOut) {
+              finalPartitionOut.close();
+              finalPartitionOut = null;
+            }
             rec.startOffset = segmentStart;
            rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
            rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
@@ -1866,6 +1888,9 @@ public class MapTask extends Task {
           sr.writeToFile(finalIndexFile, job);
         } finally {
           finalOut.close();
+          if (finalPartitionOut != null) {
+            finalPartitionOut.close();
+          }
         }
         sortPhase.complete();
         return;
@@ -1909,7 +1934,7 @@ public class MapTask extends Task {
 
           //write merged output to disk
           long segmentStart = finalOut.getPos();
-          FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
+          finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut, false);
           Writer<K, V> writer =
               new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
                                spilledRecordsCounter);
@@ -1922,6 +1947,10 @@ public class MapTask extends Task {
 
           //close
           writer.close();
+          if (finalPartitionOut != finalOut) {
+            finalPartitionOut.close();
+            finalPartitionOut = null;
+          }
 
           sortPhase.startNextPhase();
           
@@ -1933,6 +1962,9 @@ public class MapTask extends Task {
         }
         spillRec.writeToFile(finalIndexFile, job);
         finalOut.close();
+        if (finalPartitionOut != null) {
+          finalPartitionOut.close();
+        }
         for(int i = 0; i < numSpills; i++) {
           rfs.delete(filename[i],true);
         }

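All of the MapTask changes above apply one ownership pattern:
wrapIfNecessary(job, out, false) returns either `out` itself (encrypted spill
disabled) or a wrapper that no longer closes `out`. The wrapper is closed
only when it is a distinct object, then nulled so the finally block cannot
close it twice, while the raw stream is closed exactly once. A condensed
sketch of that pattern (spill-writing details elided; `rfs`, `filename`, and
`job` as in the surrounding MapTask code):

    FSDataOutputStream out = null;
    FSDataOutputStream partitionOut = null;
    try {
      out = rfs.create(filename, true, 4096);
      partitionOut = CryptoUtils.wrapIfNecessary(job, out, false);
      // ... write one partition through an IFile.Writer on partitionOut ...
      if (partitionOut != out) {
        partitionOut.close(); // free the wrapper's direct buffers eagerly
        partitionOut = null;  // prevent a second close in finally
      }
    } finally {
      if (out != null) out.close();
      if (partitionOut != null) partitionOut.close(); // error-path cleanup
    }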
http://git-wip-us.apache.org/repos/asf/hadoop/blob/806a8dea/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
index c4130b1..c05b6b0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/CryptoUtils.java
@@ -57,9 +57,9 @@ public class CryptoUtils {
   /**
    * This method creates and initializes an IV (Initialization Vector)
    * 
-   * @param conf
-   * @return byte[]
-   * @throws IOException
+   * @param conf configuration
+   * @return byte[] initialization vector
+   * @throws IOException exception in case of error
    */
   public static byte[] createIV(Configuration conf) throws IOException {
     CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf);
@@ -94,13 +94,33 @@ public class CryptoUtils {
    * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
    * variable.
    * 
-   * @param conf
-   * @param out
-   * @return FSDataOutputStream
-   * @throws IOException
+   * @param conf configuration
+   * @param out given output stream
+   * @return FSDataOutputStream encrypted output stream if encryption is
+   *         enabled; otherwise the given output stream itself
+   * @throws IOException exception in case of error
    */
   public static FSDataOutputStream wrapIfNecessary(Configuration conf,
       FSDataOutputStream out) throws IOException {
+    return wrapIfNecessary(conf, out, true);
+  }
+
+  /**
+   * Wraps a given FSDataOutputStream with a CryptoOutputStream. The size of the
+   * data buffer required for the stream is specified by the
+   * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
+   * variable.
+   *
+   * @param conf configuration
+   * @param out given output stream
+   * @param closeOutputStream flag to indicate whether closing the wrapped
+   *        stream will close the given output stream
+   * @return FSDataOutputStream encrypted output stream if encryption is
+   *         enabled; otherwise the given output stream itself
+   * @throws IOException exception in case of error
+   */
+  public static FSDataOutputStream wrapIfNecessary(Configuration conf,
+      FSDataOutputStream out, boolean closeOutputStream) throws IOException {
     if (isEncryptedSpillEnabled(conf)) {
       out.write(ByteBuffer.allocate(8).putLong(out.getPos()).array());
       byte[] iv = createIV(conf);
@@ -110,7 +130,7 @@ public class CryptoUtils {
             + Base64.encodeBase64URLSafeString(iv) + "]");
       }
       return new CryptoFSDataOutputStream(out, CryptoCodec.getInstance(conf),
-          getBufferSize(conf), getEncryptionKey(), iv);
+          getBufferSize(conf), getEncryptionKey(), iv, closeOutputStream);
     } else {
       return out;
     }
@@ -128,11 +148,12 @@ public class CryptoUtils {
    * LimitInputStream will ensure that the CryptoStream does not read past the
    * provided length from the given Input Stream.
    * 
-   * @param conf
-   * @param in
-   * @param length
-   * @return InputStream
-   * @throws IOException
+   * @param conf configuration
+   * @param in given input stream
+   * @param length maximum number of bytes to read from the input stream
+   * @return InputStream encrypted input stream if encryption is
+   *         enabled; otherwise the given input stream itself
+   * @throws IOException exception in case of error
    */
   public static InputStream wrapIfNecessary(Configuration conf, InputStream in,
       long length) throws IOException {
@@ -166,10 +187,11 @@ public class CryptoUtils {
    * "mapreduce.job.encrypted-intermediate-data.buffer.kb" Job configuration
    * variable.
    * 
-   * @param conf
-   * @param in
-   * @return FSDataInputStream
-   * @throws IOException
+   * @param conf configuration
+   * @param in given input stream
+   * @return FSDataInputStream encrypted input stream if encryption is
+   *         enabled; otherwise the given input stream itself
+   * @throws IOException exception in case of error
    */
   public static FSDataInputStream wrapIfNecessary(Configuration conf,
       FSDataInputStream in) throws IOException {


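Net effect for CryptoUtils callers: the existing two-argument overload keeps
the old contract (the wrapper owns and closes the stream), while passing
false opts into the shared-ownership scheme MapTask now uses. A small hedged
example (assumes a job Configuration `conf` and an open FSDataOutputStream
`out`):

    byte[] payload = {1, 2, 3};
    // Returns `out` itself when encrypted spill is disabled, so guard the
    // close; with the false flag the wrapper never closes `out`.
    FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(conf, out, false);
    partitionOut.write(payload);
    if (partitionOut != out) {
      partitionOut.close();
    }
    out.close();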