findepi commented on code in PR #10691:
URL: https://github.com/apache/iceberg/pull/10691#discussion_r1685429472
##########
core/src/main/java/org/apache/iceberg/util/ParallelIterable.java:
##########
@@ -136,30 +169,33 @@ private boolean checkTasks() {
}
}
- return !closed && (tasks.hasNext() || hasRunningTask);
+ return !closed.get() && (tasks.hasNext() || hasRunningTask);
}
- private Future<?> submitNextTask() {
- if (!closed && tasks.hasNext()) {
- return workerPool.submit(tasks.next());
+ private CompletableFuture<Optional<Task<T>>> submitNextTask() {
+ if (!closed.get()) {
+ if (!yieldedTasks.isEmpty()) {
+ return CompletableFuture.supplyAsync(yieldedTasks.removeFirst(), workerPool);
+ } else if (tasks.hasNext()) {
+ return CompletableFuture.supplyAsync(tasks.next(), workerPool);
+ }
}
return null;
}
@Override
public synchronized boolean hasNext() {
- Preconditions.checkState(!closed, "Already closed");
-
- // if the consumer is processing records more slowly than the producers, then this check will
- // prevent tasks from being submitted. while the producers are running, this will always
- // return here before running checkTasks. when enough of the tasks are finished that the
- // consumer catches up, then lots of new tasks will be submitted at once. this behavior is
- // okay because it ensures that records are not stacking up waiting to be consumed and taking
- // up memory.
- //
- // consumers that process results quickly will periodically exhaust the queue and submit new
- // tasks when checkTasks runs. fast consumers should not be delayed.
- if (!queue.isEmpty()) {
+ Preconditions.checkState(!closed.get(), "Already closed");
+
+ // If the consumer is processing records more slowly than the producers, the producers will
+ // eventually fill the queue and yield, returning continuations. Continuations and new tasks
+ // are started by checkTasks(). The check here prevents us from restarting continuations or
Review Comment:
I think there is a lot we could change about this code. I don't like that a
method called checkTasks has the side effect of scheduling new tasks. I also
don't like that we use a thread sleep. At a minimum, the sleep should be
interrupted when a new item is put into the queue.
However, I wanted to avoid making too many changes in a single PR, since I
thought that is generally discouraged.
@stevenzwu would it be OK to treat the sleep(10) as pre-existing and not
address it now? Or do you think this sleep, combined with the changes
introduced in this PR, has a more negative overall effect than it used to?
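To illustrate what "interrupted when a new item is put into the queue" could
look like, here is a minimal sketch. It assumes the queue could be a
java.util.concurrent.BlockingQueue, which is an assumption and not what this
PR does; the class TimedPollSketch and the method awaitNext are hypothetical
names used only for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the ParallelIterable implementation.
public class TimedPollSketch {

  // Waits up to timeoutMillis for an item and returns null on timeout.
  // Unlike Thread.sleep, a timed poll returns as soon as a producer
  // offers an item, so the consumer never waits longer than necessary.
  static <T> T awaitNext(BlockingQueue<T> queue, long timeoutMillis)
      throws InterruptedException {
    return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws Exception {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Producer that publishes one item after a short delay.
    Thread producer = new Thread(() -> {
      try {
        Thread.sleep(50);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      queue.offer("row");
    });
    producer.start();

    // The consumer allows up to 5 seconds but wakes when the item arrives.
    long start = System.nanoTime();
    String item = awaitNext(queue, 5_000);
    long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    producer.join();

    System.out.println("got " + item + " after ~" + elapsedMillis + " ms");
  }
}
```

With a fixed sleep the consumer always pays the full interval; with a timed
poll the timeout is only an upper bound. Whether that fits here depends on how
checkTasks() and the yielded continuations interact with the queue.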
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]