This is an automated email from the ASF dual-hosted git repository.

kou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-js.git


The following commit(s) were added to refs/heads/main by this push:
     new f203cfb  feat: Add custom metadata support for IPC messages and RecordBatch (#361)
f203cfb is described below

commit f203cfbaef82620633d0409dd1dc7e734195c788
Author: Rusty Conover <[email protected]>
AuthorDate: Fri Feb 20 20:27:20 2026 -0500

    feat: Add custom metadata support for IPC messages and RecordBatch (#361)
    
    ## What's Changed
    
    Decode and expose custom metadata from IPC message headers, propagating
    it through the reader to RecordBatch instances. This allows accessing
    per-batch metadata stored in Arrow IPC streams and files.
    
    Extend RecordBatchWriter to support writing custom metadata to IPC
    messages, similar to PyArrow's write_batch(batch, custom_metadata=...).
    
    Changes:
      - Update the Message constructor to accept an optional metadata
        parameter; Message.from() forwards a RecordBatch header's metadata
      - Update Message.encode() to serialize custom metadata to FlatBuffers
      - Add an optional metadata parameter to the RecordBatch constructor
        and expose it through a RecordBatch.metadata getter
      - Have RecordBatchWriter write batch.metadata as the custom metadata
        of the corresponding IPC message
      - Add comprehensive integration tests for write/read round-trip
    
    Usage:
      const batch = new RecordBatch(schema, data, new Map([['key', 'value']]));
      writer.write(batch);
---
 src/ipc/metadata/message.ts                    |  40 +++-
 src/ipc/reader.ts                              |  12 +-
 src/ipc/writer.ts                              |   6 +-
 src/recordbatch.ts                             |  25 ++-
 test/data/test_message_metadata.arrow          | Bin 0 -> 7154 bytes
 test/unit/ipc/reader/message-metadata-tests.ts |  97 ++++++++++
 test/unit/ipc/writer/message-metadata-tests.ts | 255 +++++++++++++++++++++++++
 7 files changed, 416 insertions(+), 19 deletions(-)

diff --git a/src/ipc/metadata/message.ts b/src/ipc/metadata/message.ts
index b41ec4a..347b7c9 100644
--- a/src/ipc/metadata/message.ts
+++ b/src/ipc/metadata/message.ts
@@ -82,7 +82,8 @@ export class Message<T extends MessageHeader = any> {
         const bodyLength: bigint = _message.bodyLength()!;
         const version: MetadataVersion = _message.version();
         const headerType: MessageHeader = _message.headerType();
-        const message = new Message(bodyLength, version, headerType);
+        const metadata = decodeMessageCustomMetadata(_message);
+        const message = new Message(bodyLength, version, headerType, 
undefined, metadata);
         message._createHeader = decodeMessageHeader(_message, headerType);
         return message;
     }
@@ -98,11 +99,24 @@ export class Message<T extends MessageHeader = any> {
         } else if (message.isDictionaryBatch()) {
             headerOffset = DictionaryBatch.encode(b, message.header() as 
DictionaryBatch);
         }
+
+        // Encode custom metadata if present (must be done before startMessage)
+        const customMetadataOffset = !(message.metadata && 
message.metadata.size > 0) ? -1 :
+            _Message.createCustomMetadataVector(b, 
[...message.metadata].map(([k, v]) => {
+                const key = b.createString(`${k}`);
+                const val = b.createString(`${v}`);
+                _KeyValue.startKeyValue(b);
+                _KeyValue.addKey(b, key);
+                _KeyValue.addValue(b, val);
+                return _KeyValue.endKeyValue(b);
+            }));
+
         _Message.startMessage(b);
         _Message.addVersion(b, MetadataVersion.V5);
         _Message.addHeader(b, headerOffset);
         _Message.addHeaderType(b, message.headerType);
         _Message.addBodyLength(b, BigInt(message.bodyLength));
+        if (customMetadataOffset !== -1) { _Message.addCustomMetadata(b, 
customMetadataOffset); }
         _Message.finishMessageBuffer(b, _Message.endMessage(b));
         return b.asUint8Array();
     }
@@ -113,7 +127,7 @@ export class Message<T extends MessageHeader = any> {
             return new Message(0, MetadataVersion.V5, MessageHeader.Schema, 
header);
         }
         if (header instanceof RecordBatch) {
-            return new Message(bodyLength, MetadataVersion.V5, 
MessageHeader.RecordBatch, header);
+            return new Message(bodyLength, MetadataVersion.V5, 
MessageHeader.RecordBatch, header, header.metadata);
         }
         if (header instanceof DictionaryBatch) {
             return new Message(bodyLength, MetadataVersion.V5, 
MessageHeader.DictionaryBatch, header);
@@ -126,24 +140,27 @@ export class Message<T extends MessageHeader = any> {
     protected _bodyLength: number;
     protected _version: MetadataVersion;
     protected _compression: BodyCompression | null;
+    protected _metadata: Map<string, string>;
     public get type() { return this.headerType; }
     public get version() { return this._version; }
     public get headerType() { return this._headerType; }
     public get compression() { return this._compression; }
     public get bodyLength() { return this._bodyLength; }
+    public get metadata() { return this._metadata; }
     declare protected _createHeader: MessageHeaderDecoder;
     public header() { return this._createHeader<T>(); }
     public isSchema(): this is Message<MessageHeader.Schema> { return 
this.headerType === MessageHeader.Schema; }
     public isRecordBatch(): this is Message<MessageHeader.RecordBatch> { 
return this.headerType === MessageHeader.RecordBatch; }
     public isDictionaryBatch(): this is Message<MessageHeader.DictionaryBatch> 
{ return this.headerType === MessageHeader.DictionaryBatch; }
 
-    constructor(bodyLength: bigint | number, version: MetadataVersion, 
headerType: T, header?: any) {
+    constructor(bodyLength: bigint | number, version: MetadataVersion, 
headerType: T, header?: any, metadata?: Map<string, string>) {
         this._version = version;
         this._headerType = headerType;
         this.body = new Uint8Array(0);
         this._compression = header?.compression;
         header && (this._createHeader = () => header);
         this._bodyLength = bigIntToNumber(bodyLength);
+        this._metadata = metadata || new Map();
     }
 }
 
@@ -157,23 +174,27 @@ export class RecordBatch {
     protected _buffers: BufferRegion[];
     protected _compression: BodyCompression | null;
     protected _variadicBufferCounts: number[];
+    protected _metadata: Map<string, string>;
     public get nodes() { return this._nodes; }
     public get length() { return this._length; }
     public get buffers() { return this._buffers; }
     public get compression() { return this._compression; }
     public get variadicBufferCounts() { return this._variadicBufferCounts; }
+    public get metadata() { return this._metadata; }
     constructor(
         length: bigint | number,
         nodes: FieldNode[],
         buffers: BufferRegion[],
         compression: BodyCompression | null,
-        variadicBufferCounts: number[] = []
+        variadicBufferCounts: number[] = [],
+        metadata?: Map<string, string>
     ) {
         this._nodes = nodes;
         this._buffers = buffers;
         this._length = bigIntToNumber(length);
         this._compression = compression;
         this._variadicBufferCounts = variadicBufferCounts;
+        this._metadata = metadata || new Map();
     }
 }
 
@@ -468,6 +489,17 @@ function decodeCustomMetadata(parent?: _Schema | _Field | 
null) {
     return data;
 }
 
+/** @ignore */
+function decodeMessageCustomMetadata(message: _Message) {
+    const data = new Map<string, string>();
+    for (let entry, key, i = -1, n = 
Math.trunc(message.customMetadataLength()); ++i < n;) {
+        if ((entry = message.customMetadata(i)) && (key = entry.key()) != 
null) {
+            data.set(key, entry.value()!);
+        }
+    }
+    return data;
+}
+
 /** @ignore */
 function decodeIndexType(_type: _Int) {
     return new Int(_type.isSigned(), _type.bitWidth() as IntBitWidth);
diff --git a/src/ipc/reader.ts b/src/ipc/reader.ts
index af49f37..d0edafd 100644
--- a/src/ipc/reader.ts
+++ b/src/ipc/reader.ts
@@ -358,7 +358,7 @@ abstract class RecordBatchReaderImpl<T extends TypeMap = 
any> implements RecordB
         return this;
     }
 
-    protected _loadRecordBatch(header: metadata.RecordBatch, body: 
Uint8Array): RecordBatch<T> {
+    protected _loadRecordBatch(header: metadata.RecordBatch, body: Uint8Array, 
messageMetadata?: Map<string, string>): RecordBatch<T> {
         let children: Data<any>[];
         if (header.compression != null) {
             const codec = compressionRegistry.get(header.compression.type);
@@ -379,7 +379,7 @@ abstract class RecordBatchReaderImpl<T extends TypeMap = 
any> implements RecordB
         }
 
         const data = makeData({ type: new Struct(this.schema.fields), length: 
header.length, children });
-        return new RecordBatch(this.schema, data);
+        return new RecordBatch(this.schema, data, messageMetadata);
     }
 
     protected _loadDictionaryBatch(header: metadata.DictionaryBatch, body: 
Uint8Array) {
@@ -512,7 +512,7 @@ class RecordBatchStreamReaderImpl<T extends TypeMap = any> 
extends RecordBatchRe
                 this._recordBatchIndex++;
                 const header = message.header();
                 const buffer = reader.readMessageBody(message.bodyLength);
-                const recordBatch = this._loadRecordBatch(header, buffer);
+                const recordBatch = this._loadRecordBatch(header, buffer, 
message.metadata);
                 return { done: false, value: recordBatch };
             } else if (message.isDictionaryBatch()) {
                 this._dictionaryIndex++;
@@ -587,7 +587,7 @@ class AsyncRecordBatchStreamReaderImpl<T extends TypeMap = 
any> extends RecordBa
                 this._recordBatchIndex++;
                 const header = message.header();
                 const buffer = await 
reader.readMessageBody(message.bodyLength);
-                const recordBatch = this._loadRecordBatch(header, buffer);
+                const recordBatch = this._loadRecordBatch(header, buffer, 
message.metadata);
                 return { done: false, value: recordBatch };
             } else if (message.isDictionaryBatch()) {
                 this._dictionaryIndex++;
@@ -640,7 +640,7 @@ class RecordBatchFileReaderImpl<T extends TypeMap = any> 
extends RecordBatchStre
             if (message?.isRecordBatch()) {
                 const header = message.header();
                 const buffer = 
this._reader.readMessageBody(message.bodyLength);
-                const recordBatch = this._loadRecordBatch(header, buffer);
+                const recordBatch = this._loadRecordBatch(header, buffer, 
message.metadata);
                 return recordBatch;
             }
         }
@@ -714,7 +714,7 @@ class AsyncRecordBatchFileReaderImpl<T extends TypeMap = 
any> extends AsyncRecor
             if (message?.isRecordBatch()) {
                 const header = message.header();
                 const buffer = await 
this._reader.readMessageBody(message.bodyLength);
-                const recordBatch = this._loadRecordBatch(header, buffer);
+                const recordBatch = this._loadRecordBatch(header, buffer, 
message.metadata);
                 return recordBatch;
             }
         }
diff --git a/src/ipc/writer.ts b/src/ipc/writer.ts
index 0b13fdf..7d783eb 100644
--- a/src/ipc/writer.ts
+++ b/src/ipc/writer.ts
@@ -185,6 +185,9 @@ export class RecordBatchWriter<T extends TypeMap = any> 
extends ReadableInterop<
         return this;
     }
 
+    public write(payload?: Table<T> | RecordBatch<T> | 
Iterable<RecordBatch<T>> | null): void;
+    // Overload for UnderlyingSink compatibility (used by DOM streams)
+    public write(chunk: RecordBatch<T>, controller: 
WritableStreamDefaultController): void;
     public write(payload?: Table<T> | RecordBatch<T> | 
Iterable<RecordBatch<T>> | null) {
         let schema: Schema<T> | null = null;
 
@@ -275,7 +278,7 @@ export class RecordBatchWriter<T extends TypeMap = any> 
extends ReadableInterop<
 
     protected _writeRecordBatch(batch: RecordBatch<T>) {
         const { byteLength, nodes, bufferRegions, buffers, 
variadicBufferCounts } = this._assembleRecordBatch(batch);
-        const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, 
bufferRegions, this._compression, variadicBufferCounts);
+        const recordBatch = new metadata.RecordBatch(batch.numRows, nodes, 
bufferRegions, this._compression, variadicBufferCounts, batch.metadata);
         const message = Message.from(recordBatch, byteLength);
         return this
             ._writeDictionaries(batch)
@@ -589,3 +592,4 @@ function recordBatchToJSON(records: RecordBatch) {
         'columns': columns
     }, null, 2);
 }
+
diff --git a/src/recordbatch.ts b/src/recordbatch.ts
index cb4dbf9..7cdc8b3 100644
--- a/src/recordbatch.ts
+++ b/src/recordbatch.ts
@@ -61,9 +61,10 @@ export class RecordBatch<T extends TypeMap = any> {
     declare public readonly [kRecordBatchSymbol]: true;
 
     constructor(columns: { [P in keyof T]: Data<T[P]> });
-    constructor(schema: Schema<T>, data?: Data<Struct<T>>);
+    constructor(schema: Schema<T>, data?: Data<Struct<T>>, metadata?: 
Map<string, string>);
     constructor(...args: any[]) {
         switch (args.length) {
+            case 3:
             case 2: {
                 [this.schema] = args;
                 if (!(this.schema instanceof Schema)) {
@@ -74,7 +75,8 @@ export class RecordBatch<T extends TypeMap = any> {
                         nullCount: 0,
                         type: new Struct<T>(this.schema.fields),
                         children: this.schema.fields.map((f) => makeData({ 
type: f.type, nullCount: 0 }))
-                    })
+                    }),
+                    this._metadata = new Map()
                 ] = args;
                 if (!(this.data instanceof Data)) {
                     throw new TypeError('RecordBatch constructor expects a 
[Schema, Data] pair.');
@@ -98,6 +100,7 @@ export class RecordBatch<T extends TypeMap = any> {
                 const schema = new Schema<T>(fields);
                 const data = makeData({ type: new Struct<T>(fields), length, 
children, nullCount: 0 });
                 [this.schema, this.data] = ensureSameLengthData<T>(schema, 
data.children as Data<T[keyof T]>[], length);
+                this._metadata = new Map();
                 break;
             }
             default: throw new TypeError('RecordBatch constructor expects an 
Object mapping names to child Data, or a [Schema, Data] pair.');
@@ -105,10 +108,16 @@ export class RecordBatch<T extends TypeMap = any> {
     }
 
     protected _dictionaries?: Map<number, Vector>;
+    protected _metadata: Map<string, string>;
 
     public readonly schema: Schema<T>;
     public readonly data: Data<Struct<T>>;
 
+    /**
+     * Custom metadata for this RecordBatch.
+     */
+    public get metadata() { return this._metadata; }
+
     public get dictionaries() {
         return this._dictionaries || (this._dictionaries = 
collectDictionaries(this.schema.fields, this.data.children));
     }
@@ -202,7 +211,7 @@ export class RecordBatch<T extends TypeMap = any> {
      */
     public slice(begin?: number, end?: number): RecordBatch<T> {
         const [slice] = new Vector([this.data]).slice(begin, end).data;
-        return new RecordBatch(this.schema, slice);
+        return new RecordBatch(this.schema, slice, this._metadata);
     }
 
     /**
@@ -254,7 +263,7 @@ export class RecordBatch<T extends TypeMap = any> {
             schema = new Schema(fields, new Map(this.schema.metadata));
             data = makeData({ type: new Struct<T>(fields), children });
         }
-        return new RecordBatch(schema, data);
+        return new RecordBatch(schema, data, this._metadata);
     }
 
     /**
@@ -273,7 +282,7 @@ export class RecordBatch<T extends TypeMap = any> {
                 children[index] = this.data.children[index] as Data<T[K]>;
             }
         }
-        return new RecordBatch(schema, makeData({ type, length: this.numRows, 
children }));
+        return new RecordBatch(schema, makeData({ type, length: this.numRows, 
children }), this._metadata);
     }
 
     /**
@@ -286,7 +295,7 @@ export class RecordBatch<T extends TypeMap = any> {
         const schema = this.schema.selectAt<K>(columnIndices);
         const children = columnIndices.map((i) => 
this.data.children[i]).filter(Boolean);
         const subset = makeData({ type: new Struct(schema.fields), length: 
this.numRows, children });
-        return new RecordBatch<{ [P in keyof K]: K[P] }>(schema, subset);
+        return new RecordBatch<{ [P in keyof K]: K[P] }>(schema, subset, 
this._metadata);
     }
 
     // Initialize this static property via an IIFE so bundlers don't tree-shake
@@ -369,9 +378,9 @@ function collectDictionaries(fields: Field[], children: 
readonly Data[], diction
  * @private
  */
 export class _InternalEmptyPlaceholderRecordBatch<T extends TypeMap = any> 
extends RecordBatch<T> {
-    constructor(schema: Schema<T>) {
+    constructor(schema: Schema<T>, metadata?: Map<string, string>) {
         const children = schema.fields.map((f) => makeData({ type: f.type }));
         const data = makeData({ type: new Struct<T>(schema.fields), nullCount: 
0, children });
-        super(schema, data);
+        super(schema, data, metadata || new Map());
     }
 }
diff --git a/test/data/test_message_metadata.arrow 
b/test/data/test_message_metadata.arrow
new file mode 100644
index 0000000..2dd9e82
Binary files /dev/null and b/test/data/test_message_metadata.arrow differ
diff --git a/test/unit/ipc/reader/message-metadata-tests.ts 
b/test/unit/ipc/reader/message-metadata-tests.ts
new file mode 100644
index 0000000..ecfc0a1
--- /dev/null
+++ b/test/unit/ipc/reader/message-metadata-tests.ts
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import { readFileSync } from 'node:fs';
+import path from 'node:path';
+import { tableFromIPC, RecordBatch } from 'apache-arrow';
+
+// Path to the test file with message-level metadata
+// Use process.cwd() since tests are run from project root
+const testFilePath = path.resolve(process.cwd(), 
'test/data/test_message_metadata.arrow');
+
+describe('RecordBatch message metadata', () => {
+    const buffer = readFileSync(testFilePath);
+    const table = tableFromIPC(buffer);
+
+    test('should read RecordBatch metadata from IPC file', () => {
+        expect(table.batches).toHaveLength(3);
+
+        for (let i = 0; i < table.batches.length; i++) {
+            const batch = table.batches[i];
+            expect(batch).toBeInstanceOf(RecordBatch);
+            expect(batch.metadata).toBeInstanceOf(Map);
+            expect(batch.metadata.size).toBeGreaterThan(0);
+
+            // Verify specific metadata keys exist
+            expect(batch.metadata.has('batch_index')).toBe(true);
+            expect(batch.metadata.has('batch_id')).toBe(true);
+            expect(batch.metadata.has('producer')).toBe(true);
+
+            // Verify batch_index matches the batch position
+            expect(batch.metadata.get('batch_index')).toBe(String(i));
+            
expect(batch.metadata.get('batch_id')).toBe(`batch_${String(i).padStart(4, 
'0')}`);
+        }
+    });
+
+    test('should read unicode metadata values', () => {
+        const batch = table.batches[0];
+        expect(batch.metadata.has('unicode_test')).toBe(true);
+        expect(batch.metadata.get('unicode_test')).toBe('Hello 世界 🌍 مرحبا');
+    });
+
+    test('should handle empty metadata values', () => {
+        const batch = table.batches[0];
+        expect(batch.metadata.has('optional_field')).toBe(true);
+        expect(batch.metadata.get('optional_field')).toBe('');
+    });
+
+    test('should read JSON metadata values', () => {
+        const batch = table.batches[0];
+        expect(batch.metadata.has('batch_info_json')).toBe(true);
+        const jsonStr = batch.metadata.get('batch_info_json')!;
+        const parsed = JSON.parse(jsonStr);
+        expect(parsed.batch_number).toBe(0);
+        expect(parsed.processing_stage).toBe('final');
+        expect(parsed.tags).toEqual(['validated', 'complete']);
+    });
+
+    describe('metadata preservation', () => {
+        test('should preserve metadata through slice()', () => {
+            const batch = table.batches[0];
+            const sliced = batch.slice(0, 2);
+            expect(sliced.metadata).toBeInstanceOf(Map);
+            expect(sliced.metadata.size).toBe(batch.metadata.size);
+            
expect(sliced.metadata.get('batch_index')).toBe(batch.metadata.get('batch_index'));
+        });
+
+        test('should preserve metadata through select()', () => {
+            const batch = table.batches[0];
+            const selected = batch.select(['id', 'name']);
+            expect(selected.metadata).toBeInstanceOf(Map);
+            expect(selected.metadata.size).toBe(batch.metadata.size);
+            
expect(selected.metadata.get('batch_index')).toBe(batch.metadata.get('batch_index'));
+        });
+
+        test('should preserve metadata through selectAt()', () => {
+            const batch = table.batches[0];
+            const selectedAt = batch.selectAt([0, 1]);
+            expect(selectedAt.metadata).toBeInstanceOf(Map);
+            expect(selectedAt.metadata.size).toBe(batch.metadata.size);
+            
expect(selectedAt.metadata.get('batch_index')).toBe(batch.metadata.get('batch_index'));
+        });
+    });
+});
diff --git a/test/unit/ipc/writer/message-metadata-tests.ts 
b/test/unit/ipc/writer/message-metadata-tests.ts
new file mode 100644
index 0000000..ccc800b
--- /dev/null
+++ b/test/unit/ipc/writer/message-metadata-tests.ts
@@ -0,0 +1,255 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import {
+    Field,
+    Int32,
+    makeData,
+    RecordBatch,
+    RecordBatchFileWriter,
+    RecordBatchStreamWriter,
+    Schema,
+    Struct,
+    tableFromIPC,
+    Utf8
+} from 'apache-arrow';
+
+describe('RecordBatch message metadata writing', () => {
+
+    // Helper to create a simple RecordBatch for testing
+    function createTestBatch(): RecordBatch {
+        const schema = new Schema([
+            new Field('id', new Int32()),
+            new Field('name', new Utf8())
+        ]);
+        const idData = makeData({ type: new Int32(), data: new Int32Array([1, 
2, 3]) });
+        const nameData = makeData({ type: new Utf8(), data: 
Buffer.from('foobarbaz'), valueOffsets: new Int32Array([0, 3, 6, 9]) });
+        const structData = makeData({
+            type: new Struct(schema.fields),
+            length: 3,
+            nullCount: 0,
+            children: [idData, nameData]
+        });
+        return new RecordBatch(schema, structData);
+    }
+
+    describe('Stream format round-trip', () => {
+        test('should write and read metadata via RecordBatchStreamWriter', () 
=> {
+            const batch = createTestBatch();
+            batch.metadata.set('batch_id', '123');
+            batch.metadata.set('source', 'test');
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch);
+            writer.finish();
+            const buffer = writer.toUint8Array(true);
+
+            const table = tableFromIPC(buffer);
+            expect(table.batches).toHaveLength(1);
+            expect(table.batches[0].metadata).toBeInstanceOf(Map);
+            expect(table.batches[0].metadata.get('batch_id')).toBe('123');
+            expect(table.batches[0].metadata.get('source')).toBe('test');
+        });
+
+        test('should write batch without metadata when none provided', () => {
+            const batch = createTestBatch();
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch);
+            writer.finish();
+            const buffer = writer.toUint8Array(true);
+
+            const table = tableFromIPC(buffer);
+            expect(table.batches).toHaveLength(1);
+            expect(table.batches[0].metadata).toBeInstanceOf(Map);
+            expect(table.batches[0].metadata.size).toBe(0);
+        });
+    });
+
+    describe('File format round-trip', () => {
+        test('should write and read metadata via RecordBatchFileWriter', () => 
{
+            const batch = createTestBatch();
+            batch.metadata.set('format', 'file');
+            batch.metadata.set('version', '1.0');
+
+            const writer = new RecordBatchFileWriter();
+            writer.write(batch);
+            writer.finish();
+            const buffer = writer.toUint8Array(true);
+
+            const table = tableFromIPC(buffer);
+            expect(table.batches).toHaveLength(1);
+            expect(table.batches[0].metadata.get('format')).toBe('file');
+            expect(table.batches[0].metadata.get('version')).toBe('1.0');
+        });
+    });
+
+    describe('Multiple batches with different metadata', () => {
+        test('should write multiple batches with different metadata', () => {
+            const writer = new RecordBatchStreamWriter();
+
+            const batch1 = createTestBatch();
+            batch1.metadata.set('batch_index', '0');
+            batch1.metadata.set('tag', 'first');
+
+            const batch2 = createTestBatch();
+            batch2.metadata.set('batch_index', '1');
+            batch2.metadata.set('tag', 'middle');
+
+            const batch3 = createTestBatch();
+            batch3.metadata.set('batch_index', '2');
+            batch3.metadata.set('tag', 'last');
+
+            writer.write(batch1);
+            writer.write(batch2);
+            writer.write(batch3);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            expect(table.batches).toHaveLength(3);
+            expect(table.batches[0].metadata.get('batch_index')).toBe('0');
+            expect(table.batches[0].metadata.get('tag')).toBe('first');
+            expect(table.batches[1].metadata.get('batch_index')).toBe('1');
+            expect(table.batches[1].metadata.get('tag')).toBe('middle');
+            expect(table.batches[2].metadata.get('batch_index')).toBe('2');
+            expect(table.batches[2].metadata.get('tag')).toBe('last');
+        });
+    });
+
+    describe('Metadata preservation through operations', () => {
+        test('should preserve metadata through slice after round-trip', () => {
+            const batch = createTestBatch();
+            batch.metadata.set('key', 'value');
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            const sliced = table.batches[0].slice(0, 2);
+
+            expect(sliced.metadata.get('key')).toBe('value');
+        });
+
+        test('should preserve metadata through selectAt after round-trip', () 
=> {
+            const batch = createTestBatch();
+            batch.metadata.set('preserved', 'yes');
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            const selected = table.batches[0].selectAt([0]);
+
+            expect(selected.metadata.get('preserved')).toBe('yes');
+        });
+    });
+
+    describe('Metadata from constructor', () => {
+        test('should use metadata passed to RecordBatch constructor', () => {
+            const schema = new Schema([new Field('id', new Int32())]);
+            const idData = makeData({ type: new Int32(), data: new 
Int32Array([1, 2, 3]) });
+            const structData = makeData({
+                type: new Struct(schema.fields),
+                length: 3,
+                nullCount: 0,
+                children: [idData]
+            });
+            const batch = new RecordBatch(schema, structData, new 
Map([['from_batch', 'value']]));
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            expect(table.batches[0].metadata.get('from_batch')).toBe('value');
+        });
+    });
+
+    describe('Edge cases', () => {
+        test('should handle empty metadata map', () => {
+            const batch = createTestBatch();
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            expect(table.batches[0].metadata.size).toBe(0);
+        });
+
+        test('should handle unicode keys and values', () => {
+            const batch = createTestBatch();
+            batch.metadata.set('日本語キー', 'Japanese key');
+            batch.metadata.set('emoji', '🎉🚀💻');
+            batch.metadata.set('mixed', 'Hello 世界 مرحبا');
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            expect(table.batches[0].metadata.get('日本語キー')).toBe('Japanese 
key');
+            expect(table.batches[0].metadata.get('emoji')).toBe('🎉🚀💻');
+            expect(table.batches[0].metadata.get('mixed')).toBe('Hello 世界 
مرحبا');
+        });
+
+        test('should handle empty string values', () => {
+            const batch = createTestBatch();
+            batch.metadata.set('empty_value', '');
+            batch.metadata.set('normal', 'value');
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            expect(table.batches[0].metadata.get('empty_value')).toBe('');
+            expect(table.batches[0].metadata.get('normal')).toBe('value');
+        });
+
+        test('should handle JSON string as metadata value', () => {
+            const batch = createTestBatch();
+            const jsonValue = JSON.stringify({ nested: { data: [1, 2, 3] }, 
flag: true });
+            batch.metadata.set('json_data', jsonValue);
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            const retrieved = table.batches[0].metadata.get('json_data')!;
+            const parsed = JSON.parse(retrieved);
+            expect(parsed.nested.data).toEqual([1, 2, 3]);
+            expect(parsed.flag).toBe(true);
+        });
+
+        test('should handle long metadata values', () => {
+            const batch = createTestBatch();
+            const longValue = 'x'.repeat(10000);
+            batch.metadata.set('long_value', longValue);
+
+            const writer = new RecordBatchStreamWriter();
+            writer.write(batch);
+            writer.finish();
+
+            const table = tableFromIPC(writer.toUint8Array(true));
+            
expect(table.batches[0].metadata.get('long_value')).toBe(longValue);
+        });
+    });
+});

Reply via email to