Re: websockets
Hi Pavel: Thanks for your input. You guys are a lot smarter than I am so I won't pretend to have a situation you haven't thought about. My typical use case is to fetch database data from the server using ejb. I return the data in serialized java objects to my JavaFX clients (websockets are not just for browsers though few seem to realize that!). There is never a way to tell how much data may be returned. It may be from a job costing system that returns thousands of rows. I like the idea of using websockets rather than a direct ejb connection because I want, in the future, to add server pushes to my application. i.e. if some user adds a ledger account all registered GL listeners would get the new account in their desktop. Neither ejb nor rmi provides that capability. In fact, nothing else does except websockets as far as I know. I'm sure I can learn to use the new .request(long) feature. Looking, as you mentioned, at the Flow api I see where they used the UnboundedSubscription with request(Long.MAX_VALUE) which was my initial reaction to this new wrinkle. I am under the impression that this new API requires the developer to buffer the messages until all the data is complete. As I recall (it's been a few months since I implemented) decoders handled that in the old API and if I am correct that is going to put a lot more work on the plate of developers. If my understanding is correct I'd suggest listeners should do the buffering and deliver a complete message. If it's written once in the implementation it would save thousands of developer hours recreating the same wheel. At least this approach makes sense in my use case. Partial pieces of serialized objects are useless. Can you expound a little more on this: "Finally, each of the listener's methods asks the user to signal when the user has processed the message it bears [2]." All I see is that the method returns a CompletionStage which triggers a procession of CompletionStage objects (not sure yet how this ends except by returning a null?) and I don't see in the docs how this signals the listener except that the method exits and may be called again. I'm sorry to be so dense here. Finally, is there a target release to graduate from incubator? I'm kinda stuck until the dust settles. May it hit jdk10 or 11? "Incubating Feature. Will be removed in a future release." sounds so.ominous! Again, Pavel, thanks for your response and interest.
Re: websockets
James, On 10/02/18 07:38, James Roper wrote: ... https://developer.lightbend.com/blog/2018-02-06-reactive-streams-ee4j/index.html Regarding: https://github.com/jroper/reactive-streams-servlet/blob/7a2a651b706bb0612f6d11311e442f82ce307ed2/reactive-streams-servlet/src/main/java/org/reactivestreams/servlet/RequestPublisher.java If I'm not mistaken, this appears to be mainly an adapter from Publisher of InputStream to Publisher of ByteBuffer. Did you overlook HttpRequest.BodyPublisher fromInputStream(Supplier streamSupplier)? Or did you run into some issue with it? -Chris.
Re: websockets
Hello James, Thanks for the comprehensive reply to Chuck's email. Now regarding your own email. > On 10 Feb 2018, at 07:38, James Roperwrote: > > > > But that brings me to a problem that I'd like to give as feedback to the > implementers - this API is not Reactive Streams, and so therefore can't take > advantage of Reactive Streams implementations, and more problematic, can't > interop with other Reactive Streams sinks/sources. If I wanted to stream a > WebSocket into a message broker that supports Reactive Streams, I can't. I > would definitely hope that Reactive Streams support could be added to this > API, at a minimum as a wrapper, so that application developers can easily > focus on their business problems, plumbing and transforming messages from one > place to another, rather than having to deal with implementing concurrent > code to pass messages. > I totally understand your concern over the lack of a Reactive Streams interface to this low-level API. All I can say is that it's not that we haven't tried. As usual the devil is in the detail. There were at least a couple of major issues we couldn't find a satisfactory solution to. The first one is how to communicate errors back to the user's code that publishes messages to WebSocket. This issue has been extensively discussed here [1]. The only signal the publisher can receive from a subscriber in the case of a failure is a cancellation signal. After this signal the subscription is considered cancelled. Now, there are different solutions to this problem, but none of the ones we looked at seemed comprehensive. For example, such errors can be propagated by the WebSocket to the user's subscriber receiving messages. In which case WebSocket input and output will become tied and the WebSocket client will seem to require both the publisher and the subscriber to be present at the same time. The second issue is how to communicate completion signals from processing individual messages. That might be handy in the case the implementation or the application decide to recycle buffers they use. With a naive RS interface to WebSocket all the user's publisher can receive from the WebSocket's subscriber is a request for a number of extra messages. Using only this number the publisher cannot deduce which of the previously published messages have been actually sent. This problem also has a number of solutions. One of which would be to use more streams. An extra publisher WebSocket would use to emit outcomes of sending messages and an extra subscriber WebSocket would use to receive signals from the user once a message has been received. To be honest it looks cumbersome. There are also a dozen of smaller issues that have to be resolved before creating a well-defined RS interface to WebSocket. > It may well require wrapping messages in a high level object - text, binary, > ping, pong, etc, to differentiate between the message types. > We went great lengths in supporting different kinds of requirements in this API. One of the such requirements that arose from a number of discussions on this mailing list was not to force unnecessary allocations, garbage creation and copying. To this end we utilised completion stages for send/receive operations [2]. We abandoned types for messages and used a method-naming scheme instead (I believe with RS the equivalent would be to provide a stream per message type). Even WebSocket.Listener was designed such that one instance could (if required) service many WebSocket instances. That's the reason each method in Listener has a WebSocket argument. > > > https://developer.lightbend.com/blog/2018-02-06-reactive-streams-ee4j/index.html > Thanks for the link. That said, I think once we have satisfactory answers to the questions above, we might provide an RS adapter you were talking about to this low-level API. Meanwhile it is possible to built one already, on top of the existing API even though it is not the same as RS, semantically they are very similar. Thanks, -Pavel --- [1] https://github.com/reactive-streams/reactive-streams-jvm/issues/271 [2] At one point we were even considering using callbacks similar to java.nio.channels.CompletionHandler instead of java.util.concurrent.CompletionStage for both sending and receiving messages. All this is for the sake of not creating extra objects when this is considered expensive. Who knows, we might come back to retrofit the API and add this capability later.
Re: websockets
Hi Chuck, (Disclosure: I'm an RS SIG founding member.) On Sat, Feb 10, 2018 at 7:14 PM, Chuck Daviswrote: > Hi James: > > Thanks for your response and the information in your (and other) > blog(s). I haven't had time to learn all the new features of jdk9 yet > so a look at Flow was interesting. > > My first impression is that a time constraint would be better than > destroying asynchrony. The good new is that no asynchrony is destroyed. The first sentence of reactive-streams.org quite literally says: "Reactive Streams is an initiative to provide a standard for *asynchronous* stream processing with non-blocking back pressure." (emphasis mine) > Rather than telling the source to only send > one message and thus effectively make it a synchronous process, tell > the source to only send one message per 100 millis or 1000 millis or > whatever is appropriate back-pressure while my other threads are doing > their thing ?? Not only would that not work—as in solve the problem—and it would lead to abysmal performance. > When the WebSocket.Listener has a complete message > trigger a notification event to registered listeners. And if the > programmer wants to process intermediate results that would not be too > difficult for the listener to track. > > I can understand the potential need for "back-pressure" but I think > conserving the asynchronous nature of WebSocket is a high priority as > well. Indeed, I've built my application on that feature of > WebSockets. > Fortunately there is no tradeoff needed there. :-) > > Thanks again. At least I'm a bit more enlightened about the issue > being addressed. > > Chuck > > > > > > On Fri, Feb 9, 2018 at 11:38 PM, James Roper wrote: > > Hi Chuck, > > > > Presumably this API is similar in intention to the request method used in > > reactive streams (aka java.util.concurrent.Flow), that is, request is the > > means by which backpressure is propagated. One major problem with JDK8 > > WebSockets is there's no way to asynchronously propagate backpressure, > you > > have to accept every message that comes as it comes, you can't tell the > > other end to back off, which means if it's producing messages faster than > > you can consume them, your only two options are to fail fast, or risk > > running out of memory. Reactive Streams solves this by requiring > consumers > > to signal demand for data/messages before they receive any - an > invocation > > of the request method is just saying how many more elements the consumer > is > > currently ready to receive, and can be invoked many times, as the > consumer > > processes messages and is ready to receive more. Generally, for Reactive > > Streams, application developers are not expected to implement or invoke > > these APIs directly, instead, they are expected to use reactive streams > > implementations like Akka Streams, rxJava or Reactor, which efficiently > > manage buffering and keeping the buffer at an appropriate level for the > > application developer, so the application developer can just focus on > their > > business concerns. > > > > But that brings me to a problem that I'd like to give as feedback to the > > implementers - this API is not Reactive Streams, and so therefore can't > take > > advantage of Reactive Streams implementations, and more problematic, > can't > > interop with other Reactive Streams sinks/sources. If I wanted to stream > a > > WebSocket into a message broker that supports Reactive Streams, I can't. > I > > would definitely hope that Reactive Streams support could be added to > this > > API, at a minimum as a wrapper, so that application developers can easily > > focus on their business problems, plumbing and transforming messages from > > one place to another, rather than having to deal with implementing > > concurrent code to pass messages. It may well require wrapping messages > in a > > high level object - text, binary, ping, pong, etc, to differentiate > between > > the message types. > > > > For a broader context of how this would fit into the broader Reactive > > Streams ecosystem, I've published a blog post of what it would look like > if > > Java EE/EE4J were to adopt Reactive Streams everywhere, as it happens > this > > also includes proposals for using Reactive Streams in the JSR 356 > WebSocket > > spec: > > > > https://developer.lightbend.com/blog/2018-02-06-reactive- > streams-ee4j/index.html > > > > Regards, > > > > James > > > > On 9 February 2018 at 19:16, Chuck Davis wrote: > >> > >> I've been using jdk8 websockets to develop my desktop java > >> applications. Now that jdk9 is on my machine I started looking at > >> websockets and I'm not at all sure I like what I see. Can someone > >> familiar with this feature please explain the rationale for what is > >> happening? > >> > >> I'm concerned, at this initial stage, primarily by > >> WebSocket.request(long). This "feature" seems to have at least two >