Djjanks commented on code in PR #14:
URL: https://github.com/apache/arrow-js/pull/14#discussion_r2324604859
##########
src/ipc/writer.ts:
##########
@@ -251,34 +274,99 @@ export class RecordBatchWriter<T extends TypeMap = any>
extends ReadableInterop<
}
protected _writeRecordBatch(batch: RecordBatch<T>) {
- const { byteLength, nodes, bufferRegions, buffers } =
VectorAssembler.assemble(batch);
- const recordBatch = new metadata.RecordBatch(batch.numRows, nodes,
bufferRegions);
+ const { byteLength, nodes, bufferRegions, buffers } =
this._assembleRecordBatch(batch);
+ const recordBatch = new metadata.RecordBatch(batch.numRows, nodes,
bufferRegions, this._compression);
const message = Message.from(recordBatch, byteLength);
return this
._writeDictionaries(batch)
._writeMessage(message)
._writeBodyBuffers(buffers);
}
+ protected _assembleRecordBatch(batch: RecordBatch<T>) {
+ let { byteLength, nodes, bufferRegions, buffers } =
VectorAssembler.assemble(batch);
+ if (this._compression != null) {
+ ({ byteLength, bufferRegions, buffers } =
this._compressBodyBuffers(buffers));
+ }
+ return { byteLength, nodes, bufferRegions, buffers };
+ }
+
+ protected _compressBodyBuffers(buffers: ArrayBufferView[]) {
+ const codec = compressionRegistry.get(this._compression!.type!);
+
+ if (!codec?.encode || typeof codec.encode !== 'function') {
+ throw new Error(`Codec for compression type
"${CompressionType[this._compression!.type!]}" has invalid encode method`);
+ }
+
+ let currentOffset = 0;
+ const compressedBuffers: ArrayBufferView[] = [];
+ const bufferRegions: metadata.BufferRegion[] = [];
+
+ for (const buffer of buffers) {
+ const byteBuf = toUint8Array(buffer);
+
+ if (byteBuf.length === 0) {
+ compressedBuffers.push(new Uint8Array(0), new Uint8Array(0));
+ bufferRegions.push(new metadata.BufferRegion(currentOffset,
0));
+ continue;
+ }
+
+ const compressed = codec.encode(byteBuf);
+ const isCompressionEffective = compressed.length < byteBuf.length;
+
+ const finalBuffer = isCompressionEffective ? compressed : byteBuf;
+ const byteLength = isCompressionEffective ? finalBuffer.length :
LENGTH_NO_COMPRESSED_DATA;
+
+ const lengthPrefix = new flatbuffers.ByteBuffer(new
Uint8Array(COMPRESS_LENGTH_PREFIX));
+ lengthPrefix.writeInt64(0, BigInt(byteLength));
+
+ compressedBuffers.push(lengthPrefix.bytes(), new
Uint8Array(finalBuffer));
+
+ const padding = ((currentOffset + 7) & ~7) - currentOffset;
+ currentOffset += padding;
+
+ const fullBodyLength = COMPRESS_LENGTH_PREFIX + finalBuffer.length;
+ bufferRegions.push(new metadata.BufferRegion(currentOffset,
fullBodyLength));
+
+ currentOffset += fullBodyLength;
+ }
+ const finalPadding = ((currentOffset + 7) & ~7) - currentOffset;
+ currentOffset += finalPadding;
+
+ return { byteLength: currentOffset, bufferRegions, buffers:
compressedBuffers };
+ }
+
protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta =
false) {
const { byteLength, nodes, bufferRegions, buffers } =
VectorAssembler.assemble(new Vector([dictionary]));
- const recordBatch = new metadata.RecordBatch(dictionary.length, nodes,
bufferRegions);
+ const recordBatch = new metadata.RecordBatch(dictionary.length, nodes,
bufferRegions, null);
const dictionaryBatch = new metadata.DictionaryBatch(recordBatch, id,
isDelta);
const message = Message.from(dictionaryBatch, byteLength);
return this
._writeMessage(message)
- ._writeBodyBuffers(buffers);
+ ._writeBodyBuffers(buffers, "dictionary");
}
- protected _writeBodyBuffers(buffers: ArrayBufferView[]) {
- let buffer: ArrayBufferView;
- let size: number, padding: number;
- for (let i = -1, n = buffers.length; ++i < n;) {
- if ((buffer = buffers[i]) && (size = buffer.byteLength) > 0) {
- this._write(buffer);
- if ((padding = ((size + 7) & ~7) - size) > 0) {
- this._writePadding(padding);
- }
+ protected _writeBodyBuffers(buffers: ArrayBufferView[], batchType:
"record" | "dictionary" = "record") {
Review Comment:
The compression section of the documentation focuses on record batches, but
it doesn't specifically mention that dictionary batches should also be
compressed. However, I agree that, logically, dictionary compression should be
included.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]