sopel39 commented on code in PR #11781:
URL: https://github.com/apache/iceberg/pull/11781#discussion_r1890160296
##########
core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java:
##########
@@ -171,38 +171,55 @@ public void limitQueueSize() {
}
@Test
- public void queueSizeOne() {
Review Comment:
The test is no longer viable because the queue size is no longer a hard limit. The
queue will hold at minimum the number of items produced by a single
`org.apache.iceberg.util.ParallelIterable.Task`
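A minimal, hypothetical model of the soft limit described above (class and method names are mine, not the Iceberg implementation): the size check happens before a task runs rather than per element, so the queue can overshoot `maxQueueSize` by up to one task's output.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch, not the Iceberg code: the size check is made
// before a task starts, so the queue can exceed maxQueueSize by up to
// (batchSize - 1) items contributed by the last admitted task.
public class SoftQueueLimitSketch {

  static int fillQueue(int maxQueueSize, int batchSize) {
    Deque<Integer> queue = new ArrayDeque<>();
    // keep running tasks while the check sees space in the queue
    while (queue.size() < maxQueueSize) {
      for (int i = 0; i < batchSize; i++) {
        queue.addLast(i); // the check is not re-evaluated mid-batch
      }
    }
    return queue.size();
  }

  public static void main(String[] args) {
    // maxQueueSize = 5, but a 4-item batch admitted at size 4 pushes it to 8
    System.out.println(fillQueue(5, 4));
  }
}
```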
##########
core/src/main/java/org/apache/iceberg/util/ParallelIterable.java:
##########
@@ -153,6 +155,7 @@ private synchronized boolean checkTasks() {
try {
Optional<Task<T>> continuation = taskFutures[i].get();
continuation.ifPresent(yieldedTasks::addLast);
+ taskFutures[i] = null;
Review Comment:
This is because of the
```
// submit a new task if there is space in the queue
if (queue.size() < maxQueueSize) {
```
below. The queue might be full, in which case no new task is submitted;
`taskFutures[i]` must then be cleared because its continuation was already
added to `yieldedTasks`
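A tiny sketch of why the slot must be cleared (hypothetical names, not the Iceberg code): if the completed future stays in its slot while the queue is full, the next pass over the slots re-reads it and enqueues the same continuation twice.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the slot-clearing fix, not the Iceberg code.
public class SlotClearingSketch {

  static int checkTwice(boolean clearSlot) {
    Deque<String> yieldedTasks = new ArrayDeque<>();
    CompletableFuture<Optional<String>> slot =
        CompletableFuture.completedFuture(Optional.of("continuation"));

    // two passes over the slot, as happens when the queue stays full and
    // submitting a replacement task is therefore skipped
    for (int pass = 0; pass < 2; pass++) {
      if (slot != null && slot.isDone()) {
        slot.join().ifPresent(yieldedTasks::addLast);
        if (clearSlot) {
          slot = null; // the fix: a consumed future must not be re-read
        }
      }
    }
    return yieldedTasks.size();
  }

  public static void main(String[] args) {
    System.out.println(checkTwice(false)); // stale slot: continuation duplicated
    System.out.println(checkTwice(true));  // cleared slot: processed once
  }
}
```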
##########
core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java:
##########
@@ -148,17 +150,15 @@ public void limitQueueSize() {
.collect(ImmutableMultiset.toImmutableMultiset());
int maxQueueSize = 20;
- ExecutorService executor = Executors.newCachedThreadPool();
+ ExecutorService executor = Executors.newSingleThreadExecutor();
Review Comment:
`ParallelIterable` will still spawn `2 *
ThreadPools.WORKER_THREAD_POOL_SIZE` tasks. These tasks won't be picked up
immediately, but once started they are guaranteed to finish. This means
that the queue size will be temporarily higher than `maxQueueSize`. In order
to properly test the limit, we need to reduce the executor size (as it will
be in practice) to constrain the number of concurrently running tasks.
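As a back-of-the-envelope model (hypothetical class and numbers, not the Iceberg code), the worst-case overshoot grows with the number of concurrently running tasks, which is why a single-thread executor makes the limit observable:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model: every running task saw size < maxQueueSize before
// the queue filled, and each then appends its whole batch. The overshoot
// is therefore bounded by threads * batchSize.
public class ExecutorSizeBoundSketch {

  static int worstCaseSize(int maxQueueSize, int threads, int batchSize) {
    Deque<Integer> queue = new ArrayDeque<>();
    // fill to just under the limit so every concurrent task's check passed
    for (int i = 0; i < maxQueueSize - 1; i++) {
      queue.addLast(i);
    }
    // all admitted tasks now run to completion and append their batches
    for (int t = 0; t < threads; t++) {
      for (int i = 0; i < batchSize; i++) {
        queue.addLast(i);
      }
    }
    return queue.size();
  }

  public static void main(String[] args) {
    System.out.println(worstCaseSize(20, 8, 10)); // wide pool: far past the limit
    System.out.println(worstCaseSize(20, 1, 10)); // single thread: small overshoot
  }
}
```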
##########
core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java:
##########
@@ -171,38 +171,55 @@ public void limitQueueSize() {
}
@Test
- public void queueSizeOne() {
- List<Iterable<Integer>> iterables =
- ImmutableList.of(
- () -> IntStream.range(0, 100).iterator(),
- () -> IntStream.range(0, 100).iterator(),
- () -> IntStream.range(0, 100).iterator());
+ @Timeout(10)
+ public void noDeadlock() {
+ ExecutorService executor = Executors.newFixedThreadPool(1);
+ Semaphore semaphore = new Semaphore(1);
- Multiset<Integer> expectedValues =
- IntStream.range(0, 100)
- .boxed()
- .flatMap(i -> Stream.of(i, i, i))
- .collect(ImmutableMultiset.toImmutableMultiset());
+ List<Iterable<Integer>> iterablesA =
+ ImmutableList.of(
+ testIterable(
+ semaphore::acquire, semaphore::release, IntStream.range(0, 100).iterator()));
+ List<Iterable<Integer>> iterablesB =
+ ImmutableList.of(
+ testIterable(
+ semaphore::acquire, semaphore::release, IntStream.range(200, 300).iterator()));
- ExecutorService executor = Executors.newCachedThreadPool();
- ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(iterables, executor, 1);
- ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();
+ ParallelIterable<Integer> parallelIterableA = new ParallelIterable<>(iterablesA, executor, 1);
+ ParallelIterable<Integer> parallelIterableB = new ParallelIterable<>(iterablesB, executor, 1);
Review Comment:
No. A language version issue, perhaps?
##########
core/src/main/java/org/apache/iceberg/util/ParallelIterable.java:
##########
@@ -165,7 +168,10 @@ private synchronized boolean checkTasks() {
}
}
- taskFutures[i] = submitNextTask();
+ // submit a new task if there is space in the queue
+ if (queue.size() < maxQueueSize) {
+ taskFutures[i] = submitNextTask();
Review Comment:
No. It's just an optimization to avoid a busy loop through the executor
(while the queue is full), which would increase congestion.
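A small counting sketch of that congestion argument (hypothetical names, not the Iceberg code): without the gate, a task keeps bouncing through the executor while the queue is full, doing no useful work on each trip.

```java
// Hypothetical model of gated vs. ungated task resubmission.
public class BusyLoopSketch {

  static int executorTrips(boolean gateOnQueueSpace, int rounds) {
    int maxQueueSize = 1;
    int queueSize = 1; // the consumer is slow, so the queue stays full
    int trips = 0;
    for (int round = 0; round < rounds; round++) {
      // the gate: only resubmit when the queue has space
      if (!gateOnQueueSpace || queueSize < maxQueueSize) {
        trips++; // the task runs, finds the queue full, and yields again
      }
    }
    return trips;
  }

  public static void main(String[] args) {
    System.out.println(executorTrips(false, 100)); // ungated: every round is a wasted trip
    System.out.println(executorTrips(true, 100));  // gated: no trips while the queue is full
  }
}
```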
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]