worryg0d commented on code in PR #1773: URL: https://github.com/apache/cassandra-gocql-driver/pull/1773#discussion_r1766755329
########## crc.go: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gocql + +import "hash/crc32" + +var ( + // Initial CRC32 bytes: 0xFA, 0x2D, 0x55, 0xCA + initialCRC32Bytes = []byte{0xfa, 0x2d, 0x55, 0xca} +) + +// ChecksumIEEE calculates the CRC32 checksum of the given byte slice. +func ChecksumIEEE(b []byte) uint32 { + crc := crc32.NewIEEE() + crc.Reset() + crc.Write(initialCRC32Bytes) // Include initial CRC32 bytes + crc.Write(b) + return crc.Sum32() +} + +const ( + crc24Init = 0x875060 // Initial value for CRC24 calculation + crc24Poly = 0x1974F0B // Polynomial for CRC24 calculation +) + +// KoopmanChecksum calculates the CRC24 checksum using the Koopman polynomial. +func KoopmanChecksum(buf []byte) uint32 { Review Comment: I agree here and added some tests to cover this important part. ########## frame.go: ########## @@ -2070,3 +2085,251 @@ func (f *framer) writeBytesMap(m map[string][]byte) { f.writeBytes(v) } } + +func (f *framer) prepareModernLayout() error { + // Ensure protocol version is V5 or higher + if f.proto < protoVersion5 { + panic("Modern layout is not supported with version V4 or less") + } + + selfContained := true + + var ( + adjustedBuf []byte + tempBuf []byte + err error + ) + + // Process the buffer in chunks if it exceeds the max payload size + for len(f.buf) > maxPayloadSize { + if f.compres != nil { + tempBuf, err = newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres) + } else { + tempBuf, err = newUncompressedFrame(f.buf[:maxPayloadSize], false) + } + if err != nil { + return err + } + + adjustedBuf = append(adjustedBuf, tempBuf...) + f.buf = f.buf[maxPayloadSize:] + selfContained = false + } + + // Process the remaining buffer + if f.compres != nil { + tempBuf, err = newCompressedFrame(f.buf, selfContained, f.compres) + } else { + tempBuf, err = newUncompressedFrame(f.buf, selfContained) + } + if err != nil { + return err + } + + adjustedBuf = append(adjustedBuf, tempBuf...) + f.buf = adjustedBuf + + return nil +} + +func readUncompressedFrame(r io.Reader) ([]byte, bool, error) { + const headerSize = 6 + header := [headerSize + 1]byte{} + + // Read the frame header + if _, err := io.ReadFull(r, header[:headerSize]); err != nil { + return nil, false, fmt.Errorf("gocql: failed to read uncompressed frame, err: %w", err) + } + + // Compute and verify the header CRC24 + computedHeaderCRC24 := KoopmanChecksum(header[:3]) + readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF + if computedHeaderCRC24 != readHeaderCRC24 { + return nil, false, fmt.Errorf("gocql: header crc24 mismatch, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24) + } + + // Extract the payload length and self-contained flag + headerInt := binary.LittleEndian.Uint32(header[:4]) + payloadLen := int(headerInt & 0x1FFFF) + isSelfContained := (headerInt & (1 << 17)) != 0 + + // Read the payload + payload := make([]byte, payloadLen) + if _, err := io.ReadFull(r, payload); err != nil { + return nil, false, fmt.Errorf("gocql: failed to read uncompressed frame payload, err: %w", err) + } + + // Read and verify the payload CRC32 + if _, err := io.ReadFull(r, header[:4]); err != nil { + return nil, false, fmt.Errorf("gocql: failed to read payload crc32, err: %w", err) + } + + computedPayloadCRC32 := ChecksumIEEE(payload) + readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4]) + if computedPayloadCRC32 != readPayloadCRC32 { + return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32) + } + + return payload, isSelfContained, nil +} + +const maxPayloadSize = 128*1024 - 1 + +func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, error) { + const ( + headerSize = 6 + selfContainedBit = 1 << 17 + ) + + payloadLen := len(payload) + if payloadLen > maxPayloadSize { + return nil, fmt.Errorf("payload length (%d) exceeds maximum size of 128 KiB", payloadLen) + } + + header := make([]byte, headerSize) Review Comment: Good catch! I moved the frame buffer initializing to allow the header to be written directly into the frame. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

