This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 46beaa0 Fix potential resource leak in ParquetReader (#9852)
46beaa0 is described below
commit 46beaa06408f814936ce5171bbec25887d7aec94
Author: Jihoon Son <[email protected]>
AuthorDate: Sat May 16 09:57:12 2020 -0700
Fix potential resource leak in ParquetReader (#9852)
* Fix potential resource leak in ParquetReader
* add test
* never-thrown exception
* catch potential exceptions
---
.../org/apache/druid/data/input/InputFormat.java | 3 +-
.../data/input/parquet/ParquetInputFormat.java | 3 +-
.../druid/data/input/parquet/ParquetReader.java | 35 +++---
.../data/input/parquet/BaseParquetReaderTest.java | 4 +-
.../parquet/ParquetReaderResourceLeakTest.java | 120 +++++++++++++++++++++
.../seekablestream/SettableByteEntityReader.java | 2 +-
6 files changed, 148 insertions(+), 19 deletions(-)
diff --git a/core/src/main/java/org/apache/druid/data/input/InputFormat.java
b/core/src/main/java/org/apache/druid/data/input/InputFormat.java
index a3a5dd2..485a76b 100644
--- a/core/src/main/java/org/apache/druid/data/input/InputFormat.java
+++ b/core/src/main/java/org/apache/druid/data/input/InputFormat.java
@@ -32,7 +32,6 @@ import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.guice.annotations.UnstableApi;
import java.io.File;
-import java.io.IOException;
/**
* InputFormat abstracts the file format of input data.
@@ -64,5 +63,5 @@ public interface InputFormat
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory
- ) throws IOException;
+ );
}
diff --git
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java
index c49581a..1de4c55 100644
---
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java
+++
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetInputFormat.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configuration;
import javax.annotation.Nullable;
import java.io.File;
-import java.io.IOException;
import java.util.Objects;
public class ParquetInputFormat extends NestedInputFormat
@@ -69,7 +68,7 @@ public class ParquetInputFormat extends NestedInputFormat
InputRowSchema inputRowSchema,
InputEntity source,
File temporaryDirectory
- ) throws IOException
+ )
{
return new ParquetReader(conf, inputRowSchema, source, temporaryDirectory,
getFlattenSpec(), binaryAsString);
}
diff --git
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
index 2c76253..1bdd011 100644
---
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
+++
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
@@ -20,6 +20,7 @@
package org.apache.druid.data.input.parquet;
import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputEntity.CleanableFile;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.IntermediateRowParsingReader;
@@ -45,12 +46,12 @@ import java.util.NoSuchElementException;
public class ParquetReader extends IntermediateRowParsingReader<Group>
{
+ private final Configuration conf;
private final InputRowSchema inputRowSchema;
+ private final InputEntity source;
+ private final File temporaryDirectory;
private final ObjectFlattener<Group> flattener;
- private final org.apache.parquet.hadoop.ParquetReader<Group> reader;
- private final Closer closer;
-
ParquetReader(
Configuration conf,
InputRowSchema inputRowSchema,
@@ -58,31 +59,41 @@ public class ParquetReader extends
IntermediateRowParsingReader<Group>
File temporaryDirectory,
JSONPathSpec flattenSpec,
boolean binaryAsString
- ) throws IOException
+ )
{
+ this.conf = conf;
this.inputRowSchema = inputRowSchema;
+ this.source = source;
+ this.temporaryDirectory = temporaryDirectory;
this.flattener = ObjectFlatteners.create(flattenSpec, new
ParquetGroupFlattenerMaker(binaryAsString));
+ }
- closer = Closer.create();
+ @Override
+ protected CloseableIterator<Group> intermediateRowIterator() throws
IOException
+ {
+ final Closer closer = Closer.create();
byte[] buffer = new byte[InputEntity.DEFAULT_FETCH_BUFFER_SIZE];
- final InputEntity.CleanableFile file =
closer.register(source.fetch(temporaryDirectory, buffer));
- final Path path = new Path(file.file().toURI());
-
final ClassLoader currentClassLoader =
Thread.currentThread().getContextClassLoader();
+ final org.apache.parquet.hadoop.ParquetReader<Group> reader;
try {
+ final CleanableFile file =
closer.register(source.fetch(temporaryDirectory, buffer));
+ final Path path = new Path(file.file().toURI());
+
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
reader =
closer.register(org.apache.parquet.hadoop.ParquetReader.builder(new
GroupReadSupport(), path)
.withConf(conf)
.build());
}
+ catch (Exception e) {
+ // We don't expect to see any exceptions thrown in the above try clause,
+ // but we catch it just in case to avoid any potential resource leak.
+ closer.close();
+ throw new RuntimeException(e);
+ }
finally {
Thread.currentThread().setContextClassLoader(currentClassLoader);
}
- }
- @Override
- protected CloseableIterator<Group> intermediateRowIterator()
- {
return new CloseableIterator<Group>()
{
Group value = null;
diff --git
a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java
b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java
index 87f9229..b96cb7b 100644
---
a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java
+++
b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/BaseParquetReaderTest.java
@@ -39,7 +39,7 @@ class BaseParquetReaderTest
{
ObjectWriter DEFAULT_JSON_WRITER = new
ObjectMapper().writerWithDefaultPrettyPrinter();
- InputEntityReader createReader(String parquetFile, InputRowSchema schema,
JSONPathSpec flattenSpec) throws IOException
+ InputEntityReader createReader(String parquetFile, InputRowSchema schema,
JSONPathSpec flattenSpec)
{
return createReader(parquetFile, schema, flattenSpec, false);
}
@@ -49,7 +49,7 @@ class BaseParquetReaderTest
InputRowSchema schema,
JSONPathSpec flattenSpec,
boolean binaryAsString
- ) throws IOException
+ )
{
FileEntity entity = new FileEntity(new File(parquetFile));
ParquetInputFormat parquet = new ParquetInputFormat(flattenSpec,
binaryAsString, new Configuration());
diff --git
a/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java
b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java
new file mode 100644
index 0000000..251fa34
--- /dev/null
+++
b/extensions-core/parquet-extensions/src/test/java/org/apache/druid/data/input/parquet/ParquetReaderResourceLeakTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input.parquet;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.druid.data.input.InputEntityReader;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FileEntity;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.Objects;
+
+public class ParquetReaderResourceLeakTest extends BaseParquetReaderTest
+{
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Test
+ public void testFetchOnReadCleanupAfterExhaustingIterator() throws
IOException
+ {
+ InputRowSchema schema = new InputRowSchema(
+ new TimestampSpec("timestamp", "iso", null),
+ new DimensionsSpec(
+ DimensionsSpec.getDefaultSchemas(ImmutableList.of("page",
"language", "user", "unpatrolled"))
+ ),
+ Collections.emptyList()
+ );
+ FetchingFileEntity entity = new FetchingFileEntity(new
File("example/wiki/wiki.parquet"));
+ ParquetInputFormat parquet = new ParquetInputFormat(JSONPathSpec.DEFAULT,
false, new Configuration());
+ File tempDir = temporaryFolder.newFolder();
+ InputEntityReader reader = parquet.createReader(schema, entity, tempDir);
+ Assert.assertEquals(0, Objects.requireNonNull(tempDir.list()).length);
+ try (CloseableIterator<InputRow> iterator = reader.read()) {
+ Assert.assertTrue(Objects.requireNonNull(tempDir.list()).length > 0);
+ while (iterator.hasNext()) {
+ iterator.next();
+ }
+ }
+ Assert.assertEquals(0, Objects.requireNonNull(tempDir.list()).length);
+ }
+
+ private static class FetchingFileEntity extends FileEntity
+ {
+ private FetchingFileEntity(File file)
+ {
+ super(file);
+ }
+
+ @Override
+ public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer)
+ {
+ // Copied from InputEntity
+ try {
+ final File tempFile = File.createTempFile("druid-input-entity",
".tmp", temporaryDirectory);
+ try (InputStream is = open()) {
+ FileUtils.copyLarge(
+ is,
+ tempFile,
+ fetchBuffer,
+ getRetryCondition(),
+ DEFAULT_MAX_NUM_FETCH_TRIES,
+ StringUtils.format("Failed to fetch into [%s]",
tempFile.getAbsolutePath())
+ );
+ }
+
+ return new CleanableFile()
+ {
+ @Override
+ public File file()
+ {
+ return tempFile;
+ }
+
+ @Override
+ public void close()
+ {
+ if (!tempFile.delete()) {
+ LOG.warn("Failed to remove file[%s]",
tempFile.getAbsolutePath());
+ }
+ }
+ };
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
index e34ee67..25d752d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SettableByteEntityReader.java
@@ -60,7 +60,7 @@ class SettableByteEntityReader implements InputEntityReader
this.indexingTmpDir = indexingTmpDir;
}
- void setEntity(ByteEntity entity) throws IOException
+ void setEntity(ByteEntity entity)
{
this.delegate = new TransformingInputEntityReader(
// Yes, we are creating a new reader for every stream chunk.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]