Maybe the spec could be tighter around this, but it's not unreasonable
that there is a delay in receiving the onComplete() notification because
of the subscriber-controlled flow control.
Notifying onError() is not subject to flow control, so you might expect
that it would be triggered immediately.
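A minimal sketch of the flow-control point above, assuming the subscriber from the original example but with an explicit Subscription.request() call. The class name FlowDemandSketch and the helper completesWithin are hypothetical, introduced only for illustration. The helper publishes one item, closes the publisher, and reports whether onComplete() arrived within a timeout; with no demand the submitted item stays buffered, which appears to be what holds onComplete() back.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowDemandSketch {

    // Hypothetical helper: submits one item, closes the publisher, and
    // returns true if onComplete() was signalled within timeoutMillis.
    static boolean completesWithin(long demand, long timeoutMillis) {
        CountDownLatch completed = new CountDownLatch(1);
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription subscription) {
                // The original example left this method empty, so no demand
                // was ever signalled. request(0) is illegal, hence the guard.
                if (demand > 0) {
                    subscription.request(demand);
                }
            }
            @Override public void onNext(String item) { }
            @Override public void onError(Throwable throwable) { }
            @Override public void onComplete() { completed.countDown(); }
        });
        publisher.submit("item");
        publisher.close();
        try {
            return completed.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // With demand for the one buffered item, onComplete() can follow it.
        System.out.println("demand 1: " + completesWithin(1, 2000));
        // With no demand, the item is never delivered and onComplete() waits.
        System.out.println("demand 0: " + completesWithin(0, 500));
    }
}
```

In real code a subscriber would usually request items in onSubscribe and again from onNext as it makes progress; the single up-front request here is only enough for the one item in this sketch.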
Michael.
On 21/02/2017, 11:32, Pavel Bucek wrote:
SubmissionPublisher#closeExceptionally does trigger
Subscriber#onError, but based on javadoc, I cannot really be sure that
it will be called, since it contains exactly the same wording as
SubmissionPublisher#close
/**
 * Unless already closed, issues {@link
 * Flow.Subscriber#onError(Throwable) onError} signals to current
 * subscribers with the given error, and disallows subsequent
 * attempts to publish. Future subscribers also receive the given
 * error. Upon return, this method does <em>NOT</em> guarantee
 * that all subscribers have yet completed.
 *
 * @param error the {@code onError} argument sent to subscribers
 * @throws NullPointerException if error is null
 */
So, Pavel, if that is not a bug, how can the SubmissionPublisher be
closed in a way that subscribers are notified?
Thanks for the link to the other mailing list - do I understand it
correctly that I should move this thread there?
Thanks and regards,
Pavel
On 21/02/2017 12:15, Pavel Rappo wrote:
I believe the most appropriate place for concurrency-related questions is
http://altair.cs.oswego.edu/mailman/listinfo/concurrency-interest
As for the question itself, I don't think this behaviour is a bug.
SubmissionPublisher.close() seems to be a graceful way of shutting down
(in contrast with SubmissionPublisher.closeExceptionally()), akin to
putting an EOF on an input stream.
My reading of the javadoc is that after SubmissionPublisher.close() has
been invoked, the publisher will no longer accept any attempts to publish
items and will call Subscriber.onComplete() *eventually*.
On 21 Feb 2017, at 09:24, Pavel Bucek <pavel.bu...@oracle.com> wrote:
There is a formatting issue in the code snippet; publisher.close()
should be on a new line:
{
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    publisher.subscribe(new Flow.Subscriber<String>() {
        @Override public void onSubscribe(Flow.Subscription subscription) { }
        @Override public void onNext(String item) { }
        @Override public void onError(Throwable throwable) {
            System.out.println("onError()");
        }
        @Override public void onComplete() {
            System.out.println("onComplete()");
        }
    });
    publisher.submit("item"); // if this is commented out, #onComplete is invoked.
    publisher.close();
}
On 21/02/2017 10:16, Pavel Bucek wrote:
Hi all,
Firstly, please let me know if this is the wrong place to send
this; I wasn't able to find a list specific to concurrency.
Consider the following example:
{
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    publisher.subscribe(new Flow.Subscriber<String>() {
        @Override public void onSubscribe(Flow.Subscription subscription) { }
        @Override public void onNext(String item) { }
        @Override public void onError(Throwable throwable) {
            System.out.println("onError()");
        }
        @Override public void onComplete() {
            System.out.println("onComplete()");
        }
    });
    publisher.submit("item"); // if this is commented out, #onComplete is invoked. publisher.close();
}
I'd expect that Subscriber#onComplete is invoked after calling
publisher.close(), but it is not happening. Curiously, when I
comment out 'publisher.submit("item")', Subscriber#onComplete is
indeed invoked.
SubmissionPublisher#close() javadoc says:
/**
 * Unless already closed, issues {@link
 * Flow.Subscriber#onComplete() onComplete} signals to current
 * subscribers, and disallows subsequent attempts to publish.
 * Upon return, this method does <em>NOT</em> guarantee that all
 * subscribers have yet completed.
 */
So it seems like it will be invoked in a different thread or
something like that, but it is never invoked (or, more precisely,
not within 10 seconds after the publisher is closed; there is
nothing else running on that particular JVM instance).
Also, publisher#isClosed() returns true and
publisher#getNumberOfSubscribers() returns 0.
I'm using Java(TM) SE Runtime Environment (build
9-ea+157-jigsaw-nightly-h6115-20170219)
What am I doing wrong?
Thanks and regards, Pavel