He-Pin commented on code in PR #1841:
URL: https://github.com/apache/pekko/pull/1841#discussion_r2105778819
##########
stream/src/main/scala/org/apache/pekko/stream/scaladsl/Hub.scala:
##########
@@ -854,7 +861,12 @@ private[pekko] class
BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
}
override def postStop(): Unit = {
- if (hubCallback ne null)
+ // If `postStop` is called before the consumer has processed the
`RegistrationPending`'s `Initialize` event,
+ // then the `Initialize` message will fail with a
`StreamDetachedException`,
+ // upon which the `RegistrationPending` logic itself unregisters
this consumer.
+ // In particular, this client must not send the `Unregister` event
itself because the values in
+ // `previousPublishedOffset` and `offset` are wrong.
+ if ((hubCallback ne null) && offsetInitialized)
Review Comment:
I see, which means there will be no leak in such scenario .
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]