michaeljmarshall opened a new pull request #12779:
URL: https://github.com/apache/pulsar/pull/12779


   …livery of messages
   
   ### Motivation
   
   I discovered a race condition in Pulsar’s Java Client `ProducerImpl` that can lead to messages being persisted out of order for a single producer sending to a non-partitioned topic. This PR removes the race condition by keeping track of the current `cnx` and by adding a state update that is performed while holding a lock on `ProducerImpl.this`.
   
   ### Reproducing the issue
   I can consistently reproduce the issue by interrupting an active producer sending messages to Pulsar. I interrupt the producer by restarting the broker hosting the producer's topic or by unloading the producer's topic/namespace. Because of the nature of this race condition, the out-of-order issue does not happen every time, but it happens frequently enough to notice. To increase the probability of the error, my test setup produced to 100 topics on a 3-broker Pulsar cluster at a total rate of 50k msgs/sec. I determined that the messages were out of order in two ways: first, by producing messages from a single thread and putting my own monotonically increasing sequence id in the message payload; second, by inspecting the message's sequence id assigned by the Pulsar Java client. Inspecting the messages using the reader API revealed messages in sequences like `1,4,2,3,4`. The 4 in that sequence is duplicated because when the client receives an ack for an unexpected message, it re-sends all pending messages.
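   For reference, below is a minimal sketch of the kind of ordering check described above, using the standard producer and reader APIs. The service URL, topic name, and message count are placeholders; the actual test produced to 100 topics across 3 brokers at roughly 50k msgs/sec.

```java
import org.apache.pulsar.client.api.*;

public class OrderingCheck {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")   // placeholder service URL
                .build();

        String topic = "persistent://public/default/ordering-test"; // placeholder topic

        // Single-threaded producer: the payload is a monotonically increasing sequence id.
        try (Producer<Long> producer = client.newProducer(Schema.INT64).topic(topic).create()) {
            for (long i = 0; i < 1_000_000; i++) {
                producer.send(i);
            }
        }

        // Reader: verify both the payload order and the client-assigned sequence ids.
        try (Reader<Long> reader = client.newReader(Schema.INT64)
                .topic(topic)
                .startMessageId(MessageId.earliest)
                .create()) {
            long expected = 0;
            while (reader.hasMessageAvailable()) {
                Message<Long> msg = reader.readNext();
                if (msg.getValue() != expected) {
                    System.out.printf("Out of order: expected %d, got %d (sequenceId=%d)%n",
                            expected, msg.getValue(), msg.getSequenceId());
                }
                expected = msg.getValue() + 1;
            }
        }
        client.close();
    }
}
```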
   
   ### Description of the problem
   The problem comes from the way that the Pulsar producer changes state when a connection is closed and when a connection is opened. Here are the state changes that take place in each case (a simplified sketch follows the two lists below):
   
   #### Connection closed:
   1. Set `cnx` to `null`, as long as the current `cnx` is the connection being 
closed.
   2. Set the state to `Connecting`.
   3. Schedule a runnable to get a new `cnx`.
   4. Once the task from step 3 is run, asynchronously get a `cnx` and then 
call `ProducerImpl#connectionOpened`.
   
   #### ProducerImpl#connectionOpened:
   1. Set `cnx` to the new connection.
   2. Send `Producer` command to the broker to register the new producer.
   3. Once the producer is registered, schedule a runnable to redeliver 
`pendingMessages`.
   4. Once the task from step 3 is run, go to `Ready`, as long as the current 
state is `Uninitialized`, `Connecting`, or `RegisteringSchema`.
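
   To make the race easier to follow, here is a condensed sketch of the two code paths above. The names and structure are simplified and are not the actual `ProducerImpl` source; the important detail is that the final step of `connectionOpened` only checks the current state and never checks which connection that state belongs to:

```java
import java.util.concurrent.atomic.AtomicReference;

// Condensed sketch of the two code paths described above; simplified, not the real ProducerImpl.
class ProducerStateSketch {
    enum State { Uninitialized, Connecting, RegisteringSchema, Ready, Closed }

    private final AtomicReference<Object> cnx = new AtomicReference<>();
    private final AtomicReference<State> state = new AtomicReference<>(State.Uninitialized);

    // Connection closed: clear cnx (only if it is still the closed one), go to Connecting.
    void connectionClosed(Object closedCnx) {
        cnx.compareAndSet(closedCnx, null);     // step 1
        state.set(State.Connecting);            // step 2
        // steps 3-4: schedule reconnection, which eventually calls connectionOpened(newCnx)
    }

    // Connection opened: set cnx, register the producer, then flip to Ready.
    void connectionOpened(Object newCnx) {
        cnx.set(newCnx);                        // step 1
        // step 2: send the Producer command to the broker (async)
        // step 3: once registered, schedule redelivery of pendingMessages
        changeToReadyState();                   // step 4 -- runs later, possibly after another close
    }

    // The pre-fix check: only looks at the state, never at whether cnx is still valid.
    boolean changeToReadyState() {
        return state.compareAndSet(State.Uninitialized, State.Ready)
                || state.compareAndSet(State.Connecting, State.Ready)
                || state.compareAndSet(State.RegisteringSchema, State.Ready);
    }
}
```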
   
   There is nothing that prevents a connection from being closed while another connection is being established in the `connectionOpened` method. This is exactly what exposes the race condition fixed by this PR. In the race, a connection is established, we call `connectionOpened`, and we successfully register the producer with the broker. Then, that connection is closed before step 4 of `connectionOpened` runs, and the connection-closed handler sets the state from `Connecting` back to `Connecting`. Because step 4 only checks that the current state is `Uninitialized`, `Connecting`, or `RegisteringSchema`, it still updates the state to `Ready`. With some extra logging added, I could see that we changed the state to `Ready` while `cnx()` returned `null`. At this point, messages wouldn't yet be delivered. When the new connection is established, `connectionOpened` is called and we set the new `cnx`. Since our state is still `Ready`, we start delivering any new messages received by the client. These are delivered out of order, ahead of the still-pending messages. We asynchronously register the producer with the broker and then messages start to persist.
   
   ### Modifications
   
   1. Update where the producer’s `epoch` value is incremented. It was previously incremented before getting a connection. However, this seems prone to races. By incrementing it once we have the connection, and within a lock on the producer, we ensure that each new connection gets the next highest epoch number.
   2. Set the state to `Connecting` in the `connectionOpened` method. This is important because the connection could be closed after the `cnx() != null` check in the `recoverProcessOpSendMsgFrom` method but before that method gets to the point of setting the state to `Ready`. Since we update the state within the lock on `ProducerImpl.this`, we won't deliver any messages. There is still a chance that the producer will have state `Ready` and `cnx() == null`.
   3. Use the producer's `epoch` value to ensure the `cnx` reference passed to `recoverProcessOpSendMsgFrom` is still the current `cnx`, and ensure that `cnx() != null`. Together these checks ensure that it is safe to update the state to `Ready`.
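
   To illustrate the shape of the change, here is a rough sketch of the guarded recovery path described in the list above. The field and method names are simplified approximations of the real `ProducerImpl`, so treat it as illustrative rather than as the exact diff:

```java
// Illustrative sketch of the guarded recovery path; simplified, not the exact PR diff.
class GuardedRecoverySketch {
    private long epoch;                 // guarded by synchronized (this)
    private volatile Object clientCnx;  // stands in for the current ClientCnx

    void connectionOpened(Object cnx) {
        long expectedEpoch;
        synchronized (this) {
            // Modification 1: bump the epoch only once we actually hold the new connection,
            // inside the lock, so each connection gets the next highest epoch.
            epoch++;
            expectedEpoch = epoch;
            clientCnx = cnx;
            // Modification 2: force the state back to Connecting here so a concurrent close
            // cannot leave us flipping to Ready for a connection that is already gone.
            setState("Connecting");
        }
        // ... register the producer with the broker, then resume sending:
        recoverProcessOpSendMsgFrom(cnx, expectedEpoch);
    }

    void recoverProcessOpSendMsgFrom(Object cnx, long expectedEpoch) {
        synchronized (this) {
            // Modification 3: only move to Ready if this is still the current connection.
            if (expectedEpoch != this.epoch || clientCnx == null) {
                // Corresponds to the log line mentioned in the verification section below.
                System.out.println("Producer epoch mismatch or the current connection is null.");
                return;
            }
            // ... cnx would be used to re-send pendingMessages here
            setState("Ready");
        }
    }

    private void setState(String s) { /* placeholder for the real state handling */ }
}
```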
   
   ### Alternatives
   Instead of using the `epoch` value, we could have checked that `cnx() == cnx` in `recoverProcessOpSendMsgFrom`. This check would be valid in all cases except when the next connection is the same connection, which I think would happen if a topic were unloaded from a broker and then loaded back onto the same broker. The epoch value gives a consistent way to know that the `cnx` is the current `cnx`.
   
   We could have added a new state called `RegisteringProducer`. I investigated 
this option, but it seemed more complicated and less elegant than this 
solution. I chose the simpler solution here.
   
   ### Verifying this change
   
   I tried to implement a unit test that would consistently reproduce this issue. I was only able to do so when I introduced a 50 millisecond delay in the scheduling of the runnable within `resendMessages`; using that method, this PR prevented the race. I also built a custom client with this PR applied, and I can confirm that I observed the log line `Producer epoch mismatch or the current connection is null.` That log line confirms that the race would have happened but was avoided.
   
   Also, I provided extra detail in this PR description since I wasn't able to add a new test to specifically verify this change. I think the change is pretty straightforward, and I have tried to add enough context to make it easy to understand.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API: no
     - The schema: no
     - The default values of configurations: no
     - The wire protocol: no
     - The rest endpoints: no
     - The admin cli options: no
     - Anything that affects deployment: no
   
   ### Documentation
   
   - [x] `no-need-doc` 
     
     This is an internal change to the client. We should make sure to include 
this fix in the release notes, but no documentation changes need to be made.
   
   

