This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 46beaa0  Fix potential resource leak in ParquetReader (#9852)
46beaa0 is described below

commit 46beaa06408f814936ce5171bbec25887d7aec94
Author: Jihoon Son <[email protected]>
AuthorDate: Sat May 16 09:57:12 2020 -0700

    Fix potential resource leak in ParquetReader (#9852)
    
    * Fix potential resource leak in ParquetReader
    
    * add test
    
    * never thrown exception
    
    * catch potential exceptions
---
 .../org/apache/druid/data/input/InputFormat.java   |   3 +-
 .../data/input/parquet/ParquetInputFormat.java     |   3 +-
 .../druid/data/input/parquet/ParquetReader.java    |  35 +++---
 .../data/input/parquet/BaseParquetReaderTest.java  |   4 +-
 .../parquet/ParquetReaderResourceLeakTest.java     | 120 +++++++++++++++++++++
 .../seekablestream/SettableByteEntityReader.java   |   2 +-
 6 files changed, 148 insertions(+), 19 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/data/input/InputFormat.java b/core/src/main/java/org/apache/druid/data/input/InputFormat.java
index a3a5dd2..485a76b 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputFormat.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputFormat.java
@@ -32,7 +32,6 @@ import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.guice.annotations.UnstableApi;
 
 import java.io.File;
-import java.io.IOException;
 
 /**
  * InputFormat abstracts the file format of input data.
@@ -64,5 +63,5 @@ public interface InputFormat
       InputRowSchema inputRowSchema,
       InputEntity source,
       File temporaryDirectory
-  ) throws IOException;
+  );
 }
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java
index c49581a..1de4c55 100644
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
 
 import javax.annotation.Nullable;
 import java.io.File;
-import java.io.IOException;
 import java.util.Objects;
 
 public class ParquetInputFormat extends NestedInputFormat
@@ -69,7 +68,7 @@ public class ParquetInputFormat extends NestedInputFormat
       InputRowSchema inputRowSchema,
       InputEntity source,
       File temporaryDirectory
-  ) throws IOException
+  )
   {
     return new ParquetReader(conf, inputRowSchema, source, temporaryDirectory, getFlattenSpec(), binaryAsString);
   }
diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
index 2c76253..1bdd011 100644
--- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
+++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
@@ -20,6 +20,7 @@
 package org.apache.druid.data.input.parquet;
 
 import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntity.CleanableFile;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.IntermediateRowParsingReader;
@@ -45,12 +46,12 @@ import java.util.NoSuchElementException;
 
 public class ParquetReader extends IntermediateRowParsingReader<Group>
 {
+  private final Configuration conf;
   private final InputRowSchema inputRowSchema;
+  private final InputEntity source;
+  private final File temporaryDirectory;
   private final ObjectFlattener<Group> flattener;
 
-  private final org.apache.parquet.hadoop.ParquetReader<Group> reader;
-  private final Closer closer;
-
   ParquetReader(
       Configuration conf,
       InputRowSchema inputRowSchema,
@@ -58,31 +59,41 @@ public class ParquetReader extends IntermediateRowParsingReader<Group>
       File temporaryDirectory,
       JSONPathSpec flattenSpec,
       boolean binaryAsString
-  ) throws IOException
+  )
   {
+    this.conf = conf;
     this.inputRowSchema = inputRowSchema;
+    this.source = source;
+    this.temporaryDirectory = temporaryDirectory;
     this.flattener = ObjectFlatteners.create(flattenSpec, new ParquetGroupFlattenerMaker(binaryAsString));
+  }
 
-    closer = Closer.create();
+  @Override
+  protected CloseableIterator<Group> intermediateRowIterator() throws IOException
+  {
+    final Closer closer = Closer.create();
     byte[] buffer = new byte[InputEntity.DEFAULT_FETCH_BUFFER_SIZE];
-    final InputEntity.CleanableFile file = closer.register(source.fetch(temporaryDirectory, buffer));
-    final Path path = new Path(file.file().toURI());
-
     final ClassLoader currentClassLoader = Thread.currentThread().getContextClassLoader();
+    final org.apache.parquet.hadoop.ParquetReader<Group> reader;
     try {
+      final CleanableFile file = closer.register(source.fetch(temporaryDirectory, buffer));
+      final Path path = new Path(file.file().toURI());
+
       Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
       reader = closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new GroupReadSupport(), path)
                                                                       .withConf(conf)
                                                                       .build());
     }
+    catch (Exception e) {
+      // We don't expect to see any exceptions thrown in the above try clause,
+      // but we catch it just in case to avoid any potential resource leak.
+      closer.close();
+      throw new RuntimeException(e);
+    }
     finally {
       Thread.currentThread().setContextClassLoader(currentClassLoader);
     }
-  }
 
-  @Override
-  protected CloseableIterator<Group> intermediateRowIterator()
-  {
     return new CloseableIterator<Group>()
     {
       Group value = null;
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java
index 87f9229..b96cb7b 100644
--- a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java
@@ -39,7 +39,7 @@ class BaseParquetReaderTest
 {
   ObjectWriter DEFAULT_JSON_WRITER = new ObjectMapper().writerWithDefaultPrettyPrinter();
 
-  InputEntityReader createReader(String parquetFile, InputRowSchema schema, JSONPathSpec flattenSpec) throws IOException
+  InputEntityReader createReader(String parquetFile, InputRowSchema schema, JSONPathSpec flattenSpec)
   {
     return createReader(parquetFile, schema, flattenSpec, false);
   }
@@ -49,7 +49,7 @@ class BaseParquetReaderTest
       InputRowSchema schema,
       JSONPathSpec flattenSpec,
       boolean binaryAsString
-  ) throws IOException
+  )
   {
     FileEntity entity = new FileEntity(new File(parquetFile));
     ParquetInputFormat parquet = new ParquetInputFormat(flattenSpec, binaryAsString, new Configuration());
diff --git a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java
new file mode 100644
index 0000000..251fa34
--- /dev/null
+++ b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FileEntity;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Objects;
+
+public class ParquetReaderResourceLeakTest extends BaseParquetReaderTest
+{
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Test
+  public void testFetchOnReadCleanupAfterExhaustingIterator() throws IOException
+  {
+    InputRowSchema schema = new InputRowSchema(
+        new TimestampSpec("timestamp", "iso", null),
+        new DimensionsSpec(
+            DimensionsSpec.getDefaultSchemas(ImmutableList.of("page", "language", "user", "unpatrolled"))
+        ),
+        Collections.emptyList()
+    );
+    FetchingFileEntity entity = new FetchingFileEntity(new File("example/wiki/wiki.parquet"));
+    ParquetInputFormat parquet = new ParquetInputFormat(JSONPathSpec.DEFAULT, false, new Configuration());
+    File tempDir = temporaryFolder.newFolder();
+    InputEntityReader reader = parquet.createReader(schema, entity, tempDir);
+    Assert.assertEquals(0, Objects.requireNonNull(tempDir.list()).length);
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      Assert.assertTrue(Objects.requireNonNull(tempDir.list()).length > 0);
+      while (iterator.hasNext()) {
+        iterator.next();
+      }
+    }
+    Assert.assertEquals(0, Objects.requireNonNull(tempDir.list()).length);
+  }
+
+  private static class FetchingFileEntity extends FileEntity
+  {
+    private FetchingFileEntity(File file)
+    {
+      super(file);
+    }
+
+    @Override
+    public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer)
+    {
+      // Copied from InputEntity
+      try {
+        final File tempFile = File.createTempFile("druid-input-entity", ".tmp", temporaryDirectory);
+        try (InputStream is = open()) {
+          FileUtils.copyLarge(
+              is,
+              tempFile,
+              fetchBuffer,
+              getRetryCondition(),
+              DEFAULT_MAX_NUM_FETCH_TRIES,
+              StringUtils.format("Failed to fetch into [%s]", tempFile.getAbsolutePath())
+          );
+        }
+
+        return new CleanableFile()
+        {
+          @Override
+          public File file()
+          {
+            return tempFile;
+          }
+
+          @Override
+          public void close()
+          {
+            if (!tempFile.delete()) {
+              LOG.warn("Failed to remove file[%s]", tempFile.getAbsolutePath());
+            }
+          }
+        };
+      }
+      catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
index e34ee67..25d752d 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
@@ -60,7 +60,7 @@ class SettableByteEntityReader implements InputEntityReader
     this.indexingTmpDir = indexingTmpDir;
   }
 
-  void setEntity(ByteEntity entity) throws IOException
+  void setEntity(ByteEntity entity)
   {
     this.delegate = new TransformingInputEntityReader(
         // Yes, we are creating a new reader for every stream chunk.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to