This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d3cb1b696a Core: Fix ParallelIterable memory leak where queue continues to be populated even after iterator close (#9402)
d3cb1b696a is described below
commit d3cb1b696a1631a9cca4619f93252cd0a985fbfc
Author: Helt <[email protected]>
AuthorDate: Tue Jul 2 05:33:17 2024 +0800
Core: Fix ParallelIterable memory leak where queue continues to be populated even after iterator close (#9402)
---
.../org/apache/iceberg/util/ParallelIterable.java | 6 +++
.../apache/iceberg/util/TestParallelIterable.java | 61 ++++++++++++++++++++++
2 files changed, 67 insertions(+)
diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
index 108757b415..d7221e7d45 100644
--- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
+++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
@@ -67,6 +67,12 @@ public class ParallelIterable<T> extends CloseableGroup implements CloseableIter
try (Closeable ignored =
(iterable instanceof Closeable) ? (Closeable) iterable : () -> {}) {
for (T item : iterable) {
+ // exit manually because `ConcurrentLinkedQueue` can't be
+ // interrupted
+ if (closed) {
+ return;
+ }
+
queue.add(item);
}
} catch (IOException e) {
diff --git a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
index 68685614d3..af9c6ec521 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collections;
+import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
@@ -72,6 +73,66 @@ public class TestParallelIterable {
.untilAsserted(() -> assertThat(queue).isEmpty());
}
 
+ @Test
+ public void closeMoreDataParallelIteratorWithoutCompleteIteration()
+ throws IOException, IllegalAccessException, NoSuchFieldException {
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+ Iterator<Integer> integerIterator =
+ new Iterator<Integer>() {
+ private int number = 1;
+
+ @Override
+ public boolean hasNext() {
+ if (number > 1000) {
+ return false;
+ }
+
+ number++;
+ return true;
+ }
+
+ @Override
+ public Integer next() {
+ try {
+ // sleep to control number generate rate
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ // Sleep interrupted, we ignore it!
+ }
+ return number;
+ }
+ };
+ Iterable<CloseableIterable<Integer>> transform =
+ Iterables.transform(
+ Lists.newArrayList(1),
+ item ->
+ new CloseableIterable<Integer>() {
+ @Override
+ public void close() {}
+
+ @Override
+ public CloseableIterator<Integer> iterator() {
+ return CloseableIterator.withClose(integerIterator);
+ }
+ });
+
+ ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(transform, executor);
+ CloseableIterator<Integer> iterator = parallelIterable.iterator();
+ Field queueField = iterator.getClass().getDeclaredField("queue");
+ queueField.setAccessible(true);
+ ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>) queueField.get(iterator);
+
+ assertThat(iterator.hasNext()).isTrue();
+ assertThat(iterator.next()).isNotNull();
+ Awaitility.await("Queue is populated")
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> queueHasElements(iterator, queue));
+ iterator.close();
+ Awaitility.await("Queue is cleared")
+ .atMost(5, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertThat(queue).as("Queue is not empty after cleaning").isEmpty());
+ }
+
private void queueHasElements(CloseableIterator<Integer> iterator, Queue queue) {
assertThat(iterator.hasNext()).isTrue();
assertThat(iterator.next()).isNotNull();