michaeljmarshall opened a new pull request #12779:
URL: https://github.com/apache/pulsar/pull/12779
…livery of messages
### Motivation
I discovered a race condition in Pulsar’s Java Client `ProducerImpl` that
can lead to messages being persisted out of order for a single producer sending to a
non-partitioned topic. This PR removes the race condition by keeping track of
the current `cnx` and by adding a state update that is performed while holding the lock
on `ProducerImpl.this`.
### Reproducing the issue
I can consistently reproduce the issue by interrupting an active producer
sending messages to Pulsar. I interrupt the producer by restarting the broker
hosting the producer's topic or by unloading the producer's topic/namespace.
Because of the nature of this race condition, the out-of-order issue does not
happen every time, but it happens frequently enough to be noticeable. To
increase the probability of hitting the error, my test setup produced to 100
topics on a 3-broker Pulsar cluster at a total rate of 50k msgs/sec. I
determined that the messages were out of order in two ways: first, by producing
messages from a single thread and putting my own monotonically increasing
sequence id in each message's payload; second, by inspecting the sequence id
assigned to each message by the Pulsar Java client. Inspecting the messages
using the Reader API revealed sequences like `1,4,2,3,4`. The 4 in that
sequence is duplicated because, when the client receives an ack for an
unexpected message, it re-sends all pending messages.
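For reference, the ordering check looked roughly like the sketch below; the service URL, topic name, and message count are placeholders rather than my actual test parameters. The producer publishes asynchronously from a single thread so that several messages are in flight at once, and the reader then verifies that both the payload counter and the client-assigned sequence id only ever increase.
```java
import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;

public class OrderingCheck {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")    // placeholder cluster
                .build();

        // Single-threaded async producer embedding its own monotonically increasing
        // counter in the payload, so ordering can be checked independently of the
        // sequence id assigned by the client.
        try (Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                .topic("ordering-test")                   // placeholder topic
                .blockIfQueueFull(true)
                .create()) {
            for (long i = 0; i < 1_000_000; i++) {
                producer.sendAsync(Long.toString(i).getBytes(StandardCharsets.UTF_8));
            }
            producer.flush();
        }
        // While this runs, restart the broker owning the topic or unload the
        // topic/namespace to interrupt the producer.

        // Read everything back and flag any payload or sequence id that does not
        // strictly increase.
        long lastPayload = -1;
        long lastSequenceId = -1;
        try (Reader<byte[]> reader = client.newReader(Schema.BYTES)
                .topic("ordering-test")
                .startMessageId(MessageId.earliest)
                .create()) {
            while (reader.hasMessageAvailable()) {
                Message<byte[]> msg = reader.readNext();
                long payload = Long.parseLong(new String(msg.getData(), StandardCharsets.UTF_8));
                if (payload <= lastPayload || msg.getSequenceId() <= lastSequenceId) {
                    System.out.println("Out of order or duplicate: payload=" + payload
                            + " sequenceId=" + msg.getSequenceId());
                }
                lastPayload = Math.max(lastPayload, payload);
                lastSequenceId = Math.max(lastSequenceId, msg.getSequenceId());
            }
        }
        client.close();
    }
}
```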
### Description of the problem
The problem comes from the way the Pulsar producer changes state when a
connection is closed and when a connection is opened. Here are the state
changes that take place in each case (a simplified sketch follows the second list):
#### Connection closed:
1. Set `cnx` to `null`, as long as the current `cnx` is the connection being
closed.
2. Set the state to `Connecting`.
3. Schedule a runnable to get a new `cnx`.
4. Once the task from step 3 is run, asynchronously get a `cnx` and then
call `ProducerImpl#connectionOpened`.
#### ProducerImpl#connectionOpened:
1. Set `cnx` to the new connection.
2. Send `Producer` command to the broker to register the new producer.
3. Once the producer is registered, schedule a runnable to redeliver
`pendingMessages`.
4. Once the task from step 3 is run, go to `Ready`, as long as the current
state is `Uninitialized`, `Connecting`, or `RegisteringSchema`.
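To make the ordering of these two paths easier to follow, here is a deliberately simplified sketch in plain Java. `ClientCnx`, `State`, and `SimplifiedProducer` are stand-ins I made up for this illustration, not the actual `ProducerImpl` code; in particular, the resend work that step 3 schedules is returned as a `Runnable` so the interleaving can be replayed by hand further down.
```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the client connection type; only object identity matters here.
class ClientCnx { }

enum State { Uninitialized, Connecting, RegisteringSchema, Ready, Closed }

class SimplifiedProducer {
    final AtomicReference<ClientCnx> cnx = new AtomicReference<>();
    final AtomicReference<State> state = new AtomicReference<>(State.Uninitialized);

    // "Connection closed" path, steps 1-3 above (step 4 eventually calls connectionOpened again).
    void connectionClosed(ClientCnx closedCnx) {
        cnx.compareAndSet(closedCnx, null);  // 1. clear cnx only if it is the connection that closed
        state.set(State.Connecting);         // 2. move to Connecting
        // 3./4. a reconnect task is scheduled here; it later calls connectionOpened(newCnx)
    }

    // "connectionOpened" path, steps 1-3 above; the returned Runnable is the step-4 task
    // that runs once the producer has been registered with the broker.
    Runnable connectionOpened(ClientCnx newCnx) {
        cnx.set(newCnx);                     // 1. remember the new connection
        // 2. send the Producer command to the broker (omitted), then 3. schedule this task:
        return () -> {
            State s = state.get();           // 4. the guard looks only at the state enum...
            if (s == State.Uninitialized || s == State.Connecting || s == State.RegisteringSchema) {
                state.set(State.Ready);      // ...so it cannot tell that newCnx was torn down meanwhile
            }
        };
    }
}
```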
There is nothing that prevents a connection from being closed while another
connection is currently being established in the `connectionOpened` method.
This is exactly what exposes the race condition fixed by this PR. In the race,
a connection is established and we call `connectionOpened` and successfully
register the producer with the broker. Then, that connection is closed before
step 4 of `connectionOpened` and the state changes from `Connecting` to
`Connecting`. Because step 4 only checks that the current state is
`Uninitialized`, `Connecting`, or `RegisteringSchema`, it updates the state to
`Ready`. When adding some extra logging, I could see that we changed the
state to `Ready` while `cnx()` returned `null`. At this point, messages
would not yet be delivered. When the new connection is established,
`connectionOpened` is called and we set the new `cnx`. Since our state is
still `Ready`, we start delivering any new messages received by the client,
and these are out of order: we asynchronously register the producer with the
broker and then the messages start to be persisted.
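Replaying that interleaving against the simplified sketch above makes the broken invariant concrete: when the close lands between producer registration and the step-4 task, we end up `Ready` with a `null` connection, which is exactly what the extra logging showed.
```java
public class RaceIllustration {
    public static void main(String[] args) {
        SimplifiedProducer producer = new SimplifiedProducer();
        ClientCnx first = new ClientCnx();

        // Connection established: producer registered, step-4 task scheduled but not yet run.
        Runnable goReady = producer.connectionOpened(first);

        // Broker restart / topic unload closes the connection before the step-4 task runs.
        producer.connectionClosed(first);

        // The previously scheduled task now runs: the state is Connecting, so the guard
        // happily flips it to Ready even though there is no connection at all.
        goReady.run();

        System.out.println(producer.state.get()); // Ready
        System.out.println(producer.cnx.get());   // null
        // When the next connectionOpened(...) sets a new cnx, the producer is already Ready,
        // so newly submitted messages are sent before the old pending messages are recovered.
    }
}
```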
### Modifications
1. Update where the producer’s `epoch` value is incremented. It was
previously updated before getting a connection. However, this seems prone to
races. By updating it when we have the connection and within a lock on the
producer, we ensure that the connection gets the next highest epoch number.
2. Set the state to `Connecting` in the `connectionOpened` method. This is
important because the connection could have been closed after the check for
`cnx() != null` in the `recoverProcessOpSendMsgFrom` method but before that
method gets to the point of setting the state to `Ready`. Since we update the state
within the lock on `ProducerImpl.this`, we won't deliver any messages. There
is still a chance that the producer will have state `Ready` and `cnx() == null`.
3. Use the producer's `epoch` value to ensure the `cnx` reference passed to
`recoverProcessOpSendMsgFrom` is still the current `cnx`, and ensure that `cnx() !=
null`. Together, these checks make it safe to update the state to `Ready` (see the
sketch after this list).
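To show the shape of the fix, here is the same sketch with the three modifications applied. Again, this is a hedged illustration using the made-up `ClientCnx`/`State` types above, not the literal diff: the epoch is incremented and the state reset to `Connecting` while holding the lock when a new connection arrives, and the transition to `Ready` only happens if that epoch is still current and `cnx()` is non-null.
```java
import java.util.concurrent.atomic.AtomicReference;

// The connection-closed path is unchanged from the sketch above; only the
// connection-opened/recovery side is shown here.
class FixedProducer {
    private final AtomicReference<ClientCnx> cnx = new AtomicReference<>();
    private final AtomicReference<State> state = new AtomicReference<>(State.Uninitialized);
    private long epoch; // guarded by synchronized (this)

    void connectionOpened(ClientCnx newCnx) {
        final long expectedEpoch;
        synchronized (this) {
            epoch++;                      // modification 1: the new connection owns the next epoch
            expectedEpoch = epoch;
            state.set(State.Connecting);  // modification 2: invalidate any earlier Ready decision
            cnx.set(newCnx);
        }
        // ...register the producer with the broker, then schedule the resend task,
        // carrying expectedEpoch and newCnx along to it...
        recoverProcessOpSendMsgFrom(newCnx, expectedEpoch);
    }

    private void recoverProcessOpSendMsgFrom(ClientCnx expectedCnx, long expectedEpoch) {
        synchronized (this) {
            // modification 3: only move to Ready if this is still the current connection attempt
            if (expectedEpoch != epoch || cnx.get() == null) {
                // the "Producer epoch mismatch or the current connection is null." case:
                // give up here; the reconnect triggered by the close retries with a newer epoch
                return;
            }
            // ...resend pendingMessages on expectedCnx...
            state.set(State.Ready);
        }
    }
}
```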
### Alternatives
Instead of using the `epoch` value, we could have checked that `cnx() ==
cnx` in the `recoverProcessOpSendMsgFrom`. This check would be valid in all
cases except where the next connection is the same connection. I think this
would happen if a topic were unloaded from a broker and then loaded back on to
the same broker. The epoch value gives a consistent way to know that the `cnx`
is the current `cnx`.
We could have added a new state called `RegisteringProducer`. I investigated
this option, but it seemed more complicated and less elegant than this
solution. I chose the simpler solution here.
### Verifying this change
I tried to implement a unit test that would consistently reproduce this
issue. I was only able to do so when I introduced a 50 millisecond delay in the
scheduling of the runnable within `resendMessages`. Using that method, this PR
prevented the race. I also built a custom client with this PR, and I observed the
log line `Producer epoch mismatch or the current connection is null.`, which
confirms that the race would have happened but was avoided.
I have also provided extra detail in this PR description since I wasn't able to
add a new test that specifically verifies this change. I think the change is fairly
straightforward, and I have tried to add enough context to make it easy to
understand.
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): no
- The public API: no
- The schema: no
- The default values of configurations: no
- The wire protocol: no
- The rest endpoints: no
- The admin cli options: no
- Anything that affects deployment: no
### Documentation
- [x] `no-need-doc`
This is an internal change to the client. We should make sure to include
this fix in the release notes, but no documentation changes need to be made.