lukasz-antoniak commented on code in PR #1773:
URL: 
https://github.com/apache/cassandra-gocql-driver/pull/1773#discussion_r1731230688


##########
crc.go:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package gocql
+
+import "hash/crc32"
+
+var (
+       // Initial CRC32 bytes: 0xFA, 0x2D, 0x55, 0xCA
+       initialCRC32Bytes = []byte{0xfa, 0x2d, 0x55, 0xca}
+)
+
+// ChecksumIEEE calculates the CRC32 checksum of the given byte slice.
+func ChecksumIEEE(b []byte) uint32 {
+       crc := crc32.NewIEEE()

Review Comment:
   Every object contains quite big array `type Table [256]uint32`. Cassandra on 
the server side creates only one CRC object per thread. Maybe we should also 
create a pool of those objects to minimise memory consumption if we process a 
lot of concurrent requests.



##########
conn.go:
##########
@@ -660,6 +673,16 @@ func (c *Conn) heartBeat(ctx context.Context) {
 }
 
 func (c *Conn) recv(ctx context.Context) error {
+       // If native proto v5+ is used and conn is set up, then we should
+       // unwrap payload body from v5 compressed/uncompressed frame
+       if c.version > protoVersion4 && c.connReady {

Review Comment:
   Maybe revert the condition. If connection has not been initialised, we 
should not relay on protocol version.



##########
conn.go:
##########
@@ -1394,9 +1483,10 @@ func (c *Conn) executeQuery(ctx context.Context, qry 
*Query) *Iter {
                params.skipMeta = !(c.session.cfg.DisableSkipMetadata || 
qry.disableSkipMetadata)
 
                frame = &writeExecuteFrame{
-                       preparedID:    info.id,
-                       params:        params,
-                       customPayload: qry.customPayload,
+                       preparedID:         info.id,
+                       preparedMetadataID: info.metadataID,
+                       params:             params,

Review Comment:
   `resultRowsFrame` does not handle `new_metadata_id` and does not propagate 
it to next iterator calls.
   ```
   <new_metadata_id> is [short bytes] representing the new, changed resultset
   metadata. The new metadata ID must also be used in subsequent executions of
   the corresponding prepared statement, if any.
   ```
   This is needed to solve 
[CASSANDRA-10786](https://issues.apache.org/jira/browse/CASSANDRA-10786).



##########
crc.go:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package gocql
+
+import "hash/crc32"
+
+var (
+       // Initial CRC32 bytes: 0xFA, 0x2D, 0x55, 0xCA
+       initialCRC32Bytes = []byte{0xfa, 0x2d, 0x55, 0xca}
+)
+
+// ChecksumIEEE calculates the CRC32 checksum of the given byte slice.
+func ChecksumIEEE(b []byte) uint32 {
+       crc := crc32.NewIEEE()
+       crc.Reset()

Review Comment:
   Call to reset is not needed.



##########
conn.go:
##########
@@ -777,6 +800,48 @@ func (c *Conn) handleTimeout() {
        }
 }
 
+func (c *Conn) recvProtoV5Frame(ctx context.Context) error {
+       var (
+               payload         []byte
+               isSelfContained bool
+               err             error
+       )
+
+       // Read frame based on compression
+       if c.compressor != nil {
+               payload, isSelfContained, err = readCompressedFrame(c.r, 
c.compressor)
+       } else {
+               payload, isSelfContained, err = readUncompressedFrame(c.r)
+       }
+       if err != nil {
+               return err
+       }
+
+       if isSelfContained {
+               // TODO handle case when there are more than 1 envelop inside 
the frame

Review Comment:
   This TODO has to be solved, because other V5 capable drivers do it as well. 
Quote from protocol:
   ```
   If self contained, then the payload includes one or more complete envelopes 
and can be fully processed immediately.
   ```
   Also implement tests for it.



##########
frame.go:
##########
@@ -1599,8 +1608,9 @@ func (f frameWriterFunc) buildFrame(framer *framer, 
streamID int) error {
 }
 
 type writeExecuteFrame struct {
-       preparedID []byte
-       params     queryParams
+       preparedID         []byte
+       preparedMetadataID []byte

Review Comment:
   For clarity change the property name to`<result_metadata_id>`.



##########
crc.go:
##########
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package gocql
+
+import "hash/crc32"
+
+var (
+       // Initial CRC32 bytes: 0xFA, 0x2D, 0x55, 0xCA
+       initialCRC32Bytes = []byte{0xfa, 0x2d, 0x55, 0xca}
+)
+
+// ChecksumIEEE calculates the CRC32 checksum of the given byte slice.
+func ChecksumIEEE(b []byte) uint32 {
+       crc := crc32.NewIEEE()
+       crc.Reset()
+       crc.Write(initialCRC32Bytes) // Include initial CRC32 bytes
+       crc.Write(b)
+       return crc.Sum32()
+}
+
+const (
+       crc24Init = 0x875060  // Initial value for CRC24 calculation
+       crc24Poly = 0x1974F0B // Polynomial for CRC24 calculation
+)
+
+// KoopmanChecksum calculates the CRC24 checksum using the Koopman polynomial.
+func KoopmanChecksum(buf []byte) uint32 {

Review Comment:
   Tests are missing for both CRC calculations. Somebody could unexpectedly 
change initial value and he would not realise it.



##########
conn.go:
##########
@@ -1223,9 +1310,10 @@ type StreamObserverContext interface {
 }
 
 type preparedStatment struct {
-       id       []byte
-       request  preparedMetadata
-       response resultMetadata
+       id         []byte
+       metadataID []byte

Review Comment:
   Please change the name to `result_metadata_id` as in the protocol not to 
create confusion.



##########
conn.go:
##########
@@ -1394,9 +1483,10 @@ func (c *Conn) executeQuery(ctx context.Context, qry 
*Query) *Iter {
                params.skipMeta = !(c.session.cfg.DisableSkipMetadata || 
qry.disableSkipMetadata)
 
                frame = &writeExecuteFrame{
-                       preparedID:    info.id,
-                       params:        params,
-                       customPayload: qry.customPayload,
+                       preparedID:         info.id,
+                       preparedMetadataID: info.metadataID,

Review Comment:
   Rename the field to `result_metadata_id`. `preparedMetadataID` can be 
misleading.



##########
conn.go:
##########
@@ -215,6 +216,14 @@ type Conn struct {
        host            *HostInfo
        isSchemaV2      bool
 
+       // Only for proto v5+.
+       // Indicates if Conn is ready to use Native Protocol V5.

Review Comment:
   This flag indicates that `STARTUP` message has been completed, not that we 
can use V5. Please fix the comment, because it can be misleading.



##########
frame.go:
##########
@@ -2070,3 +2085,251 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: header crc24 mismatch, 
computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+const maxPayloadSize = 128*1024 - 1
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)
+       }
+
+       header := make([]byte, headerSize)
+
+       // First 3 bytes: payload length and self-contained flag
+       headerInt := uint32(payloadLen) & 0x1FFFF
+       if isSelfContained {
+               headerInt |= selfContainedBit // Set the self-contained flag
+       }
+
+       // Encode the first 3 bytes as a single little-endian integer
+       header[0] = byte(headerInt)
+       header[1] = byte(headerInt >> 8)
+       header[2] = byte(headerInt >> 16)
+
+       // Calculate CRC24 for the first 3 bytes of the header
+       crc := KoopmanChecksum(header[:3])
+
+       // Encode CRC24 into the next 3 bytes of the header
+       header[3] = byte(crc)
+       header[4] = byte(crc >> 8)
+       header[5] = byte(crc >> 16)
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)
+       copy(frame, header)               // Copy the header to the frame
+       copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+       // Calculate CRC32 for the payload
+       payloadCRC32 := ChecksumIEEE(payload)
+       binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], 
payloadCRC32)
+
+       return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, 
compressor Compressor) ([]byte, error) {
+       uncompressedLen := len(uncompressedPayload)
+       if uncompressedLen > maxPayloadSize {
+               return nil, fmt.Errorf("uncompressed compressed payload length 
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+       }
+
+       compressedPayload, err := compressor.Encode(uncompressedPayload)
+       if err != nil {
+               return nil, err
+       }
+
+       // Skip the first 4 bytes because the size of the uncompressed payload 
is written in the frame header, not in the
+       // body of the compressed envelope
+       compressedPayload = compressedPayload[4:]
+
+       compressedLen := len(compressedPayload)
+
+       // Compression is not worth it
+       if uncompressedLen < compressedLen {
+               // native_protocol_v5.spec
+               // 2.2
+               //  An uncompressed length of 0 signals that the compressed 
payload
+               //  should be used as-is and not decompressed.
+               compressedPayload = uncompressedPayload

Review Comment:
   `TestLargeSizeQuery` will generate random text, which LZ4 compressor most 
likely will not encode to smaller representation. Create a test to make sure 
that LZ4 compression is applied and we do not fall under this condition.



##########
frame.go:
##########
@@ -2070,3 +2085,251 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: header crc24 mismatch, 
computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+const maxPayloadSize = 128*1024 - 1
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)
+       }
+
+       header := make([]byte, headerSize)
+
+       // First 3 bytes: payload length and self-contained flag
+       headerInt := uint32(payloadLen) & 0x1FFFF
+       if isSelfContained {
+               headerInt |= selfContainedBit // Set the self-contained flag
+       }
+
+       // Encode the first 3 bytes as a single little-endian integer
+       header[0] = byte(headerInt)
+       header[1] = byte(headerInt >> 8)
+       header[2] = byte(headerInt >> 16)
+
+       // Calculate CRC24 for the first 3 bytes of the header
+       crc := KoopmanChecksum(header[:3])
+
+       // Encode CRC24 into the next 3 bytes of the header
+       header[3] = byte(crc)
+       header[4] = byte(crc >> 8)
+       header[5] = byte(crc >> 16)
+
+       // Create the frame
+       frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32
+       frame := make([]byte, frameSize)
+       copy(frame, header)               // Copy the header to the frame
+       copy(frame[headerSize:], payload) // Copy the payload to the frame
+
+       // Calculate CRC32 for the payload
+       payloadCRC32 := ChecksumIEEE(payload)
+       binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], 
payloadCRC32)
+
+       return frame, nil
+}
+
+func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, 
compressor Compressor) ([]byte, error) {
+       uncompressedLen := len(uncompressedPayload)
+       if uncompressedLen > maxPayloadSize {
+               return nil, fmt.Errorf("uncompressed compressed payload length 
exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize)
+       }
+
+       compressedPayload, err := compressor.Encode(uncompressedPayload)
+       if err != nil {
+               return nil, err
+       }
+
+       // Skip the first 4 bytes because the size of the uncompressed payload 
is written in the frame header, not in the
+       // body of the compressed envelope
+       compressedPayload = compressedPayload[4:]
+
+       compressedLen := len(compressedPayload)
+
+       // Compression is not worth it
+       if uncompressedLen < compressedLen {
+               // native_protocol_v5.spec
+               // 2.2
+               //  An uncompressed length of 0 signals that the compressed 
payload
+               //  should be used as-is and not decompressed.
+               compressedPayload = uncompressedPayload
+               compressedLen = uncompressedLen
+               uncompressedLen = 0
+       }
+
+       // Combine compressed and uncompressed lengths and set the 
self-contained flag if needed
+       combined := uint64(compressedLen) | uint64(uncompressedLen)<<17
+       if isSelfContained {
+               combined |= 1 << 34
+       }
+
+       var headerBuf [8]byte
+
+       // Write the combined value into the header buffer
+       binary.LittleEndian.PutUint64(headerBuf[:], combined)
+
+       // Create a buffer with enough capacity to hold the header, compressed 
payload, and checksums
+       buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4))
+
+       // Write the first 5 bytes of the header (compressed and uncompressed 
sizes)
+       buf.Write(headerBuf[:5])
+
+       // Compute and write the CRC24 checksum of the first 5 bytes
+       headerChecksum := KoopmanChecksum(headerBuf[:5])
+       binary.LittleEndian.PutUint32(headerBuf[:], headerChecksum)
+       buf.Write(headerBuf[:3])
+       buf.Write(compressedPayload)
+
+       // Compute and write the CRC32 checksum of the payload
+       payloadChecksum := ChecksumIEEE(compressedPayload)
+       binary.LittleEndian.PutUint32(headerBuf[:], payloadChecksum)
+       buf.Write(headerBuf[:4])
+
+       return buf.Bytes(), nil
+}
+
+func readCompressedFrame(r io.Reader, compressor Compressor) ([]byte, bool, 
error) {

Review Comment:
   We are missing tests to simulate various frame errors here, e.g. invalid 
length encoded in the frame, invalid CRC. Tests could use our frame building 
logic for a simple EXECUTE or QUERY, change payload and then make sure decoding 
fails with certain error.



##########
conn.go:
##########
@@ -1394,9 +1483,10 @@ func (c *Conn) executeQuery(ctx context.Context, qry 
*Query) *Iter {
                params.skipMeta = !(c.session.cfg.DisableSkipMetadata || 
qry.disableSkipMetadata)
 
                frame = &writeExecuteFrame{

Review Comment:
   For BATCH message we are missing keyspace field.
   ```
   0x0080: With keyspace. If set, <keyspace> must be present. <keyspace> is a
   [string] indicating the keyspace that the query should be executed in.
   It supercedes the keyspace that the connection is bound to, if any.
   ```



##########
conn.go:
##########
@@ -1394,9 +1483,10 @@ func (c *Conn) executeQuery(ctx context.Context, qry 
*Query) *Iter {
                params.skipMeta = !(c.session.cfg.DisableSkipMetadata || 
qry.disableSkipMetadata)
 
                frame = &writeExecuteFrame{

Review Comment:
   Field `<now_in_seconds>` has not been added as per below quote and flags 
bitmap not updated.
   ```
   Added now_in_seconds field in QUERY, EXECUTE, and BATCH messages (Sections 
4.1.4, 4.1.6, and 4.1.7).
   ```



##########
frame.go:
##########
@@ -2070,3 +2085,251 @@ func (f *framer) writeBytesMap(m map[string][]byte) {
                f.writeBytes(v)
        }
 }
+
+func (f *framer) prepareModernLayout() error {
+       // Ensure protocol version is V5 or higher
+       if f.proto < protoVersion5 {
+               panic("Modern layout is not supported with version V4 or less")
+       }
+
+       selfContained := true
+
+       var (
+               adjustedBuf []byte
+               tempBuf     []byte
+               err         error
+       )
+
+       // Process the buffer in chunks if it exceeds the max payload size
+       for len(f.buf) > maxPayloadSize {
+               if f.compres != nil {
+                       tempBuf, err = 
newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres)
+               } else {
+                       tempBuf, err = 
newUncompressedFrame(f.buf[:maxPayloadSize], false)
+               }
+               if err != nil {
+                       return err
+               }
+
+               adjustedBuf = append(adjustedBuf, tempBuf...)
+               f.buf = f.buf[maxPayloadSize:]
+               selfContained = false
+       }
+
+       // Process the remaining buffer
+       if f.compres != nil {
+               tempBuf, err = newCompressedFrame(f.buf, selfContained, 
f.compres)
+       } else {
+               tempBuf, err = newUncompressedFrame(f.buf, selfContained)
+       }
+       if err != nil {
+               return err
+       }
+
+       adjustedBuf = append(adjustedBuf, tempBuf...)
+       f.buf = adjustedBuf
+
+       return nil
+}
+
+func readUncompressedFrame(r io.Reader) ([]byte, bool, error) {
+       const headerSize = 6
+       header := [headerSize + 1]byte{}
+
+       // Read the frame header
+       if _, err := io.ReadFull(r, header[:headerSize]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame, err: %w", err)
+       }
+
+       // Compute and verify the header CRC24
+       computedHeaderCRC24 := KoopmanChecksum(header[:3])
+       readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF
+       if computedHeaderCRC24 != readHeaderCRC24 {
+               return nil, false, fmt.Errorf("gocql: header crc24 mismatch, 
computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24)
+       }
+
+       // Extract the payload length and self-contained flag
+       headerInt := binary.LittleEndian.Uint32(header[:4])
+       payloadLen := int(headerInt & 0x1FFFF)
+       isSelfContained := (headerInt & (1 << 17)) != 0
+
+       // Read the payload
+       payload := make([]byte, payloadLen)
+       if _, err := io.ReadFull(r, payload); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read 
uncompressed frame payload, err: %w", err)
+       }
+
+       // Read and verify the payload CRC32
+       if _, err := io.ReadFull(r, header[:4]); err != nil {
+               return nil, false, fmt.Errorf("gocql: failed to read payload 
crc32, err: %w", err)
+       }
+
+       computedPayloadCRC32 := ChecksumIEEE(payload)
+       readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4])
+       if computedPayloadCRC32 != readPayloadCRC32 {
+               return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, 
computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32)
+       }
+
+       return payload, isSelfContained, nil
+}
+
+const maxPayloadSize = 128*1024 - 1
+
+func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, 
error) {
+       const (
+               headerSize       = 6
+               selfContainedBit = 1 << 17
+       )
+
+       payloadLen := len(payload)
+       if payloadLen > maxPayloadSize {
+               return nil, fmt.Errorf("payload length (%d) exceeds maximum 
size of 128 KiB", payloadLen)
+       }
+
+       header := make([]byte, headerSize)

Review Comment:
   Can we here not create `header` array and then copy it over to `frame`, but 
instead just write to `frame`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to