Hello!

To me, it looks like it's possible to make the better default
implementation. It could be done even as a separate static method:

static <T> Stream<T> ofPusher(Consumer<Consumer<? super T>> pusher) {
  return StreamSupport.stream(new Spliterator<>() {
    private Spliterator<T> delegate;

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
      initDelegate();
      return delegate.tryAdvance(action);
    }

    private void initDelegate() {
      if (delegate == null) {
        Stream.Builder<T> builder = Stream.builder(); // or use
SpinedBuffer directly
        pusher.accept(builder);
        delegate = builder.build().spliterator();
      }
    }

    @Override
    public void forEachRemaining(Consumer<? super T> action) {
      if (delegate != null) {
        delegate.forEachRemaining(action);
      } else {
        pusher.accept(action);
      }
    }

    @Override
    public Spliterator<T> trySplit() {
      initDelegate();
      return delegate.trySplit();
    }

    @Override
    public long estimateSize() {
      return Long.MAX_VALUE;
    }

    @Override
    public int characteristics() {
      return 0;
    }
  }, false);
}

In this case, we are buffering only if short-circuit operation or
splitting is requested. Otherwise, forEachRemaining will just delegate
to the pusher.
Now, the default implementation could be rewritten as

<T, R> Stream<R> mapMulti(Stream<T> stream, BiConsumer<Consumer<?
super R>, ? super T> mapper) {
  Objects.requireNonNull(mapper);
  return stream.flatMap(e -> ofPusher(sink -> mapper.accept(sink, e)));
}

And now, I don't think it's necessary to specialize it at all.
Probably it's not necessary to introduce mapMulti at all as well, as
now it's a trivial delegate to ofPusher.

With best regards,
Tagir Valeev.

On Wed, Jun 24, 2020 at 5:58 PM Patrick Concannon
<patrick.concan...@oracle.com> wrote:
>
> Hi,
>
> Could someone please review myself and Julia's RFE and CSR for JDK-8238286 - 
> 'Add new flatMap stream operation that is more amenable to pushing’?
>
> This proposal is to add a new flatMap-like operation:
>
> `<R> Stream<R> mapMulti(BiConsumer<Consumer<R>, ? super T> mapper)`
>
> to the java.util.Stream class. This operation is more receptive to the 
> pushing or yielding of values than the current implementation that internally 
> assembles values (if any) into one or more streams. This addition includes 
> the primitive variations of the operation i.e. mapMultiToInt, IntStream 
> mapMulti, etc.
>
> issue: https://bugs.openjdk.java.net/browse/JDK-8238286 
> <https://bugs.openjdk.java.net/browse/JDK-8238286>
> csr: https://bugs.openjdk.java.net/browse/JDK-8248166 
> <https://bugs.openjdk.java.net/browse/JDK-8248166>
>
> webrev: http://cr.openjdk.java.net/~pconcannon/8238286/webrevs/webrev.00/ 
> <http://cr.openjdk.java.net/~pconcannon/8238286/webrevs/webrev.00/>
> specdiff: 
> http://cr.openjdk.java.net/~pconcannon/8238286/specdiff/specout.00/overview-summary.html
>   
> <http://cr.openjdk.java.net/~pconcannon/8238286/specdiff/specout.00/overview-summary.html>
>
>
> Kind regards,
> Patrick & Julia

Reply via email to