Gleiphir2769 opened a new issue, #833:
URL: https://github.com/apache/pulsar-client-go/issues/833

   #### Overview
   
   When message is corrupted when it is received, it should be discarded. 
   
   
https://github.com/apache/pulsar-client-go/blob/a09460ed6865700f9e5020e9108297d950a00644/pulsar/consumer_partition.go#L605-L609
   
   `partitionConsumer.discardCorruptedMessage()` causes the `availablePermits` 
leak. Because `availablePermits` increased after `messageReceived()` without 
error (after `pc.queueCh <- messages`), discarding will cause the 
`pc.availablePermits++` has no chance to be excuted. So consumer's 
`availablePermits` may be exhausted causing `receive()` blocking.
    
   
https://github.com/apache/pulsar-client-go/blob/a09460ed6865700f9e5020e9108297d950a00644/pulsar/consumer_partition.go#L933
   
   In the Java client, the `availablePermits` will be increased when message 
has discarded.
   
![image](https://user-images.githubusercontent.com/50270213/186129734-82d0a35a-5107-400b-af34-4a2de1d340c1.png)
   
   
   #### Steps to reproduce
   
   It can be reproduce by following.
   ```
   func TestAvailablePermitsLeak(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: serviceURL,
        })
        assert.Nil(t, err)
        client.Close()
   
        topic := fmt.Sprintf("my-topic-test-ap-leak-%v", 
time.Now().Nanosecond())
   
        // 1. Producer with valid key name
        producer, err := client.CreateProducer(ProducerOptions{
                Topic: topic,
                Encryption: &ProducerEncryptionInfo{
                        KeyReader: 
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
                                "crypto/testdata/pri_key_rsa.pem"),
                        Keys: []string{"client-rsa.pem"},
                },
                Schema:          NewStringSchema(nil),
                DisableBatching: true,
        })
        assert.Nil(t, err)
        assert.NotNil(t, producer)
   
        subscriptionName := "enc-failure-subcription"
        totalMessages := 2000
   
        // 2. KeyReader is not set by the consumer
        // Receive should fail since KeyReader is not setup
        // because default behaviour of consumer is fail receiving message if 
error in decryption
        consumer, err := client.Subscribe(ConsumerOptions{
                Topic:            topic,
                SubscriptionName: subscriptionName,
        })
        assert.Nil(t, err)
   
        messageFormat := "my-message-%v"
        for i := 0; i < totalMessages; i++ {
                _, err := producer.Send(context.Background(), &ProducerMessage{
                        Value: fmt.Sprintf(messageFormat, i),
                })
                assert.Nil(t, err)
        }
   
        // 3. Discard action on decryption failure. Make the availablePermits 
leak
        consumer.Close()
   
        consumer, err = client.Subscribe(ConsumerOptions{
                Topic:            topic,
                SubscriptionName: subscriptionName,
                Decryption: &MessageDecryptionInfo{
                        ConsumerCryptoFailureAction: 
crypto.ConsumerCryptoFailureActionDiscard,
                },
                Schema: NewStringSchema(nil),
        })
        assert.Nil(t, err)
        assert.NotNil(t, consumer)
        
        // 4. It will receive no message when i > 1000 because the default 
availablePermits is 1000
        for i := 0; i < totalMessages; i++ {
                ctx3, cancel3 := context.WithTimeout(context.Background(), 
3*time.Second)
                msg, err := consumer.Receive(ctx3)
                cancel3()
                assert.NotNil(t, err)
                assert.Nil(t, msg, "Message received even aftet 
ConsumerCryptoFailureAction.Discard is set.")
        }
   }
   ```
   
   #### System configuration
   **Pulsar version**: 2.8.3
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to