Re: JEP 321: HTTP Client - Use of Flow.Subscriber and Flow.Publisher

2017-12-13 Thread Chris Hegarty

On 12/12/17 11:13, Chris Hegarty wrote:
>
> I filed the following JIRA issue to track this discussion
> and proposal.
>
>    https://bugs.openjdk.java.net/browse/JDK-8193365
>
> I'll start the process of bringing it into 10, unless there
> are any final comments. FTR, I'm happy where we ended up on
> this.


And link to the CSR that contains the full proposed specification
changes:
  https://bugs.openjdk.java.net/browse/JDK-8193366

-Chris.


Re: JEP 321: HTTP Client - Use of Flow.Subscriber and Flow.Publisher

2017-12-12 Thread Chris Hegarty


I filed the following JIRA issue to track this discussion
and proposal.

  https://bugs.openjdk.java.net/browse/JDK-8193365

I'll start the process of bringing it into 10, unless there
are any final comments. FTR, I'm happy where we ended up on
this.

-Chris.


Re: JEP 321: HTTP Client - Use of Flow.Subscriber and Flow.Publisher

2017-12-11 Thread Chris Hegarty

James,

On 11/12/17 00:47, James Roper wrote:
> Hi Chris,
>
> This looks like a straightforward way to solve the problem with minimal
> disruption from the existing API. Can I make a few suggestions though?

Of course, your input here is much appreciated.

> We could add a contentLength parameter to fromPublisher, to allow
> Flow.Publishers where the content length is known to be easily converted
> to BodyPublisher:
>
> static BodyPublisher fromPublisher(Flow.Publisher<ByteBuffer> publisher,
>                                    int contentLength) {
>     ...
> }

Good idea. Added ( as can be seen below ).

> This would mean if you were receiving a servlet request body and
> publishing it to another location, then you could do something like this
> (this uses a reactive streams implementation on top of the servlet API
> that I wrote):
>
> HttpServletRequest request = ...
> long contentLength = -1;
> if (request.getHeader("Content-Length") != null) {
>     contentLength = Long.parseLong(request.getHeader("Content-Length"));
> }
> Publisher<ByteBuffer> publisher =
>     new RequestPublisher(request.startAsync(), 8192);
>
> HttpRequest clientRequest = HttpRequest.newBuilder(target)
>     .POST(BodyPublisher.fromPublisher(publisher, contentLength))
>     .build();

Nice.

> Perhaps the method could be overloaded for both supplying and not
> supplying a content length.

I think an overload is justified here. Added.
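
For illustration only, a minimal sketch of how the two overloads might be
used together. It assumes the API as later standardized in java.net.http
(JDK 11), where the factories live on HttpRequest.BodyPublishers rather than
on BodyPublisher as outlined in this thread. The class name and the adapt
helper are hypothetical. The helper picks the overload from the parsed
Content-Length, since the two-argument form rejects a non-positive length:

```java
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class FromPublisherSketch {

    // Hypothetical helper: choose the overload based on whether the
    // content length is known (positive) or not. In this sketch a length
    // of zero is also treated as unknown.
    static HttpRequest.BodyPublisher adapt(Flow.Publisher<? extends ByteBuffer> source,
                                           long contentLength) {
        return contentLength > 0
                ? BodyPublishers.fromPublisher(source, contentLength) // fixed length
                : BodyPublishers.fromPublisher(source);               // unknown length
    }

    public static void main(String[] args) {
        // Any Flow.Publisher of ByteBuffers will do; SubmissionPublisher is
        // simply the one that ships with the JDK.
        try (SubmissionPublisher<ByteBuffer> source = new SubmissionPublisher<>()) {
            System.out.println(adapt(source, 5).contentLength());
            System.out.println(adapt(source, -1).contentLength());
        }
    }
}
```

With a helper like this, the servlet example above can keep -1 as its
"no Content-Length header" value without tripping the
IllegalArgumentException.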

> Similarly, I think a fromSubscriber API that accepted a
> CompletionStage would be a little more fluent than having to supply
> it externally:

Daniel and I discussed this too, but I opted to leave it out initially
for simplicity. I think if we have two overloads, then the simple case
can still be supported with little ceremony, while allowing a more
powerful variant.

> public static <T> BodyHandler<T> fromSubscriber(
>         Subscriber<? super List<ByteBuffer>> subscriber,
>         CompletionStage<T> bodyFuture) {
>     ...
> }
>
> Then you could have something like this:
>
> // accumulates bytes and transforms them into a CompletionStage<String>.
> TextSubscriber subscriber = ...;
> CompletionStage<String> result = subscriber.getTextResult();
>
> CompletableFuture<HttpResponse<String>> cf = client
>     .sendAsync(request, BodyHandler.fromSubscriber(subscriber, result));
> String text = cf.join().body();
>
> Likewise, this could be an overload of fromSubscriber if we want the
> option of not specifying a body future.
>
> One thing I think needs to be carefully specified is, if the method
> doesn't accept a CompletionStage, when/how the CompletionStage returned
> from send is redeemed.

Hmmm... this could be tricky. I think we can avoid the scenario
completely by accepting a finishing function that can generate/return
the value to use for completion, rather than the CF itself.
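
A minimal sketch of the finishing-function idea, again assuming the API as
later standardized in java.net.http (JDK 11): there,
BodySubscribers.fromSubscriber(subscriber, finisher) applies the finisher to
the subscriber once the body has been fully delivered and completes the body
with the result. TextSubscriber is a hypothetical stand-in for the one in the
example above, and the drive method feeds chunks by hand in place of a real
HTTP exchange:

```java
import java.io.ByteArrayOutputStream;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodySubscribers;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Flow;

class FinisherSketch {

    // Hypothetical subscriber: accumulates bytes and exposes them as text.
    static final class TextSubscriber implements Flow.Subscriber<List<ByteBuffer>> {
        private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();

        @Override public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // no back-pressure in this sketch
        }
        @Override public void onNext(List<ByteBuffer> buffers) {
            for (ByteBuffer buffer : buffers) {
                byte[] chunk = new byte[buffer.remaining()];
                buffer.get(chunk);
                bytes.write(chunk, 0, chunk.length);
            }
        }
        @Override public void onError(Throwable error) { }
        @Override public void onComplete() { }

        String getTextResult() {
            return new String(bytes.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    // Feeds the given chunks through the adapter by hand and returns the
    // value that the finisher produced on completion.
    static String drive(String... chunks) {
        HttpResponse.BodySubscriber<String> body =
                BodySubscribers.fromSubscriber(new TextSubscriber(), TextSubscriber::getTextResult);
        body.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        for (String chunk : chunks) {
            body.onNext(List.of(ByteBuffer.wrap(chunk.getBytes(StandardCharsets.UTF_8))));
        }
        body.onComplete();
        return body.getBody().toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(drive("hello, ", "world"));
    }
}
```

In real use the same subscriber and finisher would go to
HttpResponse.BodyHandlers.fromSubscriber(subscriber, finisher) and on to
sendAsync, so the caller never has to create or complete a CompletableFuture
itself.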

Here is an outline of all of the above:

  public interface BodyPublisher extends Flow.Publisher<ByteBuffer> {

    /**
     * Returns a request body publisher whose body is retrieved from the
     * given {@code Flow.Publisher}. The returned request body publisher
     * has an unknown content length.
     *
     * @apiNote This method can be used as an adapter between {@code
     * BodyPublisher} and {@code Flow.Publisher}, where the amount of
     * request body that the publisher will publish is unknown.
     *
     * @param publisher the publisher responsible for publishing the body
     * @return a BodyPublisher
     */
    static BodyPublisher fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher) { ... }


    /**
     * Returns a request body publisher whose body is retrieved from the
     * given {@code Flow.Publisher}. The returned request body publisher
     * has the given content length.
     *
     * <p> The given {@code contentLength} is a positive number, that
     * represents the exact amount of bytes the {@code publisher} must
     * publish.
     *
     * @apiNote This method can be used as an adapter between {@code
     * BodyPublisher} and {@code Flow.Publisher}, where the amount of
     * request body that the publisher will publish is known.
     *
     * @param publisher the publisher responsible for publishing the body
     * @param contentLength a positive number representing the exact
     *                      amount of bytes the publisher will publish
     * @throws IllegalArgumentException if the content length is
     *                                  non-positive
     * @return a BodyPublisher
     */
    static BodyPublisher fromPublisher(Flow.Publisher<? extends ByteBuffer> publisher,
                                       long contentLength) { ... }


  public interface BodyHandler<T> {

    /**
     * Returns a response body handler that returns a {@link BodySubscriber
     * BodySubscriber}{@code <Void>} obtained from {@linkplain
     * BodySubscriber#fromSubscriber(Subscriber)}, with the given
     * {@code subscriber}.
     *
     * <p> The response body is not available through this, or the {@code
     * HttpResponse} API, but instead all response body is forwarded to the
     * given {@code subscriber}, which should make it available, if
     * appropriate, through some o

Re: JEP 321: HTTP Client - Use of Flow.Subscriber and Flow.Publisher

2017-12-10 Thread James Roper
Hi Chris,

This looks like a straightforward way to solve the problem with minimal
disruption from the existing API. Can I make a few suggestions though?

We could add a contentLength parameter to fromPublisher, to allow
Flow.Publishers where the content length is known to be easily converted to
BodyPublisher:

static BodyPublisher fromPublisher(Flow.Publisher<ByteBuffer> publisher,
                                   int contentLength) {
    ...
}

This would mean if you were receiving a servlet request body and publishing
it to another location, then you could do something like this (this uses a
reactive streams implementation on top of the servlet API that I wrote):

HttpServletRequest request = ...
long contentLength = -1;
if (request.getHeader("Content-Length") != null) {
    contentLength = Long.parseLong(request.getHeader("Content-Length"));
}
Publisher<ByteBuffer> publisher =
    new RequestPublisher(request.startAsync(), 8192);

HttpRequest clientRequest = HttpRequest.newBuilder(target)
    .POST(BodyPublisher.fromPublisher(publisher, contentLength))
    .build();

Perhaps the method could be overloaded for both supplying and not supplying
a content length.

Similarly, I think a fromSubscriber API that accepted a CompletionStage
would be a little more fluent than having to supply it externally:

public static <T> BodyHandler<T> fromSubscriber(
        Subscriber<? super List<ByteBuffer>> subscriber,
        CompletionStage<T> bodyFuture) {
    ...
}

Then you could have something like this:

// accumulates bytes and transforms them into a CompletionStage<String>.
TextSubscriber subscriber = ...;
CompletionStage<String> result = subscriber.getTextResult();

CompletableFuture<HttpResponse<String>> cf = client
    .sendAsync(request, BodyHandler.fromSubscriber(subscriber, result));
String text = cf.join().body();

Likewise, this could be an overload of fromSubscriber if we want the option
of not specifying a body future.

One thing I think needs to be carefully specified is, if the method doesn't
accept a CompletionStage, when/how the CompletionStage returned from send
is redeemed.

Regards,

James

On 9 December 2017 at 04:31, Chris Hegarty  wrote:

> James,
>
> Thanks for taking the time to look at this, and sending your thoughts.
>
> On 08/12/17 00:30, James Roper wrote:
> > Hi all,
> >
> > I wanted to start a discussion about the use of Flow.Subscriber and
> > Flow.Publisher in JEP 321 (HTTP Client API).
> >
> > It seems that users are required to implement their own publishers and
> > subscribers, that is, they can't take a Flow.Publisher or
> > Flow.Subscriber provided by another reactive streams implementation, and
> > pass it on to the HttpClient API. The reason for this is that the
> > HttpClient API doesn't accept Flow.Publisher/Flow.Subscriber, rather it
> > extends them in HttpRequest.BodyPublisher and
> > HttpResponse.BodySubscriber, and then requires the user to return
> > instances of those sub interfaces from their BodyHandlers. ...
>
> Great point. I think we can address this with straightforward adapters.
> For example:
>
>   public interface BodyPublisher extends Flow.Publisher<ByteBuffer> {
>
>     /**
>      * Returns a request body publisher whose body is retrieved from the
>      * given {@code Flow.Publisher}. The returned request body publisher
>      * has an unknown content length.
>      *
>      * @apiNote This method can be used as an adapter between {@code
>      * BodyPublisher} and {@code Flow.Publisher}.
>      *
>      * @param publisher the publisher responsible for publishing the body
>      * @return a BodyPublisher
>      */
>     static BodyPublisher fromPublisher(Flow.Publisher<ByteBuffer> publisher) {
>         ...
>     }
>
>     ...
>
>     public BodySubscriber<T> apply(int statusCode, HttpHeaders responseHeaders);
>
>     /**
>      * Returns a response body handler that returns a {@link BodySubscriber
>      * BodySubscriber}{@code <Void>} obtained from {@link
>      * BodySubscriber#fromSubscriber(Subscriber)}.
>      *
>      * @apiNote This method can be used as an adapter between {@code
>      * BodySubscriber} and {@code Flow.Subscriber}.
>      *
>      * <p> For example:
>      * <pre> {@code
>      * // accumulates bytes and transforms them into a String.
>      * TextSubscriber subscriber = ...;
>      * Supplier<String> result = subscriber::getTextResult;
>      *
>      * CompletableFuture<String> cf = client
>      *     .sendAsync(request, BodyHandler.fromSubscriber(subscriber))
>      *     .thenApply((response -> result.get()));
>      * String text = cf.join();
>      * }</pre>
>      *
>      * @param subscriber the subscriber
>      * @return a response body handler
>      */
>     public static BodyHandler<Void> fromSubscriber(
>             Subscriber<? super List<ByteBuffer>> subscriber) {
>         ...
>     }
>
>     // Add an equivalent BodySubscriber ...
>
>
> This would allow the API to retain its Flow specific types ( that add
> additional HTTP specific and API behavior ), while interacting, without
> much fuss, with regular Publishers and Subscribers.
>
> -Chris.
>



-- 
*James Roper*
*Senior Octonaut*

Lightbend

Re: JEP 321: HTTP Client - Use of Flow.Subscriber and Flow.Publisher

2017-12-08 Thread Chris Hegarty

James,

Thanks for taking the time to look at this, and sending your thoughts.

On 08/12/17 00:30, James Roper wrote:
> Hi all,
>
> I wanted to start a discussion about the use of Flow.Subscriber and
> Flow.Publisher in JEP 321 (HTTP Client API).
>
> It seems that users are required to implement their own publishers and
> subscribers, that is, they can't take a Flow.Publisher or
> Flow.Subscriber provided by another reactive streams implementation, and
> pass it on to the HttpClient API. The reason for this is that the
> HttpClient API doesn't accept Flow.Publisher/Flow.Subscriber, rather it
> extends them in HttpRequest.BodyPublisher and
> HttpResponse.BodySubscriber, and then requires the user to return
> instances of those sub interfaces from their BodyHandlers. ...

Great point. I think we can address this with straightforward adapters.
For example:

  public interface BodyPublisher extends Flow.Publisher<ByteBuffer> {

    /**
     * Returns a request body publisher whose body is retrieved from the
     * given {@code Flow.Publisher}. The returned request body publisher
     * has an unknown content length.
     *
     * @apiNote This method can be used as an adapter between {@code
     * BodyPublisher} and {@code Flow.Publisher}.
     *
     * @param publisher the publisher responsible for publishing the body
     * @return a BodyPublisher
     */
    static BodyPublisher fromPublisher(Flow.Publisher<ByteBuffer> publisher) {
        ...
    }

    ...

    public BodySubscriber<T> apply(int statusCode, HttpHeaders responseHeaders);

    /**
     * Returns a response body handler that returns a {@link BodySubscriber
     * BodySubscriber}{@code <Void>} obtained from {@link
     * BodySubscriber#fromSubscriber(Subscriber)}.
     *
     * @apiNote This method can be used as an adapter between {@code
     * BodySubscriber} and {@code Flow.Subscriber}.
     *
     * <p> For example:
     * <pre> {@code
     * // accumulates bytes and transforms them into a String.
     * TextSubscriber subscriber = ...;
     * Supplier<String> result = subscriber::getTextResult;
     *
     * CompletableFuture<String> cf = client
     *     .sendAsync(request, BodyHandler.fromSubscriber(subscriber))
     *     .thenApply((response -> result.get()));
     * String text = cf.join();
     * }</pre>
     *
     * @param subscriber the subscriber
     * @return a response body handler
     */
    public static BodyHandler<Void> fromSubscriber(
            Subscriber<? super List<ByteBuffer>> subscriber) {
        ...
    }

    // Add an equivalent BodySubscriber ...


This would allow the API to retain its Flow specific types ( that add
additional HTTP specific and API behavior ), while interacting, without
much fuss, with regular Publishers and Subscribers.

-Chris.