lukasz-antoniak commented on code in PR #1773: URL: https://github.com/apache/cassandra-gocql-driver/pull/1773#discussion_r1731230688
########## crc.go: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gocql + +import "hash/crc32" + +var ( + // Initial CRC32 bytes: 0xFA, 0x2D, 0x55, 0xCA + initialCRC32Bytes = []byte{0xfa, 0x2d, 0x55, 0xca} +) + +// ChecksumIEEE calculates the CRC32 checksum of the given byte slice. +func ChecksumIEEE(b []byte) uint32 { + crc := crc32.NewIEEE() Review Comment: Every object contains quite a big array, `type Table [256]uint32`. Cassandra on the server side creates only one CRC object per thread. Maybe we should also create a pool of those objects to minimise memory consumption if we process a lot of concurrent requests. ########## conn.go: ########## @@ -660,6 +673,16 @@ func (c *Conn) heartBeat(ctx context.Context) { } func (c *Conn) recv(ctx context.Context) error { + // If native proto v5+ is used and conn is set up, then we should + // unwrap payload body from v5 compressed/uncompressed frame + if c.version > protoVersion4 && c.connReady { Review Comment: Maybe reverse the condition. If the connection has not been initialised, we should not rely on the protocol version. 
########## conn.go: ########## @@ -1394,9 +1483,10 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { params.skipMeta = !(c.session.cfg.DisableSkipMetadata || qry.disableSkipMetadata) frame = &writeExecuteFrame{ - preparedID: info.id, - params: params, - customPayload: qry.customPayload, + preparedID: info.id, + preparedMetadataID: info.metadataID, + params: params, Review Comment: `resultRowsFrame` does not handle `new_metadata_id` and does not propagate it to next iterator calls. ``` <new_metadata_id> is [short bytes] representing the new, changed resultset metadata. The new metadata ID must also be used in subsequent executions of the corresponding prepared statement, if any. ``` This is needed to solve [CASSANDRA-10786](https://issues.apache.org/jira/browse/CASSANDRA-10786). ########## crc.go: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gocql + +import "hash/crc32" + +var ( + // Initial CRC32 bytes: 0xFA, 0x2D, 0x55, 0xCA + initialCRC32Bytes = []byte{0xfa, 0x2d, 0x55, 0xca} +) + +// ChecksumIEEE calculates the CRC32 checksum of the given byte slice. 
+func ChecksumIEEE(b []byte) uint32 { + crc := crc32.NewIEEE() + crc.Reset() Review Comment: The call to `Reset()` is not needed. ########## conn.go: ########## @@ -777,6 +800,48 @@ func (c *Conn) handleTimeout() { } } +func (c *Conn) recvProtoV5Frame(ctx context.Context) error { + var ( + payload []byte + isSelfContained bool + err error + ) + + // Read frame based on compression + if c.compressor != nil { + payload, isSelfContained, err = readCompressedFrame(c.r, c.compressor) + } else { + payload, isSelfContained, err = readUncompressedFrame(c.r) + } + if err != nil { + return err + } + + if isSelfContained { + // TODO handle case when there are more than 1 envelop inside the frame Review Comment: This TODO has to be resolved, because other V5-capable drivers do it as well. Quote from protocol: ``` If self contained, then the payload includes one or more complete envelopes and can be fully processed immediately. ``` Also implement tests for it. ########## frame.go: ########## @@ -1599,8 +1608,9 @@ func (f frameWriterFunc) buildFrame(framer *framer, streamID int) error { } type writeExecuteFrame struct { - preparedID []byte - params queryParams + preparedID []byte + preparedMetadataID []byte Review Comment: For clarity, change the property name to `<result_metadata_id>`. ########## crc.go: ########## @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package gocql + +import "hash/crc32" + +var ( + // Initial CRC32 bytes: 0xFA, 0x2D, 0x55, 0xCA + initialCRC32Bytes = []byte{0xfa, 0x2d, 0x55, 0xca} +) + +// ChecksumIEEE calculates the CRC32 checksum of the given byte slice. +func ChecksumIEEE(b []byte) uint32 { + crc := crc32.NewIEEE() + crc.Reset() + crc.Write(initialCRC32Bytes) // Include initial CRC32 bytes + crc.Write(b) + return crc.Sum32() +} + +const ( + crc24Init = 0x875060 // Initial value for CRC24 calculation + crc24Poly = 0x1974F0B // Polynomial for CRC24 calculation +) + +// KoopmanChecksum calculates the CRC24 checksum using the Koopman polynomial. +func KoopmanChecksum(buf []byte) uint32 { Review Comment: Tests are missing for both CRC calculations. Somebody could unexpectedly change initial value and he would not realise it. ########## conn.go: ########## @@ -1223,9 +1310,10 @@ type StreamObserverContext interface { } type preparedStatment struct { - id []byte - request preparedMetadata - response resultMetadata + id []byte + metadataID []byte Review Comment: Please change the name to `result_metadata_id` as in the protocol not to create confusion. ########## conn.go: ########## @@ -1394,9 +1483,10 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { params.skipMeta = !(c.session.cfg.DisableSkipMetadata || qry.disableSkipMetadata) frame = &writeExecuteFrame{ - preparedID: info.id, - params: params, - customPayload: qry.customPayload, + preparedID: info.id, + preparedMetadataID: info.metadataID, Review Comment: Rename the field to `result_metadata_id`. 
`preparedMetadataID` can be misleading. ########## conn.go: ########## @@ -215,6 +216,14 @@ type Conn struct { host *HostInfo isSchemaV2 bool + // Only for proto v5+. + // Indicates if Conn is ready to use Native Protocol V5. Review Comment: This flag indicates that `STARTUP` message has been completed, not that we can use V5. Please fix the comment, because it can be misleading. ########## frame.go: ########## @@ -2070,3 +2085,251 @@ func (f *framer) writeBytesMap(m map[string][]byte) { f.writeBytes(v) } } + +func (f *framer) prepareModernLayout() error { + // Ensure protocol version is V5 or higher + if f.proto < protoVersion5 { + panic("Modern layout is not supported with version V4 or less") + } + + selfContained := true + + var ( + adjustedBuf []byte + tempBuf []byte + err error + ) + + // Process the buffer in chunks if it exceeds the max payload size + for len(f.buf) > maxPayloadSize { + if f.compres != nil { + tempBuf, err = newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres) + } else { + tempBuf, err = newUncompressedFrame(f.buf[:maxPayloadSize], false) + } + if err != nil { + return err + } + + adjustedBuf = append(adjustedBuf, tempBuf...) + f.buf = f.buf[maxPayloadSize:] + selfContained = false + } + + // Process the remaining buffer + if f.compres != nil { + tempBuf, err = newCompressedFrame(f.buf, selfContained, f.compres) + } else { + tempBuf, err = newUncompressedFrame(f.buf, selfContained) + } + if err != nil { + return err + } + + adjustedBuf = append(adjustedBuf, tempBuf...) 
+ f.buf = adjustedBuf + + return nil +} + +func readUncompressedFrame(r io.Reader) ([]byte, bool, error) { + const headerSize = 6 + header := [headerSize + 1]byte{} + + // Read the frame header + if _, err := io.ReadFull(r, header[:headerSize]); err != nil { + return nil, false, fmt.Errorf("gocql: failed to read uncompressed frame, err: %w", err) + } + + // Compute and verify the header CRC24 + computedHeaderCRC24 := KoopmanChecksum(header[:3]) + readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF + if computedHeaderCRC24 != readHeaderCRC24 { + return nil, false, fmt.Errorf("gocql: header crc24 mismatch, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24) + } + + // Extract the payload length and self-contained flag + headerInt := binary.LittleEndian.Uint32(header[:4]) + payloadLen := int(headerInt & 0x1FFFF) + isSelfContained := (headerInt & (1 << 17)) != 0 + + // Read the payload + payload := make([]byte, payloadLen) + if _, err := io.ReadFull(r, payload); err != nil { + return nil, false, fmt.Errorf("gocql: failed to read uncompressed frame payload, err: %w", err) + } + + // Read and verify the payload CRC32 + if _, err := io.ReadFull(r, header[:4]); err != nil { + return nil, false, fmt.Errorf("gocql: failed to read payload crc32, err: %w", err) + } + + computedPayloadCRC32 := ChecksumIEEE(payload) + readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4]) + if computedPayloadCRC32 != readPayloadCRC32 { + return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32) + } + + return payload, isSelfContained, nil +} + +const maxPayloadSize = 128*1024 - 1 + +func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, error) { + const ( + headerSize = 6 + selfContainedBit = 1 << 17 + ) + + payloadLen := len(payload) + if payloadLen > maxPayloadSize { + return nil, fmt.Errorf("payload length (%d) exceeds maximum size of 128 KiB", payloadLen) + } + + header := 
make([]byte, headerSize) + + // First 3 bytes: payload length and self-contained flag + headerInt := uint32(payloadLen) & 0x1FFFF + if isSelfContained { + headerInt |= selfContainedBit // Set the self-contained flag + } + + // Encode the first 3 bytes as a single little-endian integer + header[0] = byte(headerInt) + header[1] = byte(headerInt >> 8) + header[2] = byte(headerInt >> 16) + + // Calculate CRC24 for the first 3 bytes of the header + crc := KoopmanChecksum(header[:3]) + + // Encode CRC24 into the next 3 bytes of the header + header[3] = byte(crc) + header[4] = byte(crc >> 8) + header[5] = byte(crc >> 16) + + // Create the frame + frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32 + frame := make([]byte, frameSize) + copy(frame, header) // Copy the header to the frame + copy(frame[headerSize:], payload) // Copy the payload to the frame + + // Calculate CRC32 for the payload + payloadCRC32 := ChecksumIEEE(payload) + binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], payloadCRC32) + + return frame, nil +} + +func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, compressor Compressor) ([]byte, error) { + uncompressedLen := len(uncompressedPayload) + if uncompressedLen > maxPayloadSize { + return nil, fmt.Errorf("uncompressed compressed payload length exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize) + } + + compressedPayload, err := compressor.Encode(uncompressedPayload) + if err != nil { + return nil, err + } + + // Skip the first 4 bytes because the size of the uncompressed payload is written in the frame header, not in the + // body of the compressed envelope + compressedPayload = compressedPayload[4:] + + compressedLen := len(compressedPayload) + + // Compression is not worth it + if uncompressedLen < compressedLen { + // native_protocol_v5.spec + // 2.2 + // An uncompressed length of 0 signals that the compressed payload + // should be used as-is and not decompressed. 
+ compressedPayload = uncompressedPayload Review Comment: `TestLargeSizeQuery` will generate random text, which LZ4 compressor most likely will not encode to smaller representation. Create a test to make sure that LZ4 compression is applied and we do not fall under this condition. ########## frame.go: ########## @@ -2070,3 +2085,251 @@ func (f *framer) writeBytesMap(m map[string][]byte) { f.writeBytes(v) } } + +func (f *framer) prepareModernLayout() error { + // Ensure protocol version is V5 or higher + if f.proto < protoVersion5 { + panic("Modern layout is not supported with version V4 or less") + } + + selfContained := true + + var ( + adjustedBuf []byte + tempBuf []byte + err error + ) + + // Process the buffer in chunks if it exceeds the max payload size + for len(f.buf) > maxPayloadSize { + if f.compres != nil { + tempBuf, err = newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres) + } else { + tempBuf, err = newUncompressedFrame(f.buf[:maxPayloadSize], false) + } + if err != nil { + return err + } + + adjustedBuf = append(adjustedBuf, tempBuf...) + f.buf = f.buf[maxPayloadSize:] + selfContained = false + } + + // Process the remaining buffer + if f.compres != nil { + tempBuf, err = newCompressedFrame(f.buf, selfContained, f.compres) + } else { + tempBuf, err = newUncompressedFrame(f.buf, selfContained) + } + if err != nil { + return err + } + + adjustedBuf = append(adjustedBuf, tempBuf...) 
+ f.buf = adjustedBuf + + return nil +} + +func readUncompressedFrame(r io.Reader) ([]byte, bool, error) { + const headerSize = 6 + header := [headerSize + 1]byte{} + + // Read the frame header + if _, err := io.ReadFull(r, header[:headerSize]); err != nil { + return nil, false, fmt.Errorf("gocql: failed to read uncompressed frame, err: %w", err) + } + + // Compute and verify the header CRC24 + computedHeaderCRC24 := KoopmanChecksum(header[:3]) + readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF + if computedHeaderCRC24 != readHeaderCRC24 { + return nil, false, fmt.Errorf("gocql: header crc24 mismatch, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24) + } + + // Extract the payload length and self-contained flag + headerInt := binary.LittleEndian.Uint32(header[:4]) + payloadLen := int(headerInt & 0x1FFFF) + isSelfContained := (headerInt & (1 << 17)) != 0 + + // Read the payload + payload := make([]byte, payloadLen) + if _, err := io.ReadFull(r, payload); err != nil { + return nil, false, fmt.Errorf("gocql: failed to read uncompressed frame payload, err: %w", err) + } + + // Read and verify the payload CRC32 + if _, err := io.ReadFull(r, header[:4]); err != nil { + return nil, false, fmt.Errorf("gocql: failed to read payload crc32, err: %w", err) + } + + computedPayloadCRC32 := ChecksumIEEE(payload) + readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4]) + if computedPayloadCRC32 != readPayloadCRC32 { + return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32) + } + + return payload, isSelfContained, nil +} + +const maxPayloadSize = 128*1024 - 1 + +func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, error) { + const ( + headerSize = 6 + selfContainedBit = 1 << 17 + ) + + payloadLen := len(payload) + if payloadLen > maxPayloadSize { + return nil, fmt.Errorf("payload length (%d) exceeds maximum size of 128 KiB", payloadLen) + } + + header := 
make([]byte, headerSize) + + // First 3 bytes: payload length and self-contained flag + headerInt := uint32(payloadLen) & 0x1FFFF + if isSelfContained { + headerInt |= selfContainedBit // Set the self-contained flag + } + + // Encode the first 3 bytes as a single little-endian integer + header[0] = byte(headerInt) + header[1] = byte(headerInt >> 8) + header[2] = byte(headerInt >> 16) + + // Calculate CRC24 for the first 3 bytes of the header + crc := KoopmanChecksum(header[:3]) + + // Encode CRC24 into the next 3 bytes of the header + header[3] = byte(crc) + header[4] = byte(crc >> 8) + header[5] = byte(crc >> 16) + + // Create the frame + frameSize := headerSize + payloadLen + 4 // 4 bytes for CRC32 + frame := make([]byte, frameSize) + copy(frame, header) // Copy the header to the frame + copy(frame[headerSize:], payload) // Copy the payload to the frame + + // Calculate CRC32 for the payload + payloadCRC32 := ChecksumIEEE(payload) + binary.LittleEndian.PutUint32(frame[headerSize+payloadLen:], payloadCRC32) + + return frame, nil +} + +func newCompressedFrame(uncompressedPayload []byte, isSelfContained bool, compressor Compressor) ([]byte, error) { + uncompressedLen := len(uncompressedPayload) + if uncompressedLen > maxPayloadSize { + return nil, fmt.Errorf("uncompressed compressed payload length exceedes max size of frame payload %d/%d", uncompressedLen, maxPayloadSize) + } + + compressedPayload, err := compressor.Encode(uncompressedPayload) + if err != nil { + return nil, err + } + + // Skip the first 4 bytes because the size of the uncompressed payload is written in the frame header, not in the + // body of the compressed envelope + compressedPayload = compressedPayload[4:] + + compressedLen := len(compressedPayload) + + // Compression is not worth it + if uncompressedLen < compressedLen { + // native_protocol_v5.spec + // 2.2 + // An uncompressed length of 0 signals that the compressed payload + // should be used as-is and not decompressed. 
+ compressedPayload = uncompressedPayload + compressedLen = uncompressedLen + uncompressedLen = 0 + } + + // Combine compressed and uncompressed lengths and set the self-contained flag if needed + combined := uint64(compressedLen) | uint64(uncompressedLen)<<17 + if isSelfContained { + combined |= 1 << 34 + } + + var headerBuf [8]byte + + // Write the combined value into the header buffer + binary.LittleEndian.PutUint64(headerBuf[:], combined) + + // Create a buffer with enough capacity to hold the header, compressed payload, and checksums + buf := bytes.NewBuffer(make([]byte, 0, 8+compressedLen+4)) + + // Write the first 5 bytes of the header (compressed and uncompressed sizes) + buf.Write(headerBuf[:5]) + + // Compute and write the CRC24 checksum of the first 5 bytes + headerChecksum := KoopmanChecksum(headerBuf[:5]) + binary.LittleEndian.PutUint32(headerBuf[:], headerChecksum) + buf.Write(headerBuf[:3]) + buf.Write(compressedPayload) + + // Compute and write the CRC32 checksum of the payload + payloadChecksum := ChecksumIEEE(compressedPayload) + binary.LittleEndian.PutUint32(headerBuf[:], payloadChecksum) + buf.Write(headerBuf[:4]) + + return buf.Bytes(), nil +} + +func readCompressedFrame(r io.Reader, compressor Compressor) ([]byte, bool, error) { Review Comment: We are missing tests to simulate various frame errors here, e.g. invalid length encoded in the frame, invalid CRC. Tests could use our frame building logic for a simple EXECUTE or QUERY, change payload and then make sure decoding fails with certain error. ########## conn.go: ########## @@ -1394,9 +1483,10 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { params.skipMeta = !(c.session.cfg.DisableSkipMetadata || qry.disableSkipMetadata) frame = &writeExecuteFrame{ Review Comment: For BATCH message we are missing keyspace field. ``` 0x0080: With keyspace. If set, <keyspace> must be present. <keyspace> is a [string] indicating the keyspace that the query should be executed in. 
It supercedes the keyspace that the connection is bound to, if any. ``` ########## conn.go: ########## @@ -1394,9 +1483,10 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { params.skipMeta = !(c.session.cfg.DisableSkipMetadata || qry.disableSkipMetadata) frame = &writeExecuteFrame{ Review Comment: Field `<now_in_seconds>` has not been added as per below quote and flags bitmap not updated. ``` Added now_in_seconds field in QUERY, EXECUTE, and BATCH messages (Sections 4.1.4, 4.1.6, and 4.1.7). ``` ########## frame.go: ########## @@ -2070,3 +2085,251 @@ func (f *framer) writeBytesMap(m map[string][]byte) { f.writeBytes(v) } } + +func (f *framer) prepareModernLayout() error { + // Ensure protocol version is V5 or higher + if f.proto < protoVersion5 { + panic("Modern layout is not supported with version V4 or less") + } + + selfContained := true + + var ( + adjustedBuf []byte + tempBuf []byte + err error + ) + + // Process the buffer in chunks if it exceeds the max payload size + for len(f.buf) > maxPayloadSize { + if f.compres != nil { + tempBuf, err = newCompressedFrame(f.buf[:maxPayloadSize], false, f.compres) + } else { + tempBuf, err = newUncompressedFrame(f.buf[:maxPayloadSize], false) + } + if err != nil { + return err + } + + adjustedBuf = append(adjustedBuf, tempBuf...) + f.buf = f.buf[maxPayloadSize:] + selfContained = false + } + + // Process the remaining buffer + if f.compres != nil { + tempBuf, err = newCompressedFrame(f.buf, selfContained, f.compres) + } else { + tempBuf, err = newUncompressedFrame(f.buf, selfContained) + } + if err != nil { + return err + } + + adjustedBuf = append(adjustedBuf, tempBuf...) 
+ f.buf = adjustedBuf + + return nil +} + +func readUncompressedFrame(r io.Reader) ([]byte, bool, error) { + const headerSize = 6 + header := [headerSize + 1]byte{} + + // Read the frame header + if _, err := io.ReadFull(r, header[:headerSize]); err != nil { + return nil, false, fmt.Errorf("gocql: failed to read uncompressed frame, err: %w", err) + } + + // Compute and verify the header CRC24 + computedHeaderCRC24 := KoopmanChecksum(header[:3]) + readHeaderCRC24 := binary.LittleEndian.Uint32(header[3:]) & 0xFFFFFF + if computedHeaderCRC24 != readHeaderCRC24 { + return nil, false, fmt.Errorf("gocql: header crc24 mismatch, computed: %d, got: %d", computedHeaderCRC24, readHeaderCRC24) + } + + // Extract the payload length and self-contained flag + headerInt := binary.LittleEndian.Uint32(header[:4]) + payloadLen := int(headerInt & 0x1FFFF) + isSelfContained := (headerInt & (1 << 17)) != 0 + + // Read the payload + payload := make([]byte, payloadLen) + if _, err := io.ReadFull(r, payload); err != nil { + return nil, false, fmt.Errorf("gocql: failed to read uncompressed frame payload, err: %w", err) + } + + // Read and verify the payload CRC32 + if _, err := io.ReadFull(r, header[:4]); err != nil { + return nil, false, fmt.Errorf("gocql: failed to read payload crc32, err: %w", err) + } + + computedPayloadCRC32 := ChecksumIEEE(payload) + readPayloadCRC32 := binary.LittleEndian.Uint32(header[:4]) + if computedPayloadCRC32 != readPayloadCRC32 { + return nil, false, fmt.Errorf("gocql: payload crc32 mismatch, computed: %d, got: %d", computedPayloadCRC32, readPayloadCRC32) + } + + return payload, isSelfContained, nil +} + +const maxPayloadSize = 128*1024 - 1 + +func newUncompressedFrame(payload []byte, isSelfContained bool) ([]byte, error) { + const ( + headerSize = 6 + selfContainedBit = 1 << 17 + ) + + payloadLen := len(payload) + if payloadLen > maxPayloadSize { + return nil, fmt.Errorf("payload length (%d) exceeds maximum size of 128 KiB", payloadLen) + } + + header := 
make([]byte, headerSize) Review Comment: Can we avoid creating a separate `header` array and copying it over to `frame`, and instead write directly to `frame`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

