stephnom commented on code in PR #14:
URL: https://github.com/apache/arrow-js/pull/14#discussion_r2323204132


##########
src/ipc/reader.ts:
##########
@@ -354,12 +358,31 @@ abstract class RecordBatchReaderImpl<T extends TypeMap = 
any> implements RecordB
         return this;
     }
 
-    protected _loadRecordBatch(header: metadata.RecordBatch, body: any) {
-        const children = this._loadVectors(header, body, this.schema.fields);
+    protected _loadRecordBatch(header: metadata.RecordBatch, body: 
Uint8Array): RecordBatch<T> {
+        let children: Data<any>[];
+        if (header.compression != null) {
+            const codec = compressionRegistry.get(header.compression.type);
+            if (codec?.decode && typeof codec.decode === 'function') {
+                const { decommpressedBody, buffers } = 
this._decompressBuffers(header, body, codec);
+                children = this._loadCompressedVectors(header, 
decommpressedBody, this.schema.fields);
+                header = new metadata.RecordBatch(
+                    header.length,
+                    header.nodes,
+                    buffers,
+                    null
+                );
+            } else {
+                throw new Error('Record batch is compressed but codec not 
found');
+            }
+        } else {
+            children = this._loadVectors(header, body, this.schema.fields);
+        }
+
         const data = makeData({ type: new Struct(this.schema.fields), length: 
header.length, children });
         return new RecordBatch(this.schema, data);
     }
-    protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: 
any) {
+
+    protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: 
Uint8Array) {

Review Comment:
   I've been trying this PR at my company using zstd encoding, and can confirm 
that dictionary batches are in fact compressed as well.
   
   I needed to add the following lines for the zstd decompression to work fully 
for dictionary vectors:
   
   ```
   let data: Data<any>[];
           if (header.data.compression != null) {
               const codec = 
compressionRegistry.get(header.data.compression.type);
               if (codec?.decode && typeof codec.decode === 'function') {
                   const { decommpressedBody, buffers } = 
this._decompressBuffers(header.data, body, codec);
                   data = this._loadCompressedVectors(header.data, 
decommpressedBody, [type]);
                   header = new metadata.DictionaryBatch(new 
metadata.RecordBatch(
                       header.data.length,
                       header.data.nodes,
                       buffers,
                       null
                   ), id, isDelta)
               } else {
                   throw new Error('Dictionary batch is compressed but codec 
not found');
               }
           } else {
               data = this._loadVectors(header.data, body, [type]);
           }
   ```
   
   otherwise this PR has been working great as-is at scale for us!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to