Hi David,
This is the effect of how parallel streams are implemented, where different
stages, which are not representible as a join-less Spliterator are executed as
a series of "islands" where the next isn't started until the former has
completed.
If you think about it, parallelization of a Stream works best when the entire
data set can be split amongst a set of worker threads, and that sort of implies
that you want eager pre-fetch of data, so if your dataset does not fit in
memory, that is likely to lead to less desirable outcomes.
What I was able to do for Gatherers is to implement "gather(…) +
collect(…)"-fusion so any number of consecutive gather(…)-operations
immediately followed by a collect(…) is run in the same "island".
So with that said, you could try something like the following:
static <T> Collector<T, ?, Void> forEach(Consumer<? super T> each) {
return Collector.of(() -> null, (v, e) -> each.accept(e), (l, r) -> l, (v)
-> null, Collector.Characteristics.IDENTITY_FINISH);
}
stream
.parallel()
.unordered()
.gather(Gatherers.windowFixed(BATCH_SIZE))
.collect(forEach(eachList -> println(eachList.getFirst())));
Cheers,
√
Viktor Klang
Software Architect, Java Platform Group
Oracle
________________________________
From: core-libs-dev <[email protected]> on behalf of David
Alayachew <[email protected]>
Sent: Monday, 11 November 2024 14:52
To: core-libs-dev <[email protected]>
Subject: Re: Question about Streams, Gatherers, and fetching too many elements
And just to avoid the obvious question, I can hold about 30 batches in memory
before the Out of Memory error occurs. So this is not an issue of my batch size
being too high.
But just to confirm, I set the batch size to 1, and it still ran into an out of
memory error. So I feel fairly confident saying that the Gatherer is trying to
grab all available data before sending any of it downstream.
On Mon, Nov 11, 2024, 8:46 AM David Alayachew
<[email protected]<mailto:[email protected]>> wrote:
Hello Core Libs Dev Team,
I was trying out Gatherers for a project at work, and ran into a rather sad
scenario.
I need to process a large file in batches. Each batch is small enough that I
can hold it in memory, but I cannot hold the entire file (and thus, all of the
batches) in memory at once.
Looking at the Gatherers API, I saw windowFixed and thought that it would be a
great match for my use case.
However, when trying it out, I was disappointed to see that it ran out of
memory very quickly. Here is my attempt at using it.
stream
.parallel()
.unordered()
.gather(Gatherers.windowFixed(BATCH_SIZE))
.forEach(eachList -> println(eachList.getFirst()))
;
As you can see, I am just splitting the file into batches, and printing out the
first of each batch. This is purely for example's sake, of course. I had
planned on building even more functionality on top of this, but I couldn't even
get past this example.
But anyways, not even a single one of them printed out. Which leads me to
believe that it's pulling all of them in the Gatherer.
I can get it to run successfully if I go sequentially, but not parallel.
Parallel gives me that out of memory error.
Is there any way for me to be able to have the Gatherer NOT pull in everything
while still remaining parallel and unordered?
Thank you for your time and help.
David Alayachew