hachikuji commented on a change in pull request #10793: URL: https://github.com/apache/kafka/pull/10793#discussion_r645883390
########## File path: server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java ########## @@ -69,16 +75,44 @@ public void write(ApiMessageAndVersion data, @Override public ApiMessageAndVersion read(Readable input, int size) { - short frameVersion = (short) input.readUnsignedVarint(); + short frameVersion; + try { + frameVersion = unsignedIntToShort(input.readUnsignedVarint(), "frame version"); + } catch (Exception e) { + throw new MetadataParseException(e); + } if (frameVersion != DEFAULT_FRAME_VERSION) { - throw new SerializationException("Could not deserialize metadata record due to unknown frame version " - + frameVersion + "(only frame version " + DEFAULT_FRAME_VERSION + " is supported)"); + throw new MetadataParseException("Could not deserialize metadata record due to unknown frame version " + + frameVersion + "(only frame version " + DEFAULT_FRAME_VERSION + " is supported)"); + } + short apiKey; + try { + apiKey = unsignedIntToShort(input.readUnsignedVarint(), "type"); + } catch (Exception e) { + throw new MetadataParseException(e); + } + short version; + try { + version = unsignedIntToShort(input.readUnsignedVarint(), "version"); Review comment: Maybe we can still improve the little helper. For example: ```java short readUnsignedIntAsShort(Readable input, String entity) { int val; try { val = input.readUnsignedVarint(); } catch (Exception e) { throw new MetadataParseException("Error while reading " + entity, e); } if (val > Short.MAX_VALUE) { throw new MetadataParseException("Value for " + entity + " was too large."); } return (short) val; } ``` ########## File path: metadata/src/test/java/org/apache/kafka/metadata/MetadataRecordSerdeTest.java ########## @@ -65,8 +67,152 @@ public void testDeserializeWithUnhandledFrameVersion() { buffer.flip(); MetadataRecordSerde serde = new MetadataRecordSerde(); - assertThrows(SerializationException.class, + assertThrows(MetadataParseException.class, () -> serde.read(new ByteBufferAccessor(buffer), 16)); } + /** + * Test attempting to parse an event which has a malformed frame version type varint. + */ + @Test + public void testParsingMalformedFrameVersionVarint() { + MetadataRecordSerde serde = new MetadataRecordSerde(); + ByteBuffer buffer = ByteBuffer.allocate(64); + buffer.clear(); + buffer.put((byte) 0x80); + buffer.put((byte) 0x80); + buffer.put((byte) 0x80); + buffer.put((byte) 0x80); + buffer.put((byte) 0x80); + buffer.put((byte) 0x80); + buffer.position(0); + buffer.limit(64); + assertThrows(MetadataParseException.class, + () -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())); + } + + /** + * Test attempting to parse an event which has a malformed message type varint. + */ + @Test + public void testParsingMalformedMessageTypeVarint() { + MetadataRecordSerde serde = new MetadataRecordSerde(); + ByteBuffer buffer = ByteBuffer.allocate(64); + buffer.clear(); + buffer.put((byte) 0x00); + buffer.put((byte) 0x80); + buffer.put((byte) 0x80); + buffer.put((byte) 0x80); + buffer.put((byte) 0x80); + buffer.put((byte) 0x80); + buffer.put((byte) 0x80); + buffer.position(0); + buffer.limit(64); + assertThrows(MetadataParseException.class, + () -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())); + } + + /** + * Test attempting to parse an event which has a malformed message version varint. + */ + @Test + public void testParsingMalformedMessageVersionVarint() { + MetadataRecordSerde serde = new MetadataRecordSerde(); + ByteBuffer buffer = ByteBuffer.allocate(64); + buffer.clear(); + buffer.put((byte) 0x00); + buffer.put((byte) 0x08); + buffer.put((byte) 0x80); + buffer.put((byte) 0x80); + buffer.put((byte) 0x80); + buffer.put((byte) 0x80); + buffer.put((byte) 0x80); + buffer.position(0); + buffer.limit(64); + assertThrows(MetadataParseException.class, + () -> serde.read(new ByteBufferAccessor(buffer), buffer.remaining())); + } + + /** + * Test attempting to parse an event which has a version > Short.MAX_VALUE + */ + @Test + public void testParsingVersionToLarge() { Review comment: nit: ..TooLarge ########## File path: server-common/src/main/java/org/apache/kafka/server/common/serialization/AbstractApiMessageSerde.java ########## @@ -69,16 +75,44 @@ public void write(ApiMessageAndVersion data, @Override public ApiMessageAndVersion read(Readable input, int size) { - short frameVersion = (short) input.readUnsignedVarint(); + short frameVersion; + try { + frameVersion = unsignedIntToShort(input.readUnsignedVarint(), "frame version"); + } catch (Exception e) { + throw new MetadataParseException(e); + } if (frameVersion != DEFAULT_FRAME_VERSION) { - throw new SerializationException("Could not deserialize metadata record due to unknown frame version " - + frameVersion + "(only frame version " + DEFAULT_FRAME_VERSION + " is supported)"); + throw new MetadataParseException("Could not deserialize metadata record due to unknown frame version " + + frameVersion + "(only frame version " + DEFAULT_FRAME_VERSION + " is supported)"); + } + short apiKey; + try { + apiKey = unsignedIntToShort(input.readUnsignedVarint(), "type"); + } catch (Exception e) { + throw new MetadataParseException(e); + } + short version; + try { + version = unsignedIntToShort(input.readUnsignedVarint(), "version"); + } catch (Exception e) { + throw new MetadataParseException(e); } - short apiKey = (short) input.readUnsignedVarint(); - short version = (short) input.readUnsignedVarint(); - ApiMessage record = apiMessageFor(apiKey); - record.read(input, version); + ApiMessage record; + try { + record = apiMessageFor(apiKey); + } catch (Exception e) { + throw new MetadataParseException(e); + } + try { + record.read(input, version); + } catch (Exception e) { + throw new MetadataParseException(e); Review comment: nit: I do think a short message would be helpful here, even if it's just "Failed to deserialize record with type {type}" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org