kjothen opened a new issue, #18349:
URL: https://github.com/apache/pulsar/issues/18349

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.
   
   
   ### Version
   
   OS = macOS Ventura (13.0)
   Pulsar = 2.10.2
   
   
   
   ### Minimal reproduce step
   
   Consume an encrypted message with a Consumer that has cryptoFailureAction = CONSUME and whose CryptoKeyReader supplies either a missing or a mismatched private key. This results in an infinite loop of internal exception, disconnect and reconnect, and the receive() call never returns.
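
The loop can be illustrated with a small, self-contained model. It is a hypothetical sketch, not Pulsar code: `RedeliveryLoopModel`, `handleMessage` and `simulate` are invented names. It assumes (consistent with, but not proven by, the logs below) that the broker redelivers the same unacknowledged message after each reconnect, and that a successful subscribe resets the reconnect backoff to the initial 0.1 s. Under those assumptions every cycle fails in exactly the same way, so receive() never gets a message to return.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the loop in this report; none of these names are Pulsar APIs.
public class RedeliveryLoopModel {

    // Matches "Will try again in 0.1 s" in the logs.
    static final long INITIAL_BACKOFF_MS = 100;

    // Stand-in for the client-side step that fails deterministically on the
    // still-encrypted payload (SingleMessageMetadata.parseFrom in the stack traces).
    static void handleMessage(byte[] payload) {
        throw new IllegalArgumentException("payload is not valid batch metadata");
    }

    /**
     * Runs up to maxAttempts subscribe/deliver cycles and returns the reconnect delay
     * scheduled after each failed delivery. The same unacknowledged message is
     * redelivered every time, so no attempt can succeed.
     */
    static List<Long> simulate(byte[] unackedMessage, int maxAttempts) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                handleMessage(unackedMessage);
                return delays; // would mean receive() finally returns
            } catch (IllegalArgumentException e) {
                // Connection is closed. The backoff was reset by the last successful
                // subscribe, so the reconnect is always scheduled after the initial delay.
                delays.add(INITIAL_BACKOFF_MS);
            }
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(simulate(new byte[] {0x0E, 0x22}, 5)); // prints [100, 100, 100, 100, 100]
    }
}
```

Because the failure is deterministic for a given message, retrying alone cannot make progress; in this model the loop only ends when the attempt limit is hit.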
   
   ### What did you expect to see?
   
   I expected receive() to return the message with its still-encrypted payload as a byte buffer, as described in the documentation:
   
   [Decrypting encrypted messages at the consumer application](https://pulsar.apache.org/docs/cookbooks-encryption/#decrypting-encrypted-messages-at-the-consumer-application)
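
The expected behaviour can be sketched as a simplified model. The names `ConsumeOnFailureModel`, `deliver`, and the `decrypt`/`unpackBatch` functions are hypothetical and do not reflect Pulsar's actual `ConsumerImpl`; `FailureAction` only mirrors the three values of Pulsar's `ConsumerCryptoFailureAction` (FAIL, DISCARD, CONSUME). Based on the stack traces below (`receiveIndividualMessagesFromBatch` calling `deSerializeSingleMessageInBatch`), the model assumes the per-message batch metadata is inside the encrypted payload, so with CONSUME the payload has to be delivered whole instead of being unpacked.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical model of the expected behaviour; not Pulsar's ConsumerImpl.
public class ConsumeOnFailureModel {

    // Mirrors the three values of Pulsar's ConsumerCryptoFailureAction.
    enum FailureAction { FAIL, DISCARD, CONSUME }

    /**
     * @param payload     batch payload as received from the broker (encrypted)
     * @param decrypt     returns Optional.empty() when the private key is missing or wrong
     * @param unpackBatch splits a decrypted batch payload into individual messages
     * @return the messages handed to the application
     */
    static List<byte[]> deliver(byte[] payload,
                                FailureAction action,
                                Function<byte[], Optional<byte[]>> decrypt,
                                Function<byte[], List<byte[]>> unpackBatch) {
        Optional<byte[]> plaintext = decrypt.apply(payload);
        if (plaintext.isPresent()) {
            return unpackBatch.apply(plaintext.get());
        }
        if (action != FailureAction.CONSUME) {
            // FAIL and DISCARD both deliver nothing in this simplified model.
            return List.of();
        }
        // CONSUME: the per-message metadata is inside the encrypted payload, so it
        // cannot be unpacked; deliver the whole encrypted payload as one message.
        return List.of(payload);
    }

    public static void main(String[] args) {
        byte[] encrypted = {(byte) 0x9E, 0x3B, 0x11};
        List<byte[]> delivered = deliver(encrypted, FailureAction.CONSUME,
                p -> Optional.empty(),
                p -> { throw new IllegalStateException("must not unpack ciphertext"); });
        System.out.println(delivered.size()); // prints 1
    }
}
```

The key design point in the sketch is that the CONSUME branch never calls `unpackBatch`, which is exactly the call that fails in the stack traces.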
   
   ### What did you see instead?
   
   Two types of exception:
   
   1. If the consumer's `CryptoKeyReader.getPrivateKey` implementation returns `null`, the consumer throws the following exception and, after auto-reconnecting, is caught in an infinite retry loop:
   ```
   20:50:50.606 [pulsar-client-io-79-1] ERROR o.a.p.c.impl.crypto.MessageCryptoBc - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-2-subscriber] Failed to decrypt data key tenant-key-1 to decrypt messages Cannot invoke "org.apache.pulsar.client.api.EncryptionKeyInfo.getKey()" because "keyInfo" is null
   20:50:50.606 [pulsar-client-io-79-1] WARN  o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-2-subscriber][d7a64][org.apache.pulsar.common.api.proto.MessageIdData@6596540d] Decryption failed. Consuming encrypted message since config is set to consume.
   20:50:50.607 [pulsar-client-io-79-1] WARN  o.a.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:50561] Got exception java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
        at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270)
        at org.apache.pulsar.common.api.proto.SingleMessageMetadata.parseFrom(SingleMessageMetadata.java:470)
        at org.apache.pulsar.common.protocol.Commands.deSerializeSingleMessageInBatch(Commands.java:1719)
        at org.apache.pulsar.client.impl.ConsumerImpl.newSingleMessage(ConsumerImpl.java:1127)
        at org.apache.pulsar.client.impl.ConsumerImpl.receiveIndividualMessagesFromBatch(ConsumerImpl.java:1520)
        at org.apache.pulsar.client.impl.ConsumerImpl.messageReceived(ConsumerImpl.java:1369)
        at org.apache.pulsar.client.impl.ClientCnx.handleMessage(ClientCnx.java:459)
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:187)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
        at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
        at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
        at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
        at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:1589)
   
   20:50:50.710 [pulsar-client-io-79-1] INFO  o.a.pulsar.client.impl.ClientCnx - [id: 0x8de9df33, L:/127.0.0.1:50569 ! R:localhost/127.0.0.1:50561] Disconnected
   20:50:50.710 [pulsar-client-io-79-1] INFO  o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [standalone-0-0] Closed connection [id: 0x8de9df33, L:/127.0.0.1:50569 ! R:localhost/127.0.0.1:50561] -- Will try again in 0.1 s
   20:50:50.711 [pulsar-client-io-79-1] INFO  o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-2-subscriber] Closed connection [id: 0x8de9df33, L:/127.0.0.1:50569 ! R:localhost/127.0.0.1:50561] -- Will try again in 0.1 s
   20:50:50.711 [pulsar-client-io-79-1] INFO  o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-1-subscriber] Closed connection [id: 0x8de9df33, L:/127.0.0.1:50569 ! R:localhost/127.0.0.1:50561] -- Will try again in 0.1 s
   20:50:50.813 [pulsar-timer-83-1] INFO  o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [standalone-0-0] Reconnecting after timeout
   20:50:50.813 [pulsar-timer-83-1] INFO  o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-2-subscriber] Reconnecting after timeout
   20:50:50.816 [pulsar-client-io-79-1] INFO  o.a.pulsar.client.impl.ConnectionPool - [[id: 0x549c5e03, L:/127.0.0.1:50570 - R:localhost/127.0.0.1:50561]] Connected to server
   20:50:50.816 [pulsar-client-io-79-1] INFO  o.a.pulsar.client.impl.ClientCnx - [id: 0x549c5e03, L:/127.0.0.1:50570 - R:localhost/127.0.0.1:50561] Connected through proxy to target broker at localhost:6650
   20:50:50.819 [pulsar-client-io-79-1] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-2-subscriber] Subscribing to topic on cnx [id: 0x549c5e03, L:/127.0.0.1:50570 - R:localhost/127.0.0.1:50561], consumerId 0
   20:50:50.819 [pulsar-client-io-79-1] INFO  o.a.pulsar.client.impl.ProducerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0] [standalone-0-0] Creating producer on cnx [id: 0x549c5e03, L:/127.0.0.1:50570 - R:localhost/127.0.0.1:50561]
   20:50:50.832 [pulsar-timer-83-1] INFO  o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-1-subscriber] Reconnecting after timeout
   20:50:50.832 [pulsar-client-io-79-1] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-2-subscriber] Subscribed to topic on localhost/127.0.0.1:50561 -- consumer: 0
   20:50:50.834 [pulsar-client-io-79-1] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-1-subscriber] Subscribing to topic on cnx [id: 0x549c5e03, L:/127.0.0.1:50570 - R:localhost/127.0.0.1:50561], consumerId 1
   20:50:50.839 [pulsar-client-io-79-1] ERROR o.a.p.c.impl.crypto.MessageCryptoBc - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-2-subscriber] Failed to decrypt data key tenant-key-1 to decrypt messages Cannot invoke "org.apache.pulsar.client.api.EncryptionKeyInfo.getKey()" because "keyInfo" is null
   20:50:50.852 [pulsar-client-io-79-1] WARN  o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-2-subscriber][d7a64][org.apache.pulsar.common.api.proto.MessageIdData@1b2135c9] Decryption failed. Consuming encrypted message since config is set to consume.
   20:50:50.853 [pulsar-client-io-79-1] WARN  o.a.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:50561] Got exception java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
        at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270)
        at org.apache.pulsar.common.api.proto.SingleMessageMetadata.parseFrom(SingleMessageMetadata.java:470)
        at org.apache.pulsar.common.protocol.Commands.deSerializeSingleMessageInBatch(Commands.java:1719)
        at org.apache.pulsar.client.impl.ConsumerImpl.newSingleMessage(ConsumerImpl.java:1127)
        at org.apache.pulsar.client.impl.ConsumerImpl.receiveIndividualMessagesFromBatch(ConsumerImpl.java:1520)
        at org.apache.pulsar.client.impl.ConsumerImpl.messageReceived(ConsumerImpl.java:1369)
        at org.apache.pulsar.client.impl.ClientCnx.handleMessage(ClientCnx.java:459)
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:187)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
   ...
   ```
   
   2. If the consumer's `CryptoKeyReader.getPrivateKey` implementation returns a mismatched key (any other key), the consumer throws the following exception and, after auto-reconnecting, is caught in an infinite retry loop:
   
   ```
   20:42:38.220 [pulsar-client-io-35-1] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-2-subscriber] Subscribed to topic on localhost/127.0.0.1:50512 -- consumer: 0
   2022-11-04T20:42:38.224Z Kierans-iMac.local INFO [com.repldriven.mono.pulsar.crypto:?] - Trying to read private key: find tenant-key-1 in ("tenant-key-1")
   20:42:38.227 [pulsar-client-io-35-1] ERROR o.a.p.c.impl.crypto.MessageCryptoBc - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-2-subscriber] Failed to decrypt data key tenant-key-1 to decrypt messages unable to process block
   20:42:38.227 [pulsar-client-io-35-1] WARN  o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-2-subscriber][c585c][org.apache.pulsar.common.api.proto.MessageIdData@31a4eea0] Decryption failed. Consuming encrypted message since config is set to consume.
   20:42:38.227 [pulsar-client-io-35-1] WARN  o.a.pulsar.client.impl.ClientCnx - [localhost/127.0.0.1:50512] Got exception java.lang.IllegalArgumentException: Invalid unknonwn tag type: 4
        at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270)
        at org.apache.pulsar.common.api.proto.SingleMessageMetadata.parseFrom(SingleMessageMetadata.java:470)
        at org.apache.pulsar.common.protocol.Commands.deSerializeSingleMessageInBatch(Commands.java:1719)
        at org.apache.pulsar.client.impl.ConsumerImpl.newSingleMessage(ConsumerImpl.java:1127)
        at org.apache.pulsar.client.impl.ConsumerImpl.receiveIndividualMessagesFromBatch(ConsumerImpl.java:1520)
        at org.apache.pulsar.client.impl.ConsumerImpl.messageReceived(ConsumerImpl.java:1369)
        at org.apache.pulsar.client.impl.ClientCnx.handleMessage(ClientCnx.java:459)
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:187)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
        at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
        at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:1589)
   
   20:42:38.276 [pulsar-client-io-35-1] INFO  o.a.pulsar.client.impl.ClientCnx - [id: 0xe7e22a30, L:/127.0.0.1:50521 ! R:localhost/127.0.0.1:50512] Disconnected
   20:42:38.291 [pulsar-timer-39-1] INFO  o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-1-subscriber] Reconnecting after connection was closed
   20:42:38.291 [pulsar-client-io-35-1] INFO  o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [standalone-0-0] Closed connection [id: 0xe7e22a30, L:/127.0.0.1:50521 ! R:localhost/127.0.0.1:50512] -- Will try again in 0.1 s
   20:42:38.291 [pulsar-client-io-35-1] INFO  o.a.p.client.impl.ConnectionHandler - [persistent://tenant-1/namespace-1/topic-1-partition-0] [tenant-2-subscriber] Closed connection [id: 0xe7e22a30, L:/127.0.0.1:50521 ! R:localhost/127.0.0.1:50512] -- Will try again in 0.1 s
   20:42:38.295 [pulsar-client-io-35-1] INFO  o.a.pulsar.client.impl.ConnectionPool - [[id: 0x3a71b0d2, L:/127.0.0.1:50522 - R:localhost/127.0.0.1:50512]] Connected to server
   20:42:38.295 [pulsar-client-io-35-1] INFO  o.a.pulsar.client.impl.ClientCnx - [id: 0x3a71b0d2, L:/127.0.0.1:50522 - R:localhost/127.0.0.1:50512] Connected through proxy to target broker at localhost:6650
   20:42:38.309 [pulsar-client-io-35-1] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-1-subscriber] Subscribing to topic on cnx [id: 0x3a71b0d2, L:/127.0.0.1:50522 - R:localhost/127.0.0.1:50512], consumerId 1
   20:42:38.318 [pulsar-client-io-35-1] INFO  o.a.pulsar.client.impl.ConsumerImpl - [persistent://tenant-1/namespace-1/topic-1-partition-0][tenant-1-subscriber] Subscribed to topic on localhost/127.0.0.1:50512 -- consumer: 1
   ...
   ```
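
To illustrate the second case: Pulsar encrypts each message with a symmetric AES data key and encrypts that data key with the public key of an RSA or ECDSA key pair, so the consumer first has to decrypt the data key with the private key returned by `getPrivateKey`. That is the step the log reports as failing ("Failed to decrypt data key tenant-key-1"). The sketch below is only an illustration under stated assumptions: it uses the JDK's built-in RSA-OAEP transformation rather than the BouncyCastle provider that Pulsar's `MessageCryptoBc` uses, and the report does not say whether RSA or EC keys were involved, so the exception type and message differ from "unable to process block". `WrongKeyUnwrapDemo` and its methods are hypothetical names; the point is that a mismatched private key makes the unwrap step throw rather than yield a usable data key.

```java
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;

// Illustration only: JDK RSA-OAEP, not Pulsar's BouncyCastle-based MessageCryptoBc.
public class WrongKeyUnwrapDemo {

    static final String RSA_OAEP = "RSA/ECB/OAEPWithSHA-1AndMGF1Padding";

    static KeyPair newRsaKeyPair() throws GeneralSecurityException {
        KeyPairGenerator gen = KeyPairGenerator.getInstance("RSA");
        gen.initialize(2048);
        return gen.generateKeyPair();
    }

    static SecretKey newDataKey() throws GeneralSecurityException {
        KeyGenerator gen = KeyGenerator.getInstance("AES");
        gen.init(256); // arbitrary size for the sketch
        return gen.generateKey();
    }

    // Producer side: encrypt the symmetric data key with the consumer's public key.
    static byte[] wrapDataKey(SecretKey dataKey, PublicKey publicKey) throws GeneralSecurityException {
        Cipher cipher = Cipher.getInstance(RSA_OAEP);
        cipher.init(Cipher.ENCRYPT_MODE, publicKey);
        return cipher.doFinal(dataKey.getEncoded());
    }

    // Consumer side: recover the data key; throws if privateKey does not match.
    static byte[] unwrapDataKey(byte[] wrapped, PrivateKey privateKey) throws GeneralSecurityException {
        Cipher cipher = Cipher.getInstance(RSA_OAEP);
        cipher.init(Cipher.DECRYPT_MODE, privateKey);
        return cipher.doFinal(wrapped);
    }

    public static void main(String[] args) throws GeneralSecurityException {
        SecretKey dataKey = newDataKey();
        KeyPair producerTarget = newRsaKeyPair(); // key pair the producer encrypted for
        KeyPair mismatched = newRsaKeyPair();     // what a misconfigured reader returns

        byte[] wrapped = wrapDataKey(dataKey, producerTarget.getPublic());
        boolean recovered = Arrays.equals(unwrapDataKey(wrapped, producerTarget.getPrivate()), dataKey.getEncoded());
        System.out.println("matching key recovers data key: " + recovered);

        try {
            unwrapDataKey(wrapped, mismatched.getPrivate());
            System.out.println("mismatched key unexpectedly succeeded");
        } catch (GeneralSecurityException e) {
            System.out.println("mismatched key rejected: " + e);
        }
    }
}
```

Either way the consumer ends up without a data key, which is the situation cryptoFailureAction = CONSUME is meant to handle by delivering the payload undecrypted.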
   
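In both cases the consumer logs "Decryption failed. Consuming encrypted message since config is set to consume." and then fails in `SingleMessageMetadata.parseFrom`, called from `ConsumerImpl.receiveIndividualMessagesFromBatch`, which suggests it is still trying to unpack the batch from the undecrypted payload. A plausible reading of "Invalid unknonwn tag type: 6" and "...: 4" (an inference, not confirmed in this report) is that ciphertext bytes are being decoded as protobuf field keys. In the protobuf wire format a field key is the varint `(field_number << 3) | wire_type`, so the low three bits of the first key byte act as the "tag type". The sketch assumes LightProto's `skipUnknownField` accepts only wire types 0, 1, 2 and 5, in which case 4 (end-group) and 6 (undefined) are rejected. `WireTypeDemo` is a hypothetical helper, not LightProto code.

```java
// Sketch of protobuf field-key decoding as defined by the protobuf wire format.
// WireTypeDemo and its methods are hypothetical helpers, not LightProto code.
public class WireTypeDemo {

    // Wire types a decoder without group support can skip (assumed for LightProto).
    static final int VARINT = 0, FIXED64 = 1, LENGTH_DELIMITED = 2, FIXED32 = 5;

    /** Decodes the varint field key at the start of buf. */
    static int readKey(byte[] buf) {
        int result = 0;
        for (int i = 0; i < buf.length && i < 5; i++) {
            int b = buf[i] & 0xFF;
            result |= (b & 0x7F) << (7 * i);
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("truncated or oversized varint");
    }

    static int wireType(int key) { return key & 0x7; } // the "tag type" in the log
    static int fieldNumber(int key) { return key >>> 3; }

    static boolean skippable(int key) {
        int t = wireType(key);
        return t == VARINT || t == FIXED64 || t == LENGTH_DELIMITED || t == FIXED32;
    }

    public static void main(String[] args) {
        // Two arbitrary bytes standing in for ciphertext read as if it were metadata.
        int key = readKey(new byte[] {(byte) 0x9E, 0x3B});
        System.out.println(fieldNumber(key) + " " + wireType(key) + " " + skippable(key)); // prints "947 6 false"
    }
}
```

Since ciphertext is effectively random, the reported tag type would be expected to vary between messages, which fits the two different values (6 and 4) in the logs above.
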
   ### Anything else?
   
   I could not find any existing issue that mentions a problem with cryptoFailureAction=CONSUME.
   
   The tests in [SimpleProducerConsumerTest.java](https://github.com/apache/pulsar/blob/master/tests/pulsar-client-admin-shade-test/src/test/java/org/apache/pulsar/tests/integration/SimpleProducerConsumerTest.java) do not cover this use case: for the CONSUME failure action they assume that the consumer has not supplied a `cryptoKeyReader` implementation at all. The bug above describes the more likely scenario, in which the consumer has a `cryptoKeyReader` but does not have the right key.
   
   ### Are you willing to submit a PR?
   
   - [ ] I'm willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.