Copilot commented on code in PR #12299:
URL: https://github.com/apache/gluten/pull/12299#discussion_r3412343321
##########
cpp/core/utils/tac/FForCodec.cc:
##########
@@ -57,4 +58,46 @@ FForCodec::decompress(const uint8_t* input, int64_t
inputSize, uint8_t* output,
return static_cast<int64_t>(nDecoded);
}
+int64_t FForCodec::maxCompressedLength128(int64_t inputSize) {
+ if (inputSize % sizeof(__int128_t) != 0) {
+ return 0;
+ }
+ size_t numValues = inputSize / sizeof(__int128_t);
+ return static_cast<int64_t>(ffor::compress128Bound(numValues));
+}
+
+arrow::Result<int64_t>
+FForCodec::compress128(const uint8_t* input, int64_t inputSize, uint8_t*
output, int64_t outputSize) {
+ if (inputSize == 0) {
+ return 0;
+ }
+ if (inputSize % sizeof(__int128_t) != 0) {
+ return arrow::Status::Invalid("FForCodec: input size ", inputSize, " is
not a multiple of ", sizeof(__int128_t), ".");
+ }
+
+ size_t numValues = inputSize / sizeof(__int128_t);
+ auto maxLen = static_cast<int64_t>(ffor::compress128Bound(numValues));
+ if (outputSize < maxLen) {
+ return arrow::Status::Invalid(
+ "FForCodec: output buffer too small for 128-bit compression (need ",
+ maxLen, " bytes, got ", outputSize, ").");
+ }
+
+ auto written = ffor::compress128(input, numValues, output);
+ return static_cast<int64_t>(written);
+}
Review Comment:
`compress128()` returns early for `inputSize == 0`, which means it does not
write the required tail header that `ffor::compress128()` would normally emit.
Since `TypeAwareCompressCodec::compress()` already writes its 2-byte payload
header, this produces a malformed TAC stream for empty inputs (and
`decompress128()` can currently accept it without validating). Fix by removing
the early return and letting `ffor::compress128()` handle `numValues == 0`, or
by still emitting a valid empty stream (tail header with count=0) and returning
the number of bytes written."
##########
cpp/core/utils/tac/TypeAwareCompressCodec.cc:
##########
@@ -65,18 +80,38 @@ TypeAwareCompressCodec::decompress(const uint8_t* input,
int64_t inputLen, uint8
auto* in = input;
auto codecId = static_cast<CodecId>(*in++);
- [[maybe_unused]] auto tacType = *in++;
+ auto tacType = static_cast<int8_t>(*in++);
auto dataLen = inputLen - kPayloadHeaderSize;
- switch (codecId) {
- case CodecId::kFFor: {
- ARROW_ASSIGN_OR_RAISE(auto nDecoded, FForCodec::decompress(in, dataLen,
output, outputLen));
- (void)nDecoded;
- return inputLen;
+ if (codecId != CodecId::kFFor) {
+ return arrow::Status::Invalid("Unknown type-aware codec ID: ",
static_cast<int>(codecId));
+ }
+
+ int64_t nDecoded = 0;
+ int64_t valueSize = 0;
+ const char* typeName = nullptr;
+ switch (tacType) {
+ case tac::kUInt64: {
+ ARROW_ASSIGN_OR_RAISE(nDecoded, FForCodec::decompress(in, dataLen,
output, outputLen));
+ valueSize = sizeof(uint64_t);
+ typeName = "uint64";
+ break;
+ }
+ case tac::kUInt128: {
+ ARROW_ASSIGN_OR_RAISE(nDecoded, FForCodec::decompress128(in, dataLen,
output, outputLen));
+ valueSize = 2 * sizeof(uint64_t);
+ typeName = "uint128";
+ break;
}
default:
- return arrow::Status::Invalid("Unknown type-aware codec ID: ",
static_cast<int>(codecId));
+ return arrow::Status::Invalid("Unknown tac type in decompress: ",
static_cast<int>(tacType));
+ }
+ const int64_t expected = outputLen / valueSize;
+ if (nDecoded != expected) {
+ return arrow::Status::Invalid(
+ "TAC decompress ", typeName, " value count mismatch: expected ",
expected, " got ", nDecoded);
}
+ return inputLen;
Review Comment:
This introduces a stricter contract requiring `nDecoded == outputLen /
valueSize`. If callers pass an output buffer larger than the logical
uncompressed size, decompression will now fail even though decoding into a
larger buffer is safe and common. Either (a) document clearly that `outputLen`
must equal the exact uncompressed byte length for the stream, or (b) relax the
check to only require `nDecoded * valueSize <= outputLen` (and optionally
validate that the compressed stream contains exactly `nDecoded` values).
##########
cpp/core/utils/tac/ffor.hpp:
##########
@@ -496,5 +521,178 @@ inline size_t decompress64(const uint8_t* input, size_t
inputSize, uint64_t* out
return decompress64Impl<false, false>(input, inputSize, output);
}
+//
=============================================================================
+// 128-bit codec.
+//
+// Each 128-bit value occupies a 16B slot (lo at offset 0, hi at offset 8 --
+// the DECIMAL128 / __int128_t layout used by Velox). Per block, the lo and
+// hi halves are gathered into two stack scratches and each is fed through
+// the 64-bit FFOR encoder. Reads/writes go through native uint64, so the
+// codec is byte-order agnostic as long as producer and consumer agree.
+//
+// Wire format per block: [hdr][lo payload][hdr][hi payload]
+// followed by one tail block (kBwTailMarker) carrying the remaining 16B
+// values raw.
+//
=============================================================================
+
+inline constexpr size_t compress128Bound(size_t numValues) {
+ // Two 64-bit streams (lo + hi), worst case each =
compress64Bound(numValues).
+ return 2 * compress64Bound(numValues);
+}
Review Comment:
`compress128Bound()` is a very conservative bound (it effectively counts two
independent 64-bit tails/headers), and `FForCodec::compress128()` uses this
value to reject “too small” output buffers. This can significantly over-require
buffer space for small inputs (especially when `numValues < kLanes`, where the
128-bit format emits a single raw tail block). Consider computing a tighter
worst-case bound for the actual 128-bit wire format (two headers per full block
+ one tail header total + raw 16B/value payload worst-case) to reduce memory
overhead and avoid unnecessarily rejecting buffers that would fit the produced
output.
##########
cpp/core/utils/tac/FForCodec.cc:
##########
@@ -57,4 +58,46 @@ FForCodec::decompress(const uint8_t* input, int64_t
inputSize, uint8_t* output,
return static_cast<int64_t>(nDecoded);
}
+int64_t FForCodec::maxCompressedLength128(int64_t inputSize) {
+ if (inputSize % sizeof(__int128_t) != 0) {
+ return 0;
+ }
+ size_t numValues = inputSize / sizeof(__int128_t);
+ return static_cast<int64_t>(ffor::compress128Bound(numValues));
+}
+
+arrow::Result<int64_t>
+FForCodec::compress128(const uint8_t* input, int64_t inputSize, uint8_t*
output, int64_t outputSize) {
+ if (inputSize == 0) {
+ return 0;
+ }
+ if (inputSize % sizeof(__int128_t) != 0) {
+ return arrow::Status::Invalid("FForCodec: input size ", inputSize, " is
not a multiple of ", sizeof(__int128_t), ".");
+ }
+
+ size_t numValues = inputSize / sizeof(__int128_t);
+ auto maxLen = static_cast<int64_t>(ffor::compress128Bound(numValues));
+ if (outputSize < maxLen) {
+ return arrow::Status::Invalid(
+ "FForCodec: output buffer too small for 128-bit compression (need ",
+ maxLen, " bytes, got ", outputSize, ").");
+ }
+
+ auto written = ffor::compress128(input, numValues, output);
+ return static_cast<int64_t>(written);
+}
+
+arrow::Result<int64_t>
+FForCodec::decompress128(const uint8_t* input, int64_t inputSize, uint8_t*
output, int64_t outputSize) {
+ if (outputSize == 0) {
+ return 0;
+ }
+ if (outputSize % sizeof(__int128_t) != 0) {
+ return arrow::Status::Invalid("FForCodec: output size ", outputSize, " is
not a multiple of ", sizeof(__int128_t), ".");
+ }
+
+ auto nDecoded = ffor::decompress128(input, inputSize, output,
static_cast<size_t>(outputSize));
+ return static_cast<int64_t>(nDecoded);
+}
Review Comment:
`decompress128()` returns `0` when `outputSize == 0` without validating that
`input/inputSize` contains a well-formed empty 128-bit stream (e.g., at least a
tail header with count=0). This can cause truncated/corrupt compressed input to
be accepted as successful decompression in the empty-output case. Consider
removing the early return and delegating to `ffor::decompress128()` (which can
perform format validation), or explicitly validate that `inputSize` contains a
valid tail marker/header when `outputSize == 0`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]