Hi Chris, This looks like a straight forward way to solve the problem with minimal disruption from the existing API. Can I make a few suggestions though?
We could add a contentLength parameter to fromPublisher, to allow Flow.Publishers where the content length is known to be easily converted to BodyPublisher: static BodyPublisher fromPublisher(Flow.Publisher<ByteBuffer> publisher, int contentLength) { ... } This would mean if you were receiving a servlet request body and publishing it to another location, then you could do something like this (this uses a reactive streams implementation on top of the servlet API that I wrote): HttpServletRequest request = ... long contentLength = -1; if (request.getHeader("Content-Length") != null) { contentLength = Long.parseLong(request.getHeader("Content-Length")); } Publisher<ByteBuffer> publisher = new RequestPublisher(request.startAsync(), 8192); HttpRequest clientRequest = HttpRequest.newBuilder(target) .POST(BodyPublisher.fromPublisher(publisher, contentLength)) .build() Perhaps the method could be overloaded for both supplying and not supplying a content length. Similarly, I think a fromSubscriber API that accepted a CompletionStage<T> would be a little more fluent than having to supply it externally: public static <T> BodyHandler<T> fromSubscriber(Subscriber<? super List<ByteBuffer>> subscriber, CompletionStage<T> bodyFuture) { ... } Then you could have something like this: TextSubscriber subscriber = ...; // accumulates bytes and transforms them into a CompletionStage<String>. CompletionStage<String> result = subscriber.getTextResult(); CompletableFuture<String> cf = client .sendAsync(request, BodyHandler.fromSubscriber(subscriber, result)); String text = cf.join(); Likewise, this could be an overload of fromSubscriber if we want the option of not specifying a body future. One thing I think needs to be carefully specified is, if the method doesn't accept a CompletionStage, when/how the CompletionStage returned from send is redeemed. Regards, James On 9 December 2017 at 04:31, Chris Hegarty <chris.hega...@oracle.com> wrote: > James, > > Thanks for taking the time to look at this, and sending your thoughts. > > On 08/12/17 00:30, James Roper wrote: > > Hi all, > > > > I wanted to start a discussion about the use of Flow.Subscriber and > > Flow.Publisher in JEP 321 (HTTP Client API). > > > > It seems that users are required to implement their own publishers and > > subscribers, that is, they can't take a Flow.Publisher or > > Flow.Subscriber provided by another reactive streams implementation, and > > pass it on to the HttpClient API. The reason for this is that the > > HttpClient API doesn't accept Flow.Publisher/Flow.Subscriber, rather it > > extends them in HttpRequest.BodyPublisher and > > HttpResponse.BodySubscriber, and then requires the user to return > > instances of those sub interfaces from their BodyHandlers. ... > > Great point. I think we can address this with straight forward adapters. > For example: > > public interface BodyPublisher extends Flow.Publisher<ByteBuffer> { > > /** > * Returns a request body publisher whose body is retrieved from the > * given {@code Flow.Publisher}. The returned request body publisher > * has an unknown content length. > * > * @apiNote This method can be used as an adapter between {@code > * BodyPublisher} and {@code Flow.Publisher}. > * > * @param publisher the publisher responsible for publishing the body > * @return a BodyPublisher > */ > static BodyPublisher fromPublisher(Flow.Publisher<ByteBuffer> > publisher) { > ... > } > > ... > > public BodySubscriber<T> apply(int statusCode, HttpHeaders > responseHeaders); > > /** > * Returns a response body handler that returns a {@link > BodySubscriber > * BodySubscriber}{@code <Void>} obtained from {@link > * BodySubscriber#fromSubscriber(Subscriber)}. > * > * @apiNote This method can be used as an adapter between {@code > * BodySubscriber} and {@code Flow.Subscriber}. > * > * <p> For example: > * <pre> {@code > * TextSubscriber subscriber = ...; // accumulates bytes and > transforms them into a String. > * Supplier<String> result = subscriber::getTextResult; > * > * CompletableFuture<String> cf = client > * .sendAsync(request, BodyHandler.fromSubscriber(sub > scriber)) > * .thenApply((response -> result.get())); > * String text = cf.join(); > * }</pre> > * > * @param subscriber the subscriber > * @return a response body handler > */ > public static BodyHandler<Void> fromSubscriber(Subscriber<? super > List<ByteBuffer>> subscriber) { > ... > } > > // Add an equivalent BodySubscriber ... > > > This would allow the API to retain its Flow specific types ( that add > additional HTTP specific and API behavior ), while interacting, without > much fuss, with regular Publishers and Subscribers. > > -Chris. > -- *James Roper* *Senior Octonaut* Lightbend <https://www.lightbend.com/> – Build reactive apps! Twitter: @jroper <https://twitter.com/jroper>