hachikuji commented on a change in pull request #10793:
URL: https://github.com/apache/kafka/pull/10793#discussion_r645883390



##########
File path: 
server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
##########
@@ -69,16 +75,44 @@ public void write(ApiMessageAndVersion data,
     @Override
     public ApiMessageAndVersion read(Readable input,
                                      int size) {
-        short frameVersion = (short) input.readUnsignedVarint();
+        short frameVersion;
+        try {
+            frameVersion = unsignedIntToShort(input.readUnsignedVarint(), 
"frame version");
+        } catch (Exception e) {
+            throw new MetadataParseException(e);
+        }
         if (frameVersion != DEFAULT_FRAME_VERSION) {
-            throw new SerializationException("Could not deserialize metadata 
record due to unknown frame version "
-                                                     + frameVersion + "(only 
frame version " + DEFAULT_FRAME_VERSION + " is supported)");
+            throw new MetadataParseException("Could not deserialize metadata 
record due to unknown frame version "
+                    + frameVersion + "(only frame version " + 
DEFAULT_FRAME_VERSION + " is supported)");
+        }
+        short apiKey;
+        try {
+            apiKey = unsignedIntToShort(input.readUnsignedVarint(), "type");
+        } catch (Exception e) {
+            throw new MetadataParseException(e);
+        }
+        short version;
+        try {
+            version = unsignedIntToShort(input.readUnsignedVarint(), 
"version");

Review comment:
       Maybe we can still improve the little helper. For example:
   ```java
   short readUnsignedIntAsShort(Readable input, String entity) {
     int val;
     try {
       val = input.readUnsignedVarint();
     } catch (Exception e) {
       throw new MetadataParseException("Error while reading " + entity, e);
     }
     if (val > Short.MAX_VALUE) {
       throw new MetadataParseException("Value for " + entity + " was too 
large.");
     }
     return (short) val;
   }
   ```
     

##########
File path: 
metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java
##########
@@ -65,8 +67,152 @@ public void testDeserializeWithUnhandledFrameVersion() {
         buffer.flip();
 
         MetadataRecordSerde serde = new MetadataRecordSerde();
-        assertThrows(SerializationException.class,
+        assertThrows(MetadataParseException.class,
             () -> serde.read(new ByteBufferAccessor(buffer), 16));
     }
 
+    /**
+     * Test attempting to parse an event which has a malformed frame version 
type varint.
+     */
+    @Test
+    public void testParsingMalformedFrameVersionVarint() {
+        MetadataRecordSerde serde = new MetadataRecordSerde();
+        ByteBuffer buffer = ByteBuffer.allocate(64);
+        buffer.clear();
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.position(0);
+        buffer.limit(64);
+        assertThrows(MetadataParseException.class,
+                () -> serde.read(new ByteBufferAccessor(buffer), 
buffer.remaining()));
+    }
+
+    /**
+     * Test attempting to parse an event which has a malformed message type 
varint.
+     */
+    @Test
+    public void testParsingMalformedMessageTypeVarint() {
+        MetadataRecordSerde serde = new MetadataRecordSerde();
+        ByteBuffer buffer = ByteBuffer.allocate(64);
+        buffer.clear();
+        buffer.put((byte) 0x00);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.position(0);
+        buffer.limit(64);
+        assertThrows(MetadataParseException.class,
+                () -> serde.read(new ByteBufferAccessor(buffer), 
buffer.remaining()));
+    }
+
+    /**
+     * Test attempting to parse an event which has a malformed message version 
varint.
+     */
+    @Test
+    public void testParsingMalformedMessageVersionVarint() {
+        MetadataRecordSerde serde = new MetadataRecordSerde();
+        ByteBuffer buffer = ByteBuffer.allocate(64);
+        buffer.clear();
+        buffer.put((byte) 0x00);
+        buffer.put((byte) 0x08);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.put((byte) 0x80);
+        buffer.position(0);
+        buffer.limit(64);
+        assertThrows(MetadataParseException.class,
+                () -> serde.read(new ByteBufferAccessor(buffer), 
buffer.remaining()));
+    }
+
+    /**
+     * Test attempting to parse an event which has a version > Short.MAX_VALUE
+     */
+    @Test
+    public void testParsingVersionToLarge() {

Review comment:
       nit: ..TooLarge

##########
File path: 
server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java
##########
@@ -69,16 +75,44 @@ public void write(ApiMessageAndVersion data,
     @Override
     public ApiMessageAndVersion read(Readable input,
                                      int size) {
-        short frameVersion = (short) input.readUnsignedVarint();
+        short frameVersion;
+        try {
+            frameVersion = unsignedIntToShort(input.readUnsignedVarint(), 
"frame version");
+        } catch (Exception e) {
+            throw new MetadataParseException(e);
+        }
         if (frameVersion != DEFAULT_FRAME_VERSION) {
-            throw new SerializationException("Could not deserialize metadata 
record due to unknown frame version "
-                                                     + frameVersion + "(only 
frame version " + DEFAULT_FRAME_VERSION + " is supported)");
+            throw new MetadataParseException("Could not deserialize metadata 
record due to unknown frame version "
+                    + frameVersion + "(only frame version " + 
DEFAULT_FRAME_VERSION + " is supported)");
+        }
+        short apiKey;
+        try {
+            apiKey = unsignedIntToShort(input.readUnsignedVarint(), "type");
+        } catch (Exception e) {
+            throw new MetadataParseException(e);
+        }
+        short version;
+        try {
+            version = unsignedIntToShort(input.readUnsignedVarint(), 
"version");
+        } catch (Exception e) {
+            throw new MetadataParseException(e);
         }
 
-        short apiKey = (short) input.readUnsignedVarint();
-        short version = (short) input.readUnsignedVarint();
-        ApiMessage record = apiMessageFor(apiKey);
-        record.read(input, version);
+        ApiMessage record;
+        try {
+            record = apiMessageFor(apiKey);
+        } catch (Exception e) {
+            throw new MetadataParseException(e);
+        }
+        try {
+            record.read(input, version);
+        } catch (Exception e) {
+            throw new MetadataParseException(e);

Review comment:
       nit: I do think a short message would be helpful here, even if it's just 
"Failed to deserialize record with type {type}"




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to