mccheah commented on a change in pull request #45: Lazily submit tasks in ParallelIterable and add cancellation.
URL: https://github.com/apache/incubator-iceberg/pull/45#discussion_r240776853
##########
File path: core/src/main/java/com/netflix/iceberg/util/ParallelIterable.java
##########
@@ -19,73 +19,124 @@
package com.netflix.iceberg.util;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+import com.netflix.iceberg.io.CloseableGroup;
+import java.io.Closeable;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-public class ParallelIterable<T> implements Iterable<T> {
+public class ParallelIterable<T> extends CloseableGroup implements Iterable<T> {
private final Iterable<Iterable<T>> iterables;
- private final ExecutorService trackingPool;
private final ExecutorService workerPool;
public ParallelIterable(Iterable<Iterable<T>> iterables,
- ExecutorService trackingPool,
ExecutorService workerPool) {
this.iterables = iterables;
- this.trackingPool = trackingPool;
this.workerPool = workerPool;
}
@Override
public Iterator<T> iterator() {
- return new ParallelIterator<>(iterables, trackingPool, workerPool);
+ ParallelIterator<T> iter = new ParallelIterator<>(iterables, workerPool);
+ addCloseable(iter);
+ return iter;
}
- private static class ParallelIterator<T> implements Iterator<T> {
+ private static class ParallelIterator<T> implements Iterator<T>, Closeable {
Review comment:
I'm wondering if there's a more idiomatic way to do this, particularly one that doesn't require both:
1. Busy waiting. In concurrent programming, busy waiting is generally a red flag when alternative primitives such as locks, condition variables, blocking queues, or monitors are available.
2. Manual tracking of tasks by index.
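To make the contrast with busy waiting concrete, here is a minimal sketch using only `java.util.concurrent`. The `BlockingHandoff` class and `consumeSum` method are hypothetical names for illustration, not Iceberg code: the consumer parks in `BlockingQueue.take()` until the producer calls `put()`, instead of spinning on a shared flag.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical example: hand values from a producer thread to the caller
// without busy waiting (no `while (!done) {}` loop on a volatile flag).
final class BlockingHandoff {
  static int consumeSum(int count) throws InterruptedException {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(4);
    ExecutorService producer = Executors.newSingleThreadExecutor();
    try {
      producer.submit(() -> {
        for (int i = 1; i <= count; i++) {
          queue.put(i); // blocks while the queue is full instead of spinning
        }
        return null; // Callable, so put() may throw InterruptedException
      });
      int sum = 0;
      for (int i = 0; i < count; i++) {
        sum += queue.take(); // parks this thread until a value is available
      }
      return sum;
    } finally {
      producer.shutdownNow();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(consumeSum(10)); // prints 55
  }
}
```

The same reasoning applies to futures: `Future.get()` blocks the caller until the task completes, so there is no need to poll `isDone()` in a loop.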
I came up with the following alternative. Apologies that it has to be in pseudocode form; given the nature of the problem, it's hard to explain without code. Let's see how this works out:
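```java
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import java.io.Closeable;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

class ParallelIterator<T> implements Iterator<T>, Closeable {
  private static final int TASK_COUNT = 4; // hypothetical lookahead size
  private final Deque<T> availableValues = new ArrayDeque<>();
  private final Deque<Future<List<T>>> runningTasks = new ArrayDeque<>();
  private final ExecutorService threadPool;
  private final Iterator<Iterable<T>> pendingValues;

  ParallelIterator(Iterable<Iterable<T>> iterables, ExecutorService threadPool) {
    this.pendingValues = iterables.iterator();
    this.threadPool = threadPool;
  }

  @Override
  public boolean hasNext() {
    // Loop instead of recursing; a task may return an empty batch, so keep
    // draining until a value is buffered or no work remains.
    while (availableValues.isEmpty() && (!runningTasks.isEmpty() || pendingValues.hasNext())) {
      if (runningTasks.isEmpty()) {
        submitTasks();
      }
      // Block on the oldest task (no busy waiting); this preserves input order.
      availableValues.addAll(Futures.getUnchecked(runningTasks.poll()));
    }
    return !availableValues.isEmpty();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException(); // No values remaining
    }
    return availableValues.poll();
  }

  private void submitTasks() {
    // Buffer / eagerly submit some set of tasks, i.e. lookahead.
    for (int i = 0; i < TASK_COUNT && pendingValues.hasNext(); i++) {
      Iterable<T> nextPendingValues = pendingValues.next();
      Callable<List<T>> task = () -> ImmutableList.copyOf(nextPendingValues);
      runningTasks.add(threadPool.submit(task));
    }
  }

  @Override
  public void close() {
    // Cancel tasks that were submitted but whose values were never consumed.
    runningTasks.forEach(task -> task.cancel(true));
    runningTasks.clear();
  }
}
```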
The general idea is to keep a single running iterator over the backing iterables. Tasks are submitted to the worker pool in batches, and their futures are buffered in a queue; each future computes the values of one backing iterable. Then, on `next()`:
- Return an available value from an already-completed task, if there is one.
- Otherwise, if tasks are running, wait for the oldest one and buffer its values.
- Otherwise, submit more work and wait.
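The ordering property of always waiting on the oldest future can be checked in isolation. The following is a minimal sketch assuming plain `java.util.concurrent` types; `OrderedDrain`, `drain`, and the sleep delays are hypothetical, chosen so that later tasks finish before earlier ones.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical example: submit one task per batch, then drain futures
// oldest-first. Tasks may complete out of order, but output follows input order.
final class OrderedDrain {
  static List<Integer> drain(List<List<Integer>> batches, long[] delaysMillis)
      throws InterruptedException, ExecutionException {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, batches.size()));
    try {
      Deque<Future<List<Integer>>> running = new ArrayDeque<>();
      for (int i = 0; i < batches.size(); i++) {
        List<Integer> batch = batches.get(i);
        long delay = delaysMillis[i];
        running.add(pool.submit(() -> {
          Thread.sleep(delay); // simulate uneven work per batch
          return batch;
        }));
      }
      List<Integer> out = new ArrayList<>();
      while (!running.isEmpty()) {
        out.addAll(running.poll().get()); // blocks on the oldest task only
      }
      return out;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) throws Exception {
    List<List<Integer>> batches = List.of(List.of(1, 2), List.of(3), List.of(4, 5));
    System.out.println(drain(batches, new long[] {60, 30, 0})); // prints [1, 2, 3, 4, 5]
  }
}
```

Because each `get()` blocks only on the head of the queue, a slow first task delays consumption but never reorders the output.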
What do you think about this approach? The advantages are:
- No busy waiting
- No need to maintain indices manually; everything is done with collection primitives (`poll`, `iterator`, etc.)
There are a few ways this framework can be adjusted. For example, on `next()`, if we determine that only some minimum number of tasks are still running, we can eagerly submit more work before the user actually requests those values, thereby pipelining the main thread's work on the values with the worker threads' work that produces them.
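The pipelining idea could look roughly like the sketch below. It assumes a hypothetical `minRunning` threshold and a hypothetical `PipelinedIterator` class, neither of which is part of this PR; the iterator tops up its in-flight tasks on every `hasNext()` call so that workers stay ahead of the consumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch, not part of the PR: keeps at least `minRunning` tasks in
// flight so workers materialize upcoming batches while the caller consumes.
// Null elements are not supported because ArrayDeque rejects them.
final class PipelinedIterator<T> implements Iterator<T> {
  private final Deque<T> available = new ArrayDeque<>();
  private final Deque<Future<List<T>>> running = new ArrayDeque<>();
  private final Iterator<? extends Iterable<T>> pending;
  private final ExecutorService pool;
  private final int minRunning;

  PipelinedIterator(Iterable<? extends Iterable<T>> iterables, ExecutorService pool, int minRunning) {
    if (minRunning < 1) {
      throw new IllegalArgumentException("minRunning must be at least 1");
    }
    this.pending = iterables.iterator();
    this.pool = pool;
    this.minRunning = minRunning;
  }

  // Submit more work whenever the number of in-flight tasks drops below the threshold.
  private void refill() {
    while (running.size() < minRunning && pending.hasNext()) {
      Iterable<T> source = pending.next();
      running.add(pool.submit(() -> {
        List<T> values = new ArrayList<>();
        source.forEach(values::add);
        return values;
      }));
    }
  }

  @Override
  public boolean hasNext() {
    refill();
    while (available.isEmpty() && !running.isEmpty()) {
      try {
        available.addAll(running.poll().get()); // block on the oldest task
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for values", e);
      } catch (ExecutionException e) {
        throw new IllegalStateException("Task failed", e.getCause());
      }
      refill(); // a slot just freed up; schedule the next batch immediately
    }
    return !available.isEmpty();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return available.poll();
  }
}
```

A larger `minRunning` trades memory (more buffered batches) for more overlap between the workers and the consuming thread.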
----------------------------------------------------------------
This is an automated message from the Apache Git Service.