brancz commented on code in PR #14:
URL: https://github.com/apache/arrow-js/pull/14#discussion_r2226003816


##########
src/ipc/writer.ts:
##########
@@ -251,34 +274,99 @@ export class RecordBatchWriter<T extends TypeMap = any> 
extends ReadableInterop<
     }
 
     protected _writeRecordBatch(batch: RecordBatch<T>) {
-        const { byteLength, nodes, bufferRegions, buffers } = 
VectorAssembler.assemble(batch);
-        const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, 
bufferRegions);
+        const { byteLength, nodes, bufferRegions, buffers } = 
this._assembleRecordBatch(batch);
+        const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, 
bufferRegions, this._compression);
         const message = Message.from(recordBatch, byteLength);
         return this
             ._writeDictionaries(batch)
             ._writeMessage(message)
             ._writeBodyBuffers(buffers);
     }
 
+    protected _assembleRecordBatch(batch: RecordBatch<T>) {
+        let { byteLength, nodes, bufferRegions, buffers } = 
VectorAssembler.assemble(batch);
+        if (this._compression != null) {
+            ({ byteLength, bufferRegions, buffers } = 
this._compressBodyBuffers(buffers));
+        }
+        return { byteLength, nodes, bufferRegions, buffers };
+    }
+
+    protected _compressBodyBuffers(buffers: ArrayBufferView[]) {
+        const codec = compressionRegistry.get(this._compression!.type!);
+
+        if (!codec?.encode || typeof codec.encode !== 'function') {
+            throw new Error(`Codec for compression type 
"${CompressionType[this._compression!.type!]}" has invalid encode method`);
+        }
+
+        let currentOffset = 0;
+        const compressedBuffers: ArrayBufferView[] = [];
+        const bufferRegions: metadata.BufferRegion[] = [];
+
+        for (const buffer of buffers) {
+            const byteBuf = toUint8Array(buffer);
+
+            if (byteBuf.length === 0) {
+                compressedBuffers.push(new Uint8Array(0), new Uint8Array(0));
+                bufferRegions.push(new metadata.BufferRegion(currentOffset, 
0));
+                continue;
+            }
+
+            const compressed = codec.encode(byteBuf);
+            const isCompressionEffective = compressed.length < byteBuf.length;
+
+            const finalBuffer = isCompressionEffective ? compressed : byteBuf;
+            const byteLength = isCompressionEffective ? finalBuffer.length : 
LENGTH_NO_COMPRESSED_DATA;
+
+            const lengthPrefix = new flatbuffers.ByteBuffer(new 
Uint8Array(COMPRESS_LENGTH_PREFIX));
+            lengthPrefix.writeInt64(0, BigInt(byteLength));
+
+            compressedBuffers.push(lengthPrefix.bytes(), new 
Uint8Array(finalBuffer));
+
+            const padding = ((currentOffset + 7) & ~7) - currentOffset;
+            currentOffset += padding;
+
+            const fullBodyLength = COMPRESS_LENGTH_PREFIX + finalBuffer.length;
+            bufferRegions.push(new metadata.BufferRegion(currentOffset, 
fullBodyLength));
+
+            currentOffset += fullBodyLength;
+        }
+        const finalPadding = ((currentOffset + 7) & ~7) - currentOffset;
+        currentOffset += finalPadding;
+
+        return { byteLength: currentOffset, bufferRegions, buffers: 
compressedBuffers };
+    }
+
     protected _writeDictionaryBatch(dictionary: Data, id: number, isDelta = 
false) {
         const { byteLength, nodes, bufferRegions, buffers } = 
VectorAssembler.assemble(new Vector([dictionary]));
-        const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, 
bufferRegions);
+        const recordBatch = new metadata.RecordBatch(dictionary.length, nodes, 
bufferRegions, null);
         const dictionaryBatch = new metadata.DictionaryBatch(recordBatch, id, 
isDelta);
         const message = Message.from(dictionaryBatch, byteLength);
         return this
             ._writeMessage(message)
-            ._writeBodyBuffers(buffers);
+            ._writeBodyBuffers(buffers, "dictionary");
     }
 
-    protected _writeBodyBuffers(buffers: ArrayBufferView[]) {
-        let buffer: ArrayBufferView;
-        let size: number, padding: number;
-        for (let i = -1, n = buffers.length; ++i < n;) {
-            if ((buffer = buffers[i]) && (size = buffer.byteLength) > 0) {
-                this._write(buffer);
-                if ((padding = ((size + 7) & ~7) - size) > 0) {
-                    this._writePadding(padding);
-                }
+    protected _writeBodyBuffers(buffers: ArrayBufferView[], batchType: 
"record" | "dictionary" = "record") {

Review Comment:
   I don't think that's correct, where do you read that from that documentation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to