trxcllnt commented on code in PR #361:
URL: https://github.com/apache/arrow-js/pull/361#discussion_r2692616809


##########
src/ipc/metadata/message.ts:
##########
@@ -98,22 +99,35 @@ export class Message<T extends MessageHeader = any> {
         } else if (message.isDictionaryBatch()) {
             headerOffset = DictionaryBatch.encode(b, message.header() as 
DictionaryBatch);
         }
+
+        // Encode custom metadata if present (must be done before startMessage)
+        const customMetadataOffset = !(message.metadata && 
message.metadata.size > 0) ? -1 :
+            _Message.createCustomMetadataVector(b, 
[...message.metadata].map(([k, v]) => {
+                const key = b.createString(`${k}`);
+                const val = b.createString(`${v}`);
+                _KeyValue.startKeyValue(b);
+                _KeyValue.addKey(b, key);
+                _KeyValue.addValue(b, val);
+                return _KeyValue.endKeyValue(b);
+            }));
+
         _Message.startMessage(b);
         _Message.addVersion(b, MetadataVersion.V5);
         _Message.addHeader(b, headerOffset);
         _Message.addHeaderType(b, message.headerType);
         _Message.addBodyLength(b, BigInt(message.bodyLength));
+        if (customMetadataOffset !== -1) { _Message.addCustomMetadata(b, 
customMetadataOffset); }
         _Message.finishMessageBuffer(b, _Message.endMessage(b));
         return b.asUint8Array();
     }
 
     /** @nocollapse */
-    public static from(header: Schema | RecordBatch | DictionaryBatch, 
bodyLength = 0) {
+    public static from(header: Schema | RecordBatch | DictionaryBatch, 
bodyLength = 0, metadata?: Map<string, string>) {
         if (header instanceof Schema) {
             return new Message(0, MetadataVersion.V5, MessageHeader.Schema, 
header);
         }
         if (header instanceof RecordBatch) {
-            return new Message(bodyLength, MetadataVersion.V5, 
MessageHeader.RecordBatch, header);
+            return new Message(bodyLength, MetadataVersion.V5, 
MessageHeader.RecordBatch, header, metadata);

Review Comment:
   ```suggestion
               return new Message(bodyLength, MetadataVersion.V5, 
MessageHeader.RecordBatch, header, header.metadata);
   ```



##########
test/unit/ipc/writer/message-metadata-tests.ts:
##########
@@ -0,0 +1,287 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+    Field,
+    Int32,
+    makeData,
+    RecordBatch,
+    RecordBatchFileWriter,
+    RecordBatchStreamWriter,
+    Schema,
+    Struct,
+    tableFromIPC,
+    Utf8
+} from 'apache-arrow';
+
+describe('RecordBatch message metadata writing', () => {
+
+    // Helper to create a simple RecordBatch for testing
+    function createTestBatch(): RecordBatch {
+        const schema = new Schema([
+            new Field('id', new Int32()),
+            new Field('name', new Utf8())
+        ]);
+        const idData = makeData({ type: new Int32(), data: new Int32Array([1, 
2, 3]) });
+        const nameData = makeData({ type: new Utf8(), data: 
Buffer.from('foobarbaz'), valueOffsets: new Int32Array([0, 3, 6, 9]) });
+        const structData = makeData({
+            type: new Struct(schema.fields),
+            length: 3,
+            nullCount: 0,
+            children: [idData, nameData]
+        });
+        return new RecordBatch(schema, structData);
+    }
+
+    describe('Stream format round-trip', () => {
+        test('should write and read metadata via RecordBatchStreamWriter', () 
=> {
+            const batch = createTestBatch();
+            const metadata = new Map([['batch_id', '123'], ['source', 
'test']]);
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch, metadata);

Review Comment:
   With that change (reading `header.metadata` in `Message.from`), the test could be written like this:
   ```suggestion
               const batch = createTestBatch();
               batch.metadata.set('batch_id', '123');
               batch.metadata.set('source', 'test');
   
               const writer = new RecordBatchStreamWriter();
               writer.write(batch);
   ```
   
   Or this:
   ```suggestion
               const batch = createTestBatch();
               const clone = batch.slice();
               clone.metadata.set('batch_id', '123');
               clone.metadata.set('source', 'test');
   
               const writer = new RecordBatchStreamWriter();
               writer.write(clone);
   ```
   
   Or this:
   ```suggestion
               const batch = createTestBatch();
   
               const writer = new RecordBatchStreamWriter();
               writer.write(batch.mergeMetadata(new Map([
                 ['batch_id', '123'],
                 ['source', 'test']
               ])));
   ```
   



##########
src/ipc/metadata/message.ts:
##########
@@ -98,22 +99,35 @@ export class Message<T extends MessageHeader = any> {
         } else if (message.isDictionaryBatch()) {
             headerOffset = DictionaryBatch.encode(b, message.header() as 
DictionaryBatch);
         }
+
+        // Encode custom metadata if present (must be done before startMessage)
+        const customMetadataOffset = !(message.metadata && 
message.metadata.size > 0) ? -1 :
+            _Message.createCustomMetadataVector(b, 
[...message.metadata].map(([k, v]) => {
+                const key = b.createString(`${k}`);
+                const val = b.createString(`${v}`);
+                _KeyValue.startKeyValue(b);
+                _KeyValue.addKey(b, key);
+                _KeyValue.addValue(b, val);
+                return _KeyValue.endKeyValue(b);
+            }));
+
         _Message.startMessage(b);
         _Message.addVersion(b, MetadataVersion.V5);
         _Message.addHeader(b, headerOffset);
         _Message.addHeaderType(b, message.headerType);
         _Message.addBodyLength(b, BigInt(message.bodyLength));
+        if (customMetadataOffset !== -1) { _Message.addCustomMetadata(b, 
customMetadataOffset); }
         _Message.finishMessageBuffer(b, _Message.endMessage(b));
         return b.asUint8Array();
     }
 
     /** @nocollapse */
-    public static from(header: Schema | RecordBatch | DictionaryBatch, 
bodyLength = 0) {
+    public static from(header: Schema | RecordBatch | DictionaryBatch, 
bodyLength = 0, metadata?: Map<string, string>) {

Review Comment:
   Since the new `metadata` argument is only relevant when serializing a 
RecordBatch message, can we just read the RecordBatch's own `metadata` field instead?
   
   ```suggestion
       public static from(header: Schema | RecordBatch | DictionaryBatch, 
bodyLength = 0) {
   ```



##########
src/ipc/writer.ts:
##########


Review Comment:
   If `Message.from` reads the metadata directly from the RecordBatch, the 
metadata plumbing in this file can all be simplified.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to