[ 
https://issues.apache.org/jira/browse/KAFKA-19846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pranav Rathi updated KAFKA-19846:
---------------------------------
    Description: 
While working on QfK implementation in librdkafka, I found that {{memberId}} 
field doesn't accept {{/}} and {{{}+{}}}.

I get following error on the {*}broker side{*}.
{code:java}
[2025-10-23 15:10:55,319] ERROR [KafkaApi-1] Unexpected error handling request 
RequestHeader(apiKey=SHARE_FETCH, apiVersion=1, clientId=rdkafka, 
correlationId=4, headerVersion=2) -- 
ShareFetchRequestData(groupId='share-group-1', 
memberId='/FvjN95PRhGaAuA8aQMuTw', shareSessionEpoch=0, maxWaitMs=500, 
minBytes=1, maxBytes=52428800, maxRecords=500, batchSize=500, 
topics=[FetchTopic(topicId=sgd0qCnHRL-t80afMzN9nA, 
partitions=[FetchPartition(partitionIndex=0, partitionMaxBytes=0, 
acknowledgementBatches=[])])], forgottenTopicsData=[]) with context 
RequestContext(header=RequestHeader(apiKey=SHARE_FETCH, apiVersion=1, 
clientId=rdkafka, correlationId=4, headerVersion=2), 
connectionId='127.0.0.1:9092-127.0.0.1:41442-0-4', clientAddress=/127.0.0.1, 
principal=User:ANONYMOUS, listenerName=ListenerName(PLAINTEXT), 
securityProtocol=PLAINTEXT, 
clientInformation=ClientInformation(softwareName=librdkafka, 
softwareVersion=2.12.0-RC1-13-g82dbc3-dirty-devel-O0), 
fromPrivilegedListener=true, 
principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@1648925])
 (kafka.server.KafkaApis) java.lang.IllegalArgumentException: Illegal base64 
character 2f at java.base/java.util.Base64$Decoder.decode0(Base64.java:848) 
~[?:?] at java.base/java.util.Base64$Decoder.decode(Base64.java:566) ~[?:?] at 
java.base/java.util.Base64$Decoder.decode(Base64.java:589) ~[?:?] at 
org.apache.kafka.common.Uuid.fromString(Uuid.java:136) 
~[kafka-clients-4.1.0.jar:?] at 
kafka.server.KafkaApis.handleShareFetchRequest(KafkaApis.scala:3157) 
[kafka_2.13-4.1.0.jar:?] at kafka.server.KafkaApis.handle(KafkaApis.scala:236) 
[kafka_2.13-4.1.0.jar:?] at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:158) 
[kafka_2.13-4.1.0.jar:?] at java.base/java.lang.Thread.run(Thread.java:840) 
[?:?] {code}
*Client* gets {{UNKNOWN_SERVER_ERROR}} as response from the broker.

 

Digging deeper I found that Uuid *{{fromString()}}* uses Base64 Url decoding 
and similarly *{{toString()}}* uses Base64 Url encoding.
{code:java}
public static Uuid fromString(String str) {
        if (str.length() > 24) {
            throw new IllegalArgumentException("Input string with prefix `"
                + str.substring(0, 24) + "` is too long to be decoded as a 
base64 UUID");
        }

        ByteBuffer uuidBytes = 
ByteBuffer.wrap(Base64.getUrlDecoder().decode(str));
        if (uuidBytes.remaining() != 16) {
            throw new IllegalArgumentException("Input string `" + str + "` 
decoded as "
                + uuidBytes.remaining() + " bytes, which is not equal to the 
expected 16 bytes "
                + "of a base64-encoded UUID");
        }

        return new Uuid(uuidBytes.getLong(), uuidBytes.getLong());
} {code}
{code:java}
public String toString() {
        return 
Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid());
} {code}
{{I feel that Uuid should use normal Base64 encoding or decoding instead of Url 
one. If we use Url encoding and decoding then the Uuid itself changes. 
Generated Uuid and Url Base64 encoding for the Uuid will be different. }}{{}}

  was:
While working on QfK implementation in librdkafka, I found that {{memberId}} 
field doesn't accept {{/}} and {{{}+{}}}.

I get following error on the {*}broker side{*}.

 
{code:java}
[2025-10-23 15:10:55,319] ERROR [KafkaApi-1] Unexpected error handling request 
RequestHeader(apiKey=SHARE_FETCH, apiVersion=1, clientId=rdkafka, 
correlationId=4, headerVersion=2) -- 
ShareFetchRequestData(groupId='share-group-1', 
memberId='/FvjN95PRhGaAuA8aQMuTw', shareSessionEpoch=0, maxWaitMs=500, 
minBytes=1, maxBytes=52428800, maxRecords=500, batchSize=500, 
topics=[FetchTopic(topicId=sgd0qCnHRL-t80afMzN9nA, 
partitions=[FetchPartition(partitionIndex=0, partitionMaxBytes=0, 
acknowledgementBatches=[])])], forgottenTopicsData=[]) with context 
RequestContext(header=RequestHeader(apiKey=SHARE_FETCH, apiVersion=1, 
clientId=rdkafka, correlationId=4, headerVersion=2), 
connectionId='127.0.0.1:9092-127.0.0.1:41442-0-4', clientAddress=/127.0.0.1, 
principal=User:ANONYMOUS, listenerName=ListenerName(PLAINTEXT), 
securityProtocol=PLAINTEXT, 
clientInformation=ClientInformation(softwareName=librdkafka, 
softwareVersion=2.12.0-RC1-13-g82dbc3-dirty-devel-O0), 
fromPrivilegedListener=true, 
principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@1648925])
 (kafka.server.KafkaApis) java.lang.IllegalArgumentException: Illegal base64 
character 2f at java.base/java.util.Base64$Decoder.decode0(Base64.java:848) 
~[?:?] at java.base/java.util.Base64$Decoder.decode(Base64.java:566) ~[?:?] at 
java.base/java.util.Base64$Decoder.decode(Base64.java:589) ~[?:?] at 
org.apache.kafka.common.Uuid.fromString(Uuid.java:136) 
~[kafka-clients-4.1.0.jar:?] at 
kafka.server.KafkaApis.handleShareFetchRequest(KafkaApis.scala:3157) 
[kafka_2.13-4.1.0.jar:?] at kafka.server.KafkaApis.handle(KafkaApis.scala:236) 
[kafka_2.13-4.1.0.jar:?] at 
kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:158) 
[kafka_2.13-4.1.0.jar:?] at java.base/java.lang.Thread.run(Thread.java:840) 
[?:?] {code}
 

*Client* gets {{UNKNOWN_SERVER_ERROR}} as response from the broker.

Digging deeper I found that Uuid {{fromString()}} uses Base64 Url decoding and 
similarly {{toString()}} uses Base64 Url encoding.
{code:java}
public static Uuid fromString(String str) {
        if (str.length() > 24) {
            throw new IllegalArgumentException("Input string with prefix `"
                + str.substring(0, 24) + "` is too long to be decoded as a 
base64 UUID");
        }

        ByteBuffer uuidBytes = 
ByteBuffer.wrap(Base64.getUrlDecoder().decode(str));
        if (uuidBytes.remaining() != 16) {
            throw new IllegalArgumentException("Input string `" + str + "` 
decoded as "
                + uuidBytes.remaining() + " bytes, which is not equal to the 
expected 16 bytes "
                + "of a base64-encoded UUID");
        }

        return new Uuid(uuidBytes.getLong(), uuidBytes.getLong());
} {code}
{code:java}
public String toString() {
        return 
Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid());
} {code}
{{I feel that Uuid should use normal Base64 encoding or decoding instead of Url 
one. If we use Url encoding and decoding then the Uuid itself changes. 
Generated Uuid and Url Base64 encoding for the Uuid will be different. }}{{}}


> ShareFetch RPC doesn't allow '/' and '+' in memberId field
> ----------------------------------------------------------
>
>                 Key: KAFKA-19846
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19846
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Pranav Rathi
>            Assignee: Andrew Schofield
>            Priority: Major
>
> While working on QfK implementation in librdkafka, I found that {{memberId}} 
> field doesn't accept {{/}} and {{{}+{}}}.
> I get following error on the {*}broker side{*}.
> {code:java}
> [2025-10-23 15:10:55,319] ERROR [KafkaApi-1] Unexpected error handling 
> request RequestHeader(apiKey=SHARE_FETCH, apiVersion=1, clientId=rdkafka, 
> correlationId=4, headerVersion=2) -- 
> ShareFetchRequestData(groupId='share-group-1', 
> memberId='/FvjN95PRhGaAuA8aQMuTw', shareSessionEpoch=0, maxWaitMs=500, 
> minBytes=1, maxBytes=52428800, maxRecords=500, batchSize=500, 
> topics=[FetchTopic(topicId=sgd0qCnHRL-t80afMzN9nA, 
> partitions=[FetchPartition(partitionIndex=0, partitionMaxBytes=0, 
> acknowledgementBatches=[])])], forgottenTopicsData=[]) with context 
> RequestContext(header=RequestHeader(apiKey=SHARE_FETCH, apiVersion=1, 
> clientId=rdkafka, correlationId=4, headerVersion=2), 
> connectionId='127.0.0.1:9092-127.0.0.1:41442-0-4', clientAddress=/127.0.0.1, 
> principal=User:ANONYMOUS, listenerName=ListenerName(PLAINTEXT), 
> securityProtocol=PLAINTEXT, 
> clientInformation=ClientInformation(softwareName=librdkafka, 
> softwareVersion=2.12.0-RC1-13-g82dbc3-dirty-devel-O0), 
> fromPrivilegedListener=true, 
> principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@1648925])
>  (kafka.server.KafkaApis) java.lang.IllegalArgumentException: Illegal base64 
> character 2f at java.base/java.util.Base64$Decoder.decode0(Base64.java:848) 
> ~[?:?] at java.base/java.util.Base64$Decoder.decode(Base64.java:566) ~[?:?] 
> at java.base/java.util.Base64$Decoder.decode(Base64.java:589) ~[?:?] at 
> org.apache.kafka.common.Uuid.fromString(Uuid.java:136) 
> ~[kafka-clients-4.1.0.jar:?] at 
> kafka.server.KafkaApis.handleShareFetchRequest(KafkaApis.scala:3157) 
> [kafka_2.13-4.1.0.jar:?] at 
> kafka.server.KafkaApis.handle(KafkaApis.scala:236) [kafka_2.13-4.1.0.jar:?] 
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:158) 
> [kafka_2.13-4.1.0.jar:?] at java.base/java.lang.Thread.run(Thread.java:840) 
> [?:?] {code}
> *Client* gets {{UNKNOWN_SERVER_ERROR}} as response from the broker.
>  
> Digging deeper I found that Uuid *{{fromString()}}* uses Base64 Url decoding 
> and similarly *{{toString()}}* uses Base64 Url encoding.
> {code:java}
> public static Uuid fromString(String str) {
>         if (str.length() > 24) {
>             throw new IllegalArgumentException("Input string with prefix `"
>                 + str.substring(0, 24) + "` is too long to be decoded as a 
> base64 UUID");
>         }
>         ByteBuffer uuidBytes = 
> ByteBuffer.wrap(Base64.getUrlDecoder().decode(str));
>         if (uuidBytes.remaining() != 16) {
>             throw new IllegalArgumentException("Input string `" + str + "` 
> decoded as "
>                 + uuidBytes.remaining() + " bytes, which is not equal to the 
> expected 16 bytes "
>                 + "of a base64-encoded UUID");
>         }
>         return new Uuid(uuidBytes.getLong(), uuidBytes.getLong());
> } {code}
> {code:java}
> public String toString() {
>         return 
> Base64.getUrlEncoder().withoutPadding().encodeToString(getBytesFromUuid());
> } {code}
> {{I feel that Uuid should use normal Base64 encoding or decoding instead of 
> Url one. If we use Url encoding and decoding then the Uuid itself changes. 
> Generated Uuid and Url Base64 encoding for the Uuid will be different. }}{{}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to