[ 
https://issues.apache.org/jira/browse/KAFKA-15042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17752503#comment-17752503
 ] 

Adrian Preston commented on KAFKA-15042:
----------------------------------------

[~smashingquasar],

Looking at the hex dump of your APIVersions request, I think you are correctly 
encoding the empty tagged fields section as a single 0x00 byte. It looks, 
however, like the compact string encoding (used for the 
'client_software_name' and 'client_software_version' fields) is adding an 
unexpected 0x01 byte in front of each string's length byte.
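
In case it helps, here is a minimal Python sketch of the encoding as I understand it from KIP-482: a compact string is the unsigned varint of (byte length + 1) followed by the UTF-8 bytes, and an empty tagged fields section is just the unsigned varint 0. The function names are hypothetical and not taken from any Kafka client.

```python
def unsigned_varint(value: int) -> bytes:
    """Encode a non-negative integer as an unsigned varint.

    Seven payload bits per byte, least significant group first; the high
    bit is set on every byte except the last.
    """
    if value < 0:
        raise ValueError("unsigned varint cannot encode negative values")
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)         # final byte
            return bytes(out)


def compact_string(text: str) -> bytes:
    """Encode a non-null compact string: varint(length + 1), then the bytes."""
    data = text.encode("utf-8")
    return unsigned_varint(len(data) + 1) + data


def empty_tagged_fields() -> bytes:
    """Encode a tagged fields section containing zero fields."""
    return unsigned_varint(0)


if __name__ == "__main__":
    print(compact_string("web-api").hex(" "))
    print(compact_string("0.0.1").hex(" "))
    print(empty_tagged_fields().hex(" "))
```

With this sketch, 'web-api' (7 bytes) starts with 0x08 and '0.0.1' (5 bytes) starts with 0x06, with no extra byte before the length.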

For comparison, here's the APIVersions request I get from Sarama:

{{00 00 00 21                (Length)}}
{{00 12                      (API key)}}
{{00 03                      (API version)}}
{{00 00 00 00                (Correlation ID)}}
{{00 07 77 65 62 2d 61 70 69 (Client ID)}}
{{00                         (Tagged fields)}}
{{08 77 65 62 2d 61 70 69    (Client software name)}}
{{06 30 2e 30 2e 31          (Client software version)}}
{{00                         (Tagged fields)}}
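
To show how those bytes are read back, here is a rough Python sketch of a parser for this one request layout. It is only an illustration under my reading of KIP-482 (tagged fields are a varint count followed by tag, size and data for each field); the `Reader` class and `parse_api_versions_v3` function are hypothetical names, not part of Kafka or Sarama.

```python
import struct

# The Sarama request shown above, length prefix included.
SARAMA_REQUEST = bytes.fromhex(
    "00000021" "0012" "0003" "00000000"
    "00077765622d617069" "00"
    "087765622d617069" "06302e302e31" "00"
)


class Reader:
    """Minimal cursor over a byte buffer."""

    def __init__(self, data: bytes) -> None:
        self.data = data
        self.pos = 0

    def take(self, n: int) -> bytes:
        if n < 0 or self.pos + n > len(self.data):
            raise ValueError("buffer underflow")
        chunk = self.data[self.pos:self.pos + n]
        self.pos += n
        return chunk

    def unsigned_varint(self) -> int:
        result, shift = 0, 0
        while True:
            byte = self.take(1)[0]
            result |= (byte & 0x7F) << shift
            if not (byte & 0x80):
                return result
            shift += 7

    def compact_string(self) -> str:
        n = self.unsigned_varint()
        if n == 0:
            raise ValueError("null is not allowed for a non-nullable compact string")
        return self.take(n - 1).decode("utf-8")

    def tagged_fields(self) -> dict:
        fields = {}
        for _ in range(self.unsigned_varint()):  # number of tagged fields
            tag = self.unsigned_varint()
            size = self.unsigned_varint()
            fields[tag] = self.take(size)
        return fields


def parse_api_versions_v3(frame: bytes) -> dict:
    r = Reader(frame)
    (length,) = struct.unpack(">i", r.take(4))
    if length != len(frame) - 4:
        raise ValueError("length prefix does not match frame size")
    api_key, api_version, correlation_id = struct.unpack(">hhi", r.take(8))
    # The client ID in the header is a classic NULLABLE_STRING (INT16 length).
    (client_id_len,) = struct.unpack(">h", r.take(2))
    client_id = None if client_id_len < 0 else r.take(client_id_len).decode("utf-8")
    header_tags = r.tagged_fields()
    name = r.compact_string()
    version = r.compact_string()
    body_tags = r.tagged_fields()
    return {
        "length": length, "api_key": api_key, "api_version": api_version,
        "correlation_id": correlation_id, "client_id": client_id,
        "header_tags": header_tags, "client_software_name": name,
        "client_software_version": version, "body_tags": body_tags,
    }


if __name__ == "__main__":
    print(parse_api_versions_v3(SARAMA_REQUEST))
```

Feeding the request from the issue description into the same sketch, the leading 0x01 is read as an empty software name, 'web-api' becomes the software version, and the following 01 06 30 is read as one tagged field with tag 6 and size 48. Only 5 bytes remain at that point, so the read underflows, which appears consistent with the BufferUnderflowException in readUnknownTaggedField.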

> Clarify documentation on Tagged fields
> --------------------------------------
>
>                 Key: KAFKA-15042
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15042
>             Project: Kafka
>          Issue Type: Wish
>          Components: docs
>         Environment: Using the Ubuntu/Kafka Docker image for testing purposes.
>            Reporter: Nicolas
>            Assignee: Adrian Preston
>            Priority: Major
>
> Hello,
> I am currently working on an implementation of the Kafka protocol.
> So far, all my code is working as intended, serialising requests and 
> deserialising responses, as long as I am not using the flex requests system.
> I am now trying to implement the flex requests system but the documentation 
> is scarce on the subject of tagged fields.
> If we take the Request Header v2:
>  
> {code:java}
> Request Header v2 => request_api_key request_api_version correlation_id client_id TAG_BUFFER
>   request_api_key => INT16
>   request_api_version => INT16
>   correlation_id => INT32
>   client_id => NULLABLE_STRING{code}
>  
> Here, the BNF seems violated. TAG_BUFFER is not a value in this situation. It 
> appears to be a type. It also does not appear within the detailed description 
> inside the BNF.
> TAG_BUFFER also does not refer to any declared type within the documentation. 
> It seems to indicate tagged fields though.
> Now when looking at tagged fields, the only mention of them within the 
> documentation is:
> {quote}Note that [KIP-482 tagged 
> fields|https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields]
>  can be added to a request without incrementing the version number. This 
> offers an additional way of evolving the message schema without breaking 
> compatibility. Tagged fields do not take up any space when the field is not 
> set. Therefore, if a field is rarely used, it is more efficient to make it a 
> tagged field than to put it in the mandatory schema. However, tagged fields 
> are ignored by recipients that don't know about them, which could pose a 
> challenge if this is not the behavior that the sender wants. In such cases, a 
> version bump may be more appropriate.
> {quote}
> This leads to KIP-482, which does not clearly and explicitly detail the 
> process of writing and reading those tagged fields.
> I decided to look up existing clients to understand how they handle tagged 
> fields. I notably looked at kafka-js (JavaScript client) and librdkafka (C 
> client) and to my surprise, they have not implemented tagged fields. In fact, 
> they rely on a hack to skip them and ignore them completely.
> I also had a look at the [TaggedFields class in the Java client bundled with 
> Kafka|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/protocol/types/TaggedFields.java#L64].
> Now I am not a Java developer, so I may not fully understand this code. I 
> read the comment and implemented the logic following Google's Protobuf 
> specifications. The problem is, this leads to a request that makes Kafka 
> output a stack trace (it would be appreciated if errors were handled 
> gracefully rather than just dumping stack traces, by the way).
>  
> As a reference, I tried to send an APIVersions (key: 18) (version: 3) request.
> My request reads as follows when converted to hexadecimal:
>  
> {code:java}
> Request header: 00 12 00 03 00 00 00 00 00 07 77 65 62 2d 61 70 69 00
> Request body: 01 08 77 65 62 2d 61 70 69 01 06 30 2e 30 2e 31 00
> Full request: 00 00 00 23 00 12 00 03 00 00 00 00 00 07 77 65 62 2d 61 70 69 
> 00 01 08 77 65 62 2d 61 70 69 01 06 30 2e 30 2e 31 00
> {code}
>  
> This creates a buffer underflow error within Kafka:
> {code:java}
> [2023-05-31 14:14:31,132] ERROR Exception while processing request from 172.21.0.5:9092-172.21.0.3:59228-21 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: API_VERSIONS, apiVersion: 3, connectionId: 172.21.0.5:9092-172.21.0.3:59228-21, listenerName: ListenerName(PLAINTEXT), principal: User:ANONYMOUS
> Caused by: java.nio.BufferUnderflowException
>         at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:182)
>         at java.base/java.nio.ByteBuffer.get(ByteBuffer.java:770)
>         at org.apache.kafka.common.protocol.ByteBufferAccessor.readArray(ByteBufferAccessor.java:58)
>         at org.apache.kafka.common.protocol.Readable.readUnknownTaggedField(Readable.java:53)
>         at org.apache.kafka.common.message.ApiVersionsRequestData.read(ApiVersionsRequestData.java:133)
>         at org.apache.kafka.common.message.ApiVersionsRequestData.<init>(ApiVersionsRequestData.java:74)
>         at org.apache.kafka.common.requests.ApiVersionsRequest.parse(ApiVersionsRequest.java:119)
>         at org.apache.kafka.common.requests.AbstractRequest.doParseRequest(AbstractRequest.java:207)
>         at org.apache.kafka.common.requests.AbstractRequest.parseRequest(AbstractRequest.java:165)
>         at org.apache.kafka.common.requests.RequestContext.parseRequest(RequestContext.java:95)
>         at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:101)
>         at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:1030)
>         at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
>         at kafka.network.Processor.processCompletedReceives(SocketServer.scala:1008)
>         at kafka.network.Processor.run(SocketServer.scala:893)
>         at java.base/java.lang.Thread.run(Thread.java:829){code}
>  
> I attempted many different approaches but I have to admit that I am making 
> absolutely no progress.
> I am creating this issue for two reasons:
>  * Obviously, I would very much appreciate a detailed explanation of how to 
> write and read tagged fields. What kind of bytes are expected? What format 
> do the values take?
>  * Since mainstream clients aren't implementing this feature, it is wasted on 
> most users. Let's face it, very few people actually implement the binary 
> protocol. It is sad that this feature is not widely available to everyone, 
> especially since it can reduce request load. I think improving the 
> documentation so that tagged fields are clearly explained, with at least one 
> example, would greatly benefit the community.
> Thanks in advance for your answers!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
