Hi Chris,

This looks like a straightforward way to solve the problem with minimal
disruption to the existing API. Can I make a few suggestions, though?

We could add a contentLength parameter to fromPublisher, so that a
Flow.Publisher whose content length is known can easily be converted to a
BodyPublisher:

static BodyPublisher fromPublisher(Flow.Publisher<ByteBuffer> publisher,
                                   long contentLength) {
    ...
}

This would mean if you were receiving a servlet request body and publishing
it to another location, then you could do something like this (this uses a
reactive streams implementation on top of the servlet API that I wrote):

HttpServletRequest request = ...
long contentLength = -1;
if (request.getHeader("Content-Length") != null) {
  contentLength = Long.parseLong(request.getHeader("Content-Length"));
}
Publisher<ByteBuffer> publisher =
  new RequestPublisher(request.startAsync(), 8192);

HttpRequest clientRequest = HttpRequest.newBuilder(target)
  .POST(BodyPublisher.fromPublisher(publisher, contentLength))
  .build();

Perhaps the method could be overloaded for both supplying and not supplying
a content length.
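
To make the overload idea concrete, here is a minimal sketch. LengthAwarePublisher is a hypothetical stand-in for BodyPublisher (it is not part of the proposed API), and I am assuming a negative value means "unknown length":

```java
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.Flow;

/** Hypothetical stand-in for BodyPublisher: a Flow.Publisher that also reports its length. */
interface LengthAwarePublisher extends Flow.Publisher<ByteBuffer> {

    /** Body length in bytes, or a negative value when it is not known (an assumption of this sketch). */
    long contentLength();

    /** Overload for publishers whose length is not known up front. */
    static LengthAwarePublisher fromPublisher(Flow.Publisher<ByteBuffer> publisher) {
        return fromPublisher(publisher, -1L);
    }

    /** Overload for publishers whose length is known, e.g. from a Content-Length header. */
    static LengthAwarePublisher fromPublisher(Flow.Publisher<ByteBuffer> publisher,
                                              long contentLength) {
        Objects.requireNonNull(publisher);
        return new LengthAwarePublisher() {
            @Override
            public long contentLength() {
                return contentLength;
            }

            @Override
            public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
                // Pure delegation: the adapter only adds the length metadata.
                publisher.subscribe(subscriber);
            }
        };
    }
}
```

The one-argument overload just delegates with the "unknown" marker, so both cases go through a single code path.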

Similarly, I think a fromSubscriber API that accepted a CompletionStage<T>
would be a little more fluent than having to supply it externally:

public static <T> BodyHandler<T> fromSubscriber(
    Subscriber<? super List<ByteBuffer>> subscriber,
    CompletionStage<T> bodyFuture) {
  ...
}

Then you could have something like this:

// Accumulates bytes and transforms them into a CompletionStage<String>.
TextSubscriber subscriber = ...;
CompletionStage<String> result = subscriber.getTextResult();

CompletableFuture<String> cf = client
  .sendAsync(request, BodyHandler.fromSubscriber(subscriber, result))
  .thenApply(HttpResponse::body);
String text = cf.join();

Likewise, this could be an overload of fromSubscriber if we want the option
of not specifying a body future.
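
For illustration, here is a rough sketch of what such a subscriber and pairing might look like. Both TextSubscriber and SubscriberAndResult are hypothetical names of mine: the first is one possible shape for the subscriber in the example above, and the second stands in for the proposed BodyHandler, holding only the two arguments.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;

/** Hypothetical subscriber: accumulates the body and exposes it as a CompletionStage<String>. */
class TextSubscriber implements Flow.Subscriber<List<ByteBuffer>> {
    private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    private final CompletableFuture<String> result = new CompletableFuture<>();

    CompletionStage<String> getTextResult() {
        return result;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE); // no back-pressure in this sketch
    }

    @Override
    public void onNext(List<ByteBuffer> buffers) {
        for (ByteBuffer buffer : buffers) {
            byte[] chunk = new byte[buffer.remaining()];
            buffer.get(chunk);
            bytes.write(chunk, 0, chunk.length);
        }
    }

    @Override
    public void onError(Throwable throwable) {
        result.completeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        // Decode once at the end so multi-byte characters split across chunks survive.
        result.complete(new String(bytes.toByteArray(), StandardCharsets.UTF_8));
    }
}

/** Hypothetical stand-in for the proposed handler: pairs a subscriber with its body stage. */
final class SubscriberAndResult<T> {
    final Flow.Subscriber<? super List<ByteBuffer>> subscriber;
    final CompletionStage<T> body;

    private SubscriberAndResult(Flow.Subscriber<? super List<ByteBuffer>> subscriber,
                                CompletionStage<T> body) {
        this.subscriber = Objects.requireNonNull(subscriber);
        this.body = Objects.requireNonNull(body);
    }

    static <T> SubscriberAndResult<T> fromSubscriber(
            Flow.Subscriber<? super List<ByteBuffer>> subscriber,
            CompletionStage<T> bodyFuture) {
        return new SubscriberAndResult<>(subscriber, bodyFuture);
    }
}
```

In this sketch the stage is redeemed only in onComplete (or onError), so anything that depends on the body is sequenced after the subscriber has seen the whole stream.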

One thing I think needs to be carefully specified: if the method doesn't
accept a CompletionStage, when and how is the CompletionStage returned from
sendAsync redeemed?

Regards,

James

On 9 December 2017 at 04:31, Chris Hegarty <chris.hega...@oracle.com> wrote:

> James,
>
> Thanks for taking the time to look at this, and sending your thoughts.
>
> On 08/12/17 00:30, James Roper wrote:
> > Hi all,
> >
> > I wanted to start a discussion about the use of Flow.Subscriber and
> > Flow.Publisher in JEP 321 (HTTP Client API).
> >
> > It seems that users are required to implement their own publishers and
> > subscribers, that is, they can't take a Flow.Publisher or
> > Flow.Subscriber provided by another reactive streams implementation, and
> > pass it on to the HttpClient API. The reason for this is that the
> > HttpClient API doesn't accept Flow.Publisher/Flow.Subscriber, rather it
> > extends them in HttpRequest.BodyPublisher and
> > HttpResponse.BodySubscriber, and then requires the user to return
> > instances of those sub interfaces from their BodyHandlers. ...
>
> Great point. I think we can address this with straight forward adapters.
> For example:
>
>   public interface BodyPublisher extends Flow.Publisher<ByteBuffer> {
>
>      /**
>      * Returns a request body publisher whose body is retrieved from the
>      * given {@code Flow.Publisher}. The returned request body publisher
>      * has an unknown content length.
>      *
>      * @apiNote This method can be used as an adapter between {@code
>      * BodyPublisher} and {@code Flow.Publisher}.
>      *
>      * @param publisher the publisher responsible for publishing the body
>      * @return a BodyPublisher
>      */
>     static BodyPublisher fromPublisher(
>             Flow.Publisher<ByteBuffer> publisher) {
>         ...
>     }
>
>     ...
>
>    public BodySubscriber<T> apply(int statusCode,
>                                   HttpHeaders responseHeaders);
>
>      /**
>       * Returns a response body handler that returns a
>       * {@link BodySubscriber BodySubscriber}{@code <Void>} obtained from
>       * {@link BodySubscriber#fromSubscriber(Subscriber)}.
>       *
>       * @apiNote This method can be used as an adapter between {@code
>       * BodySubscriber} and {@code Flow.Subscriber}.
>       *
>       * <p> For example:
>       * <pre> {@code
>       * // accumulates bytes and transforms them into a String.
>       * TextSubscriber subscriber = ...;
>       * Supplier<String> result = subscriber::getTextResult;
>       *
>       * CompletableFuture<String> cf =  client
>       *         .sendAsync(request, BodyHandler.fromSubscriber(subscriber))
>       *         .thenApply((response -> result.get()));
>       * String text = cf.join();
>       * }</pre>
>       *
>       * @param subscriber the subscriber
>       * @return a response body handler
>       */
>      public static BodyHandler<Void> fromSubscriber(
>              Subscriber<? super List<ByteBuffer>> subscriber) {
>          ...
>      }
>
>      // Add an equivalent BodySubscriber ...
>
>
> This would allow the API to retain its Flow specific types ( that add
> additional HTTP specific and API behavior ), while interacting, without
> much fuss, with regular Publishers and Subscribers.
>
> -Chris.
>



-- 
*James Roper*
*Senior Octonaut*

Lightbend <https://www.lightbend.com/> – Build reactive apps!
Twitter: @jroper <https://twitter.com/jroper>
