Gleiphir2769 opened a new issue, #833: URL: https://github.com/apache/pulsar-client-go/issues/833
#### Overview When message is corrupted when it is received, it should be discarded. https://github.com/apache/pulsar-client-go/blob/a09460ed6865700f9e5020e9108297d950a00644/pulsar/consumer_partition.go#L605-L609 `partitionConsumer.discardCorruptedMessage()` causes the `availablePermits` leak. Because `availablePermits` increased after `messageReceived()` without error (after `pc.queueCh <- messages`), discarding will cause the `pc.availablePermits++` has no chance to be excuted. So consumer's `availablePermits` may be exhausted causing `receive()` blocking. https://github.com/apache/pulsar-client-go/blob/a09460ed6865700f9e5020e9108297d950a00644/pulsar/consumer_partition.go#L933 In the Java client, the `availablePermits` will be increased when message has discarded.  #### Steps to reproduce It can be reproduce by following. ``` func TestAvailablePermitsLeak(t *testing.T) { client, err := NewClient(ClientOptions{ URL: serviceURL, }) assert.Nil(t, err) client.Close() topic := fmt.Sprintf("my-topic-test-ap-leak-%v", time.Now().Nanosecond()) // 1. Producer with valid key name producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, Encryption: &ProducerEncryptionInfo{ KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", "crypto/testdata/pri_key_rsa.pem"), Keys: []string{"client-rsa.pem"}, }, Schema: NewStringSchema(nil), DisableBatching: true, }) assert.Nil(t, err) assert.NotNil(t, producer) subscriptionName := "enc-failure-subcription" totalMessages := 2000 // 2. KeyReader is not set by the consumer // Receive should fail since KeyReader is not setup // because default behaviour of consumer is fail receiving message if error in decryption consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: subscriptionName, }) assert.Nil(t, err) messageFormat := "my-message-%v" for i := 0; i < totalMessages; i++ { _, err := producer.Send(context.Background(), &ProducerMessage{ Value: fmt.Sprintf(messageFormat, i), }) assert.Nil(t, err) } // 3. Discard action on decryption failure. Make the availablePermits leak consumer.Close() consumer, err = client.Subscribe(ConsumerOptions{ Topic: topic, SubscriptionName: subscriptionName, Decryption: &MessageDecryptionInfo{ ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionDiscard, }, Schema: NewStringSchema(nil), }) assert.Nil(t, err) assert.NotNil(t, consumer) // 4. It will receive no message when i > 1000 because the default availablePermits is 1000 for i := 0; i < totalMessages; i++ { ctx3, cancel3 := context.WithTimeout(context.Background(), 3*time.Second) msg, err := consumer.Receive(ctx3) cancel3() assert.NotNil(t, err) assert.Nil(t, msg, "Message received even aftet ConsumerCryptoFailureAction.Discard is set.") } } ``` #### System configuration **Pulsar version**: 2.8.3 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
