Hello! To me, it looks like it's possible to make the better default implementation. It could be done even as a separate static method:
static <T> Stream<T> ofPusher(Consumer<Consumer<? super T>> pusher) { return StreamSupport.stream(new Spliterator<>() { private Spliterator<T> delegate; @Override public boolean tryAdvance(Consumer<? super T> action) { initDelegate(); return delegate.tryAdvance(action); } private void initDelegate() { if (delegate == null) { Stream.Builder<T> builder = Stream.builder(); // or use SpinedBuffer directly pusher.accept(builder); delegate = builder.build().spliterator(); } } @Override public void forEachRemaining(Consumer<? super T> action) { if (delegate != null) { delegate.forEachRemaining(action); } else { pusher.accept(action); } } @Override public Spliterator<T> trySplit() { initDelegate(); return delegate.trySplit(); } @Override public long estimateSize() { return Long.MAX_VALUE; } @Override public int characteristics() { return 0; } }, false); } In this case, we are buffering only if short-circuit operation or splitting is requested. Otherwise, forEachRemaining will just delegate to the pusher. Now, the default implementation could be rewritten as <T, R> Stream<R> mapMulti(Stream<T> stream, BiConsumer<Consumer<? super R>, ? super T> mapper) { Objects.requireNonNull(mapper); return stream.flatMap(e -> ofPusher(sink -> mapper.accept(sink, e))); } And now, I don't think it's necessary to specialize it at all. Probably it's not necessary to introduce mapMulti at all as well, as now it's a trivial delegate to ofPusher. With best regards, Tagir Valeev. On Wed, Jun 24, 2020 at 5:58 PM Patrick Concannon <patrick.concan...@oracle.com> wrote: > > Hi, > > Could someone please review myself and Julia's RFE and CSR for JDK-8238286 - > 'Add new flatMap stream operation that is more amenable to pushing’? > > This proposal is to add a new flatMap-like operation: > > `<R> Stream<R> mapMulti(BiConsumer<Consumer<R>, ? super T> mapper)` > > to the java.util.Stream class. This operation is more receptive to the > pushing or yielding of values than the current implementation that internally > assembles values (if any) into one or more streams. This addition includes > the primitive variations of the operation i.e. mapMultiToInt, IntStream > mapMulti, etc. > > issue: https://bugs.openjdk.java.net/browse/JDK-8238286 > <https://bugs.openjdk.java.net/browse/JDK-8238286> > csr: https://bugs.openjdk.java.net/browse/JDK-8248166 > <https://bugs.openjdk.java.net/browse/JDK-8248166> > > webrev: http://cr.openjdk.java.net/~pconcannon/8238286/webrevs/webrev.00/ > <http://cr.openjdk.java.net/~pconcannon/8238286/webrevs/webrev.00/> > specdiff: > http://cr.openjdk.java.net/~pconcannon/8238286/specdiff/specout.00/overview-summary.html > > <http://cr.openjdk.java.net/~pconcannon/8238286/specdiff/specout.00/overview-summary.html> > > > Kind regards, > Patrick & Julia