This is an automated email from the ASF dual-hosted git repository.

dzamo pushed a commit to branch 1.20
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 85aa70c2b3629291e0e80a3eae70df03c00a1192
Author: James Turton <[email protected]>
AuthorDate: Sat Oct 15 05:02:49 2022 +0800

    DRILL-8333: Resource leak when JsonLoader is built from a stream (#2678)
---
 .../store/kafka/decoders/JsonMessageReader.java    | 25 +---------
 .../easy/json/loader/ClosingStreamIterator.java    | 54 ++++++++++++++++++++++
 .../store/easy/json/loader/JsonLoaderImpl.java     | 10 ++++
 3 files changed, 66 insertions(+), 23 deletions(-)

diff --git 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
index a9aee5a990..de9ce644a6 100644
--- 
a/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
+++ 
b/contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.physical.resultSet.RowSetLoader;
 import org.apache.drill.exec.record.metadata.ColumnMetadata;
 import org.apache.drill.exec.record.metadata.MetadataUtils;
 import org.apache.drill.exec.store.easy.json.loader.JsonLoaderOptions;
+import org.apache.drill.exec.store.easy.json.loader.ClosingStreamIterator;
 import org.apache.drill.exec.store.easy.json.parser.TokenIterator;
 import org.apache.drill.exec.store.kafka.KafkaStoragePlugin;
 import org.apache.drill.exec.store.kafka.MetaDataField;
@@ -37,8 +38,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.util.Iterator;
 import java.util.Properties;
 import java.util.StringJoiner;
 
@@ -50,7 +49,7 @@ public class JsonMessageReader implements MessageReader {
 
   private static final Logger logger = 
LoggerFactory.getLogger(JsonMessageReader.class);
 
-  private final SingleElementIterator<InputStream> stream = new 
SingleElementIterator<>();
+  private final ClosingStreamIterator stream = new ClosingStreamIterator();
 
   private KafkaJsonLoader kafkaJsonLoader;
   private ResultSetLoader resultSetLoader;
@@ -156,24 +155,4 @@ public class JsonMessageReader implements MessageReader {
         .add("resultSetLoader=" + resultSetLoader)
         .toString();
   }
-
-  public static class SingleElementIterator<T> implements Iterator<T> {
-    private T value;
-
-    @Override
-    public boolean hasNext() {
-      return value != null;
-    }
-
-    @Override
-    public T next() {
-      T value = this.value;
-      this.value = null;
-      return value;
-    }
-
-    public void setValue(T value) {
-      this.value = value;
-    }
-  }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ClosingStreamIterator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ClosingStreamIterator.java
new file mode 100644
index 0000000000..0b67049e57
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/ClosingStreamIterator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.easy.json.loader;
+
+import java.io.InputStream;
+
+import org.apache.drill.common.AutoCloseables;
+
+import java.util.Iterator;
+
+/**
+ * It allows setting the current value in the iterator and can be used once 
after {@link #next} call
+ *
+ * @param <T> type of the value
+ */
+public class ClosingStreamIterator implements Iterator<InputStream> {
+    private InputStream value, last;
+
+    @Override
+    public boolean hasNext() {
+      if (value == null) {
+        AutoCloseables.closeSilently(last);
+        return false;
+      }
+      return true;
+    }
+
+    @Override
+    public InputStream next() {
+      this.last = this.value;
+      this.value = null;
+      return this.last;
+    }
+
+    public void setValue(InputStream value) {
+      AutoCloseables.closeSilently(this.value);
+      this.value = value;
+    }
+  }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index edc687f4aa..2886815db8 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.EmptyErrorContext;
 import org.apache.drill.common.exceptions.UserException;
@@ -232,6 +233,7 @@ public class JsonLoaderImpl implements JsonLoader, 
ErrorFactory {
   private final JsonStructureParser parser;
   private final FieldFactory fieldFactory;
   private final ImplicitColumns implicitFields;
+  private final Iterable<InputStream> streams;
   private final int maxRows;
   private boolean eof;
 
@@ -254,6 +256,7 @@ public class JsonLoaderImpl implements JsonLoader, 
ErrorFactory {
     this.implicitFields = builder.implicitFields;
     this.maxRows = builder.maxRows;
     this.fieldFactory = buildFieldFactory(builder);
+    this.streams = builder.streams;
     this.parser = buildParser(builder);
   }
 
@@ -354,6 +357,13 @@ public class JsonLoaderImpl implements JsonLoader, 
ErrorFactory {
   @Override // JsonLoader
   public void close() {
     parser.close();
+    for (InputStream stream: streams) {
+      try {
+        AutoCloseables.close(stream);
+      } catch (Exception ex) {
+        logger.warn("Failed to close an input stream, a system resource leak 
may ensue.", ex);
+      }
+    }
   }
 
   @Override // ErrorFactory

Reply via email to