hhr293 commented on code in PR #12299:
URL: https://github.com/apache/gluten/pull/12299#discussion_r3457187599
##########
cpp/core/utils/tac/ffor.hpp:
##########
@@ -418,82 +452,245 @@ inline size_t compress64(const uint64_t* input, size_t
num, uint8_t* output) {
}
// Template-based decompress with alignment dispatch.
+// decodeBlock requires aligned uint64_t* input; when the caller's input
+// buffer is unaligned, we stage through tmpIn and memcpy in.
template <bool InAligned, bool OutAligned>
-inline size_t decompress64Impl(const uint8_t* input, size_t inputSize,
uint64_t* output) {
- alignas(64) uint64_t tmpIn[kMaxValuesPerBlock];
+inline size_t decompress64Impl(const uint8_t* input, size_t inputSize,
uint64_t* output, size_t outputSize) {
+ alignas(64) uint64_t tmpIn[kMaxValuesPerBlock + 2];
alignas(64) uint64_t tmpOut[kMaxValuesPerBlock];
const uint8_t* inPtr = input;
const uint8_t* inEnd = input + inputSize;
+ const size_t outValuesMax = outputSize / sizeof(uint64_t);
size_t nDecoded = 0;
while (inPtr + kHeaderSize <= inEnd) {
- uint8_t bw;
- uint8_t count;
- uint64_t base;
- readHeader(inPtr, bw, count, base);
- inPtr += kHeaderSize;
-
- if (bw == kBwTailMarker) {
- if (count > 0) {
- // memcpy handles any alignment, no special case needed.
- std::memcpy(reinterpret_cast<uint8_t*>(output) + nDecoded *
sizeof(uint64_t), inPtr, count * sizeof(uint64_t));
+ if (inPtr[0] == kBwTailMarker) {
+ const uint8_t count = inPtr[1];
+ inPtr += kHeaderSize;
+ const size_t tailBytes = static_cast<size_t>(count) * sizeof(uint64_t);
+ if (count > 0 && inPtr + tailBytes <= inEnd && count <= outValuesMax -
nDecoded) {
+ std::memcpy(reinterpret_cast<uint8_t*>(output) + nDecoded *
sizeof(uint64_t), inPtr, tailBytes);
nDecoded += count;
}
break;
}
-
- size_t blockVals = static_cast<size_t>(count) * kLanes;
- size_t compBytes = compressedWords(blockVals, bw) * sizeof(uint64_t);
-
- if (inPtr + compBytes > inEnd) {
+ const size_t blockVals = static_cast<size_t>(inPtr[1]) * kLanes;
+ if (blockVals == 0 || blockVals > kMaxValuesPerBlock || blockVals >
outValuesMax - nDecoded) {
break;
}
+ const size_t remaining = static_cast<size_t>(inEnd - inPtr);
+ uint64_t* decDst = OutAligned ? output + nDecoded : tmpOut;
- // Decode: pick aligned src/dst.
- const uint64_t* decIn;
+ size_t consumed;
if constexpr (InAligned) {
- decIn = reinterpret_cast<const uint64_t*>(inPtr);
+ consumed = decodeBlock(reinterpret_cast<const uint64_t*>(inPtr),
remaining, blockVals, decDst);
} else {
- std::memcpy(tmpIn, inPtr, compBytes);
- decIn = tmpIn;
+ const size_t n = std::min(remaining, sizeof(tmpIn));
+ std::memcpy(tmpIn, inPtr, n);
+ consumed = decodeBlock(tmpIn, n, blockVals, decDst);
}
-
- uint64_t* decOut;
- if constexpr (OutAligned) {
- decOut = output + nDecoded;
- } else {
- decOut = tmpOut;
+ if (consumed == 0) {
+ break;
}
-
- decodeRt(decIn, decOut, base, blockVals, bw);
+ inPtr += consumed;
if constexpr (!OutAligned) {
std::memcpy(
reinterpret_cast<uint8_t*>(output) + nDecoded * sizeof(uint64_t),
tmpOut, blockVals * sizeof(uint64_t));
}
-
- inPtr += compBytes;
nDecoded += blockVals;
}
return nDecoded;
}
// Runtime dispatch.
-inline size_t decompress64(const uint8_t* input, size_t inputSize, uint64_t*
output) {
+inline size_t decompress64(const uint8_t* input, size_t inputSize, uint64_t*
output, size_t outputSize) {
bool inOk = (reinterpret_cast<uintptr_t>(input) % alignof(uint64_t) == 0);
bool outOk = (reinterpret_cast<uintptr_t>(output) % alignof(uint64_t) == 0);
if (inOk && outOk) {
- return decompress64Impl<true, true>(input, inputSize, output);
+ return decompress64Impl<true, true>(input, inputSize, output, outputSize);
+ }
+ if (inOk && !outOk) {
+ return decompress64Impl<true, false>(input, inputSize, output, outputSize);
+ }
+ if (!inOk && outOk) {
+ return decompress64Impl<false, true>(input, inputSize, output, outputSize);
+ }
+ return decompress64Impl<false, false>(input, inputSize, output, outputSize);
+}
+
+//
=============================================================================
+// 128-bit codec.
+//
+// Each 128-bit value occupies a 16B slot (lo at offset 0, hi at offset 8 --
+// the DECIMAL128 / __int128_t layout used by Velox). Per block, the lo and
+// hi halves are gathered into two stack scratches and each is fed through
+// the 64-bit FFOR encoder. Reads/writes go through native uint64, so the
+// codec is byte-order agnostic as long as producer and consumer agree.
+//
+// Wire format per block: [hdr][lo payload][hdr][hi payload]
+// followed by one tail block (kBwTailMarker) carrying the remaining 16B
+// values raw.
+//
=============================================================================
+
+inline constexpr size_t compress128Bound(size_t numValues) {
+ // Two 64-bit streams (lo + hi), worst case each =
compress64Bound(numValues).
+ return 2 * compress64Bound(numValues);
+}
+
+// 128-bit compress. See encodeBlock for InAligned/OutAligned semantics.
+template <bool InAligned, bool OutAligned>
+inline size_t compress128Impl(const uint8_t* input, size_t numValues, uint8_t*
output) {
+ alignas(64) uint64_t loBuffer[kMaxValuesPerBlock];
+ alignas(64) uint64_t hiBuffer[kMaxValuesPerBlock];
+ alignas(64) uint64_t tmpIn[kMaxValuesPerBlock * 2];
+ alignas(64) uint64_t tmpOut[kMaxValuesPerBlock + 2]; // header(2 words) +
payload
+
+ uint8_t* outPtr = output;
+ size_t remaining = numValues;
+ const uint8_t* inPtr = input;
+
+ while (remaining >= kLanes) {
+ size_t blockVals = remaining - (remaining % kLanes);
+ if (blockVals > kMaxValuesPerBlock) {
+ blockVals = kMaxValuesPerBlock;
+ }
+
+ const uint64_t* in64;
+ if constexpr (InAligned) {
+ in64 = reinterpret_cast<const uint64_t*>(inPtr);
+ } else {
+ std::memcpy(tmpIn, inPtr, blockVals * sizeof(__int128_t));
+ in64 = tmpIn;
+ }
+ for (size_t j = 0; j < blockVals; ++j) {
+ loBuffer[j] = in64[j * 2];
+ hiBuffer[j] = in64[j * 2 + 1];
+ }
+
+ if constexpr (OutAligned) {
+ outPtr += encodeBlock(loBuffer, blockVals,
reinterpret_cast<uint64_t*>(outPtr));
+ outPtr += encodeBlock(hiBuffer, blockVals,
reinterpret_cast<uint64_t*>(outPtr));
+ } else {
+ size_t n = encodeBlock(loBuffer, blockVals, tmpOut);
+ std::memcpy(outPtr, tmpOut, n);
+ outPtr += n;
+ n = encodeBlock(hiBuffer, blockVals, tmpOut);
+ std::memcpy(outPtr, tmpOut, n);
+ outPtr += n;
+ }
+
+ inPtr += blockVals * sizeof(__int128_t);
+ remaining -= blockVals;
+ }
+
+ // Tail: one header + remaining values copied raw.
+ writeHeader(outPtr, kBwTailMarker, static_cast<uint8_t>(remaining), 0);
+ outPtr += kHeaderSize;
+ if (remaining > 0) {
+ std::memcpy(outPtr, inPtr, remaining * sizeof(__int128_t));
+ outPtr += remaining * sizeof(__int128_t);
+ }
+ return static_cast<size_t>(outPtr - output);
+}
+
+// Runtime dispatch — check alignment once, pick the right template.
+inline size_t compress128(const uint8_t* input, size_t numValues, uint8_t*
output) {
+ const bool inOk = (reinterpret_cast<uintptr_t>(input) % alignof(uint64_t) ==
0);
+ const bool outOk = (reinterpret_cast<uintptr_t>(output) % alignof(uint64_t)
== 0);
+ if (inOk && outOk) {
+ return compress128Impl<true, true>(input, numValues, output);
+ }
+ if (inOk && !outOk) {
+ return compress128Impl<true, false>(input, numValues, output);
+ }
+ if (!inOk && outOk) {
+ return compress128Impl<false, true>(input, numValues, output);
+ }
+ return compress128Impl<false, false>(input, numValues, output);
+}
+
+// 128-bit decompress. decodeBlock requires aligned uint64_t* input; when
+// the caller's input buffer is unaligned, we stage through tmpIn.
+template <bool InAligned, bool OutAligned>
+inline size_t decompress128Impl(const uint8_t* input, size_t inputSize,
uint8_t* output, size_t outputSize) {
+ alignas(64) uint64_t loBuffer[kMaxValuesPerBlock];
+ alignas(64) uint64_t hiBuffer[kMaxValuesPerBlock];
+ alignas(64) uint64_t tmpIn[kMaxValuesPerBlock + 2];
+ alignas(64) uint64_t tmpOut[kMaxValuesPerBlock * 2];
+
+ const uint8_t* inPtr = input;
+ const uint8_t* inEnd = input + inputSize;
+ const size_t outValuesMax = outputSize / sizeof(__int128_t);
+ size_t nDecoded = 0;
+
+ while (inPtr + kHeaderSize <= inEnd) {
+ if (inPtr[0] == kBwTailMarker) {
+ const uint8_t count = inPtr[1];
+ inPtr += kHeaderSize;
+ const size_t tailBytes = static_cast<size_t>(count) * sizeof(__int128_t);
+ if (inPtr + tailBytes > inEnd || count > outValuesMax - nDecoded) {
+ break;
+ }
+ std::memcpy(output + nDecoded * sizeof(__int128_t), inPtr, tailBytes);
+ nDecoded += count;
+ break;
+ }
Review Comment:
The out-of-bounds risk is already closed by the existing guard
(`inPtr + tailBytes > inEnd || count > outValuesMax - nDecoded`), so any
`count` value is memory-safe. The tail path is a raw memcpy, not a parse, so a
tail with `count >= kLanes` still decodes to a well-defined result. The
`count < kLanes` invariant is enforced on the encoder side
(`while (remaining >= kLanes)`), and the existing TypeAwareCompressCodec
value-count check already surfaces malformed streams to the caller. Adding the
check here would only change which line triggers the early break; it would not
catch anything new.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]