[
https://issues.apache.org/jira/browse/KAFKA-19846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Pranav Rathi updated KAFKA-19846:
---------------------------------
Summary: Fix ShareFetch RPC doesn't allow '/' and '+' in memberId field
(was: ShareFetch RPC doesn't allow '/' and '+' in memberId field)
> Fix ShareFetch RPC doesn't allow '/' and '+' in memberId field
> --------------------------------------------------------------
>
> Key: KAFKA-19846
> URL: https://issues.apache.org/jira/browse/KAFKA-19846
> Project: Kafka
> Issue Type: Sub-task
> Reporter: Pranav Rathi
> Assignee: Andrew Schofield
> Priority: Major
>
> While working on the QfK implementation in librdkafka, I found that the {{memberId}}
> field doesn't accept {{/}} and {{+}}.
> I get the following error on the *broker side*.
> {code:java}
> [2025-10-23 15:10:55,319] ERROR [KafkaApi-1] Unexpected error handling request RequestHeader(apiKey=SHARE_FETCH, apiVersion=1, clientId=rdkafka, correlationId=4, headerVersion=2) -- ShareFetchRequestData(groupId='share-group-1', memberId='/FvjN95PRhGaAuA8aQMuTw', shareSessionEpoch=0, maxWaitMs=500, minBytes=1, maxBytes=52428800, maxRecords=500, batchSize=500, topics=[FetchTopic(topicId=sgd0qCnHRL-t80afMzN9nA, partitions=[FetchPartition(partitionIndex=0, partitionMaxBytes=0, acknowledgementBatches=[])])], forgottenTopicsData=[]) with context RequestContext(header=RequestHeader(apiKey=SHARE_FETCH, apiVersion=1, clientId=rdkafka, correlationId=4, headerVersion=2), connectionId='127.0.0.1:9092-127.0.0.1:41442-0-4', clientAddress=/127.0.0.1, principal=User:ANONYMOUS, listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=librdkafka, softwareVersion=2.12.0-RC1-13-g82dbc3-dirty-devel-O0), fromPrivilegedListener=true, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@1648925]) (kafka.server.KafkaApis)
> java.lang.IllegalArgumentException: Illegal base64 character 2f
>     at java.base/java.util.Base64$Decoder.decode0(Base64.java:848) ~[?:?]
>     at java.base/java.util.Base64$Decoder.decode(Base64.java:566) ~[?:?]
>     at java.base/java.util.Base64$Decoder.decode(Base64.java:589) ~[?:?]
>     at org.apache.kafka.common.Uuid.fromString(Uuid.java:136) ~[kafka-clients-4.1.0.jar:?]
>     at kafka.server.KafkaApis.handleShareFetchRequest(KafkaApis.scala:3157) [kafka_2.13-4.1.0.jar:?]
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:236) [kafka_2.13-4.1.0.jar:?]
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:158) [kafka_2.13-4.1.0.jar:?]
>     at java.base/java.lang.Thread.run(Thread.java:840) [?:?]
> {code}
> The *client* gets {{UNKNOWN_SERVER_ERROR}} as the response from the broker.
>
> Digging deeper, I found that {{Uuid.fromString()}} uses the URL-safe Base64 decoder
> and, similarly, {{toString()}} uses the URL-safe Base64 encoder.
> {code:java}
> public static Uuid fromString(String str) {
>     if (str.length() > 24) {
>         throw new IllegalArgumentException("Input string with prefix `"
>             + str.substring(0, 24) + "` is too long to be decoded as a base64 UUID");
>     }
>     ByteBuffer uuidBytes = ByteBuffer.wrap(Base64.getUrlDecoder().decode(str));
>     if (uuidBytes.remaining() != 16) {
>         throw new IllegalArgumentException("Input string `" + str + "` decoded as "
>             + uuidBytes.remaining() + " bytes, which is not equal to the expected 16 bytes "
>             + "of a base64-encoded UUID");
>     }
>     return new Uuid(uuidBytes.getLong(), uuidBytes.getLong());
> }
> {code}
> {code:java}
> public String toString() {
>     return Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid());
> }
> {code}
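> For illustration, here is a minimal standalone snippet (not part of the broker or Kafka
> source; the class name is made up) showing the mismatch: the plain Base64 decoder accepts
> the {{memberId}} from the log above, while the URL-safe decoder used by
> {{Uuid.fromString()}} throws the same {{Illegal base64 character 2f}} exception.
> {code:java}
> import java.util.Base64;
>
> // Hypothetical demo class, only to reproduce the decoder mismatch locally.
> public class Base64MismatchDemo {
>     public static void main(String[] args) {
>         String memberId = "/FvjN95PRhGaAuA8aQMuTw"; // value taken from the broker log above
>
>         // The plain decoder accepts '+' and '/', so this succeeds and yields 16 bytes.
>         byte[] decoded = Base64.getDecoder().decode(memberId);
>         System.out.println("plain decoder: " + decoded.length + " bytes");
>
>         // The URL-safe decoder (what Uuid.fromString() uses) only accepts '-' and '_',
>         // so it throws the same exception seen on the broker side.
>         try {
>             Base64.getUrlDecoder().decode(memberId);
>         } catch (IllegalArgumentException e) {
>             System.out.println("URL-safe decoder: " + e.getMessage());
>         }
>     }
> }
> {code}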
> I feel that {{Uuid}} should use plain Base64 encoding and decoding instead of the
> URL-safe variant. The two alphabets differ ({{+}}/{{/}} versus {{-}}/{{_}}), so the plain
> Base64 encoding of a Uuid and its URL-safe encoding are different strings, and a
> {{memberId}} produced with plain Base64, like the one in the log above, cannot be decoded
> by the URL-safe decoder.
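> As a sketch only (illustrating the suggestion above, not an agreed or committed change),
> the change would amount to swapping in the plain Base64 encoder/decoder in the same two
> methods, so that a {{memberId}} containing {{/}} or {{+}} decodes successfully:
> {code:java}
> // Sketch of the suggestion above: plain Base64 alphabet instead of the URL-safe one.
> public static Uuid fromString(String str) {
>     if (str.length() > 24) {
>         throw new IllegalArgumentException("Input string with prefix `"
>             + str.substring(0, 24) + "` is too long to be decoded as a base64 UUID");
>     }
>     // getDecoder() accepts '+' and '/', so memberIds like the one in the log decode.
>     ByteBuffer uuidBytes = ByteBuffer.wrap(Base64.getDecoder().decode(str));
>     if (uuidBytes.remaining() != 16) {
>         throw new IllegalArgumentException("Input string `" + str + "` decoded as "
>             + uuidBytes.remaining() + " bytes, which is not equal to the expected 16 bytes "
>             + "of a base64-encoded UUID");
>     }
>     return new Uuid(uuidBytes.getLong(), uuidBytes.getLong());
> }
>
> public String toString() {
>     // Plain encoder emits '+' and '/' instead of '-' and '_'.
>     return Base64.getEncoder().withoutPadding().encodeToString(getBytesFromUuid());
> }
> {code}
> Whether changing {{toString()}} in this way is compatible with Uuid strings already
> persisted or logged elsewhere would need to be checked; the sketch only illustrates the
> direction suggested above.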
--
This message was sent by Atlassian Jira
(v8.20.10#820010)