Re: websockets

2018-02-12 Thread Chuck Davis
Hi Pavel:

Thanks for your input.  You guys are a lot smarter than I am so I
won't pretend to have a situation you haven't thought about.

My typical use case is to fetch database data from the server using
ejb.  I return the data in serialized java objects to my JavaFX
clients (websockets are not just for browsers though few seem to
realize that!). There is never a way to tell how much data may be
returned.  It may be from a job costing system that returns thousands
of rows.  I like the idea of using websockets rather than a direct ejb
connection because I want, in the future, to add server pushes to my
application.  i.e. if some user adds a ledger account all registered
GL listeners would get the new account in their desktop.  Neither ejb
nor rmi provides that capability.  In fact, nothing else does except
websockets as far as I know.

I'm sure I can learn to use the new .request(long) feature.  Looking,
as you mentioned, at the Flow api I see where they used the
UnboundedSubscription with request(Long.MAX_VALUE) which was my
initial reaction to this new wrinkle.

I am under the impression that this new API requires the developer to
buffer the messages until all the data is complete. As I recall (it's
been a few months since I implemented) decoders handled that in the
old API and if I am correct that is going to put a lot more work on
the plate of developers.  If my understanding is correct I'd suggest
listeners should do the buffering and deliver a complete message.  If
it's written once in the implementation it would save thousands of
developer hours recreating the same wheel.  At least this approach
makes sense in my use case.  Partial pieces of serialized objects are
useless.

Can you expound a little more on this:  "Finally, each of the
listener's methods asks the user to signal when the user has processed
the message it bears [2]."  All I see is that the method returns a
CompletionStage which triggers a procession of CompletionStage objects
(not sure yet how this ends except by returning a null?) and I don't
see in the docs how this signals the listener except that the method
exits and may be called again.  I'm sorry to be so dense here.

Finally, is there a target release to graduate from incubator?  I'm
kinda stuck until the dust settles.  May it hit jdk10 or 11?
"Incubating Feature. Will be removed in a future release." sounds
so.ominous!

Again, Pavel, thanks for your response and interest.


Re: websockets

2018-02-12 Thread Chris Hegarty

James,

On 10/02/18 07:38, James Roper wrote:

...

https://developer.lightbend.com/blog/2018-02-06-reactive-streams-ee4j/index.html


Regarding: 
https://github.com/jroper/reactive-streams-servlet/blob/7a2a651b706bb0612f6d11311e442f82ce307ed2/reactive-streams-servlet/src/main/java/org/reactivestreams/servlet/RequestPublisher.java


If I'm not mistaken, this appears to be mainly an adapter
from Publisher of InputStream to Publisher of ByteBuffer.
Did you overlook HttpRequest.BodyPublisher
fromInputStream​(Supplier streamSupplier)?
Or did you run into some issue with it?

-Chris.


Re: websockets

2018-02-12 Thread Pavel Rappo
Hello James,

Thanks for the comprehensive reply to Chuck's email. Now regarding your own 
email.

> On 10 Feb 2018, at 07:38, James Roper  wrote:
> 
> 
> 
> But that brings me to a problem that I'd like to give as feedback to the 
> implementers - this API is not Reactive Streams, and so therefore can't take 
> advantage of Reactive Streams implementations, and more problematic, can't 
> interop with other Reactive Streams sinks/sources. If I wanted to stream a 
> WebSocket into a message broker that supports Reactive Streams, I can't. I 
> would definitely hope that Reactive Streams support could be added to this 
> API, at a minimum as a wrapper, so that application developers can easily 
> focus on their business problems, plumbing and transforming messages from one 
> place to another, rather than having to deal with implementing concurrent 
> code to pass messages.
> 

I totally understand your concern over the lack of a Reactive Streams interface
to this low-level API. All I can say is that it's not that we haven't tried. As
usual the devil is in the detail. There were at least a couple of major issues
we couldn't find a satisfactory solution to.

The first one is how to communicate errors back to the user's code that
publishes messages to WebSocket. This issue has been extensively discussed here 
[1].
The only signal the publisher can receive from a subscriber in the case of a
failure is a cancellation signal. After this signal the subscription is
considered cancelled. Now, there are different solutions to this problem, but
none of the ones we looked at seemed comprehensive. For example, such errors
can be propagated by the WebSocket to the user's subscriber receiving messages.
In which case WebSocket input and output will become tied and the WebSocket
client will seem to require both the publisher and the subscriber to be present
at the same time.

The second issue is how to communicate completion signals from processing
individual messages. That might be handy in the case the implementation or the
application decide to recycle buffers they use. With a naive RS interface to
WebSocket all the user's publisher can receive from the WebSocket's subscriber
is a request for a number of extra messages. Using only this number the
publisher cannot deduce which of the previously published messages have been
actually sent. This problem also has a number of solutions. One of which would
be to use more streams. An extra publisher WebSocket would use to emit outcomes
of sending messages and an extra subscriber WebSocket would use to receive
signals from the user once a message has been received. To be honest it looks
cumbersome.

There are also a dozen of smaller issues that have to be resolved before
creating a well-defined RS interface to WebSocket.

> It may well require wrapping messages in a high level object - text, binary, 
> ping, pong, etc, to differentiate between the message types.
> 

We went great lengths in supporting different kinds of requirements in this API.
One of the such requirements that arose from a number of discussions on this
mailing list was not to force unnecessary allocations, garbage creation and
copying. To this end we utilised completion stages for send/receive operations 
[2].
We abandoned types for messages and used a method-naming scheme instead (I
believe with RS the equivalent would be to provide a stream per message type).
Even WebSocket.Listener was designed such that one instance could (if required)
service many WebSocket instances. That's the reason each method in Listener has
a WebSocket argument.

> 
> 
> https://developer.lightbend.com/blog/2018-02-06-reactive-streams-ee4j/index.html
> 

Thanks for the link.

That said, I think once we have satisfactory answers to the questions above,
we might provide an RS adapter you were talking about to this low-level API.

Meanwhile it is possible to built one already, on top of the existing API even
though it is not the same as RS, semantically they are very similar.

Thanks,
-Pavel

---
[1] https://github.com/reactive-streams/reactive-streams-jvm/issues/271
[2] At one point we were even considering using callbacks similar to
java.nio.channels.CompletionHandler instead of 
java.util.concurrent.CompletionStage
for both sending and receiving messages. All this is for the sake of not
creating extra objects when this is considered expensive. Who knows, we
might come back to retrofit the API and add this capability later.



Re: websockets

2018-02-12 Thread Viktor Klang
Hi Chuck,

(Disclosure: I'm an RS SIG founding member.)

On Sat, Feb 10, 2018 at 7:14 PM, Chuck Davis  wrote:

> Hi James:
>
> Thanks for your response and the information in your (and other)
> blog(s).  I haven't had time to learn all the new features of jdk9 yet
> so a look at Flow was interesting.
>
> My first impression is that a time constraint would be better than
> destroying asynchrony.


The good new is that no asynchrony is destroyed. The first sentence of
reactive-streams.org quite literally says:

"Reactive Streams is an initiative to provide a standard for *asynchronous*
stream processing with non-blocking back pressure." (emphasis mine)


> Rather than telling the source to only send
> one message and thus effectively make it a synchronous process, tell
> the source to only send one message per 100 millis or 1000 millis or
> whatever is appropriate back-pressure while my other threads are doing
> their thing ??


Not only would that not work—as in solve the problem—and it would lead to
abysmal performance.



> When the WebSocket.Listener has a complete message
> trigger a notification event to registered listeners.  And if the
> programmer wants to process intermediate results that would not be too
> difficult for the listener to track.
>
> I can understand the potential need for "back-pressure" but I think
> conserving the asynchronous nature of WebSocket is a high priority as
> well.  Indeed, I've built my application on that feature of
> WebSockets.
>

Fortunately there is no tradeoff needed there. :-)


>
> Thanks again.  At least I'm a bit more enlightened about the issue
> being addressed.
>
> Chuck
>
>
>
>
>
> On Fri, Feb 9, 2018 at 11:38 PM, James Roper  wrote:
> > Hi Chuck,
> >
> > Presumably this API is similar in intention to the request method used in
> > reactive streams (aka java.util.concurrent.Flow), that is, request is the
> > means by which backpressure is propagated. One major problem with JDK8
> > WebSockets is there's no way to asynchronously propagate backpressure,
> you
> > have to accept every message that comes as it comes, you can't tell the
> > other end to back off, which means if it's producing messages faster than
> > you can consume them, your only two options are to fail fast, or risk
> > running out of memory.  Reactive Streams solves this by requiring
> consumers
> > to signal demand for data/messages before they receive any - an
> invocation
> > of the request method is just saying how many more elements the consumer
> is
> > currently ready to receive, and can be invoked many times, as the
> consumer
> > processes messages and is ready to receive more. Generally, for Reactive
> > Streams, application developers are not expected to implement or invoke
> > these APIs directly, instead, they are expected to use reactive streams
> > implementations like Akka Streams, rxJava or Reactor, which efficiently
> > manage buffering and keeping the buffer at an appropriate level for the
> > application developer, so the application developer can just focus on
> their
> > business concerns.
> >
> > But that brings me to a problem that I'd like to give as feedback to the
> > implementers - this API is not Reactive Streams, and so therefore can't
> take
> > advantage of Reactive Streams implementations, and more problematic,
> can't
> > interop with other Reactive Streams sinks/sources. If I wanted to stream
> a
> > WebSocket into a message broker that supports Reactive Streams, I can't.
> I
> > would definitely hope that Reactive Streams support could be added to
> this
> > API, at a minimum as a wrapper, so that application developers can easily
> > focus on their business problems, plumbing and transforming messages from
> > one place to another, rather than having to deal with implementing
> > concurrent code to pass messages. It may well require wrapping messages
> in a
> > high level object - text, binary, ping, pong, etc, to differentiate
> between
> > the message types.
> >
> > For a broader context of how this would fit into the broader Reactive
> > Streams ecosystem, I've published a blog post of what it would look like
> if
> > Java EE/EE4J were to adopt Reactive Streams everywhere, as it happens
> this
> > also includes proposals for using Reactive Streams in the JSR 356
> WebSocket
> > spec:
> >
> > https://developer.lightbend.com/blog/2018-02-06-reactive-
> streams-ee4j/index.html
> >
> > Regards,
> >
> > James
> >
> > On 9 February 2018 at 19:16, Chuck Davis  wrote:
> >>
> >> I've been using jdk8 websockets to develop my desktop java
> >> applications.  Now that jdk9 is on my machine I started looking at
> >> websockets and I'm not at all sure I like what I see.  Can someone
> >> familiar with this feature please explain the rationale for what is
> >> happening?
> >>
> >> I'm concerned, at this initial stage, primarily by
> >> WebSocket.request(long).  This "feature" seems to have at least two
>