This is an automated email from the ASF dual-hosted git repository.

kevingurney pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b75373d0f GH-44923: [MATLAB] Add IPC `RecordBatchStreamReader` MATLAB 
class (#45068)
8b75373d0f is described below

commit 8b75373d0f344a69057fedc38ac1ceafb9b6b734
Author: Kevin Gurney <[email protected]>
AuthorDate: Mon Dec 23 15:53:11 2024 -0500

    GH-44923: [MATLAB] Add IPC `RecordBatchStreamReader` MATLAB class (#45068)
    
    ### Rationale for this change
    
    To enable support for the IPC Streaming format in the MATLAB interface, we 
should add a `RecordBatchStreamReader` class.
    
    This is a followup to #44922
    
    ### What changes are included in this PR?
    
    1. Added a new `arrow.io.ipc.RecordBatchStreamReader` MATLAB class.
    
    ### Are these changes tested?
    
    Yes.
    
    1. Added new MATLAB test suite 
`arrow/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m`.
    
    ### Are there any user-facing changes?
    
    Yes.
    
    1. Users can now create `arrow.io.ipc.RecordBatchStreamReader` objects to 
read `RecordBatch` objects incrementally from an Arrow IPC Stream file.
    
    ### Notes
    
    1. Thank you @ sgilmore10 for your help with this pull request!
    * GitHub Issue: #44923
    
    Lead-authored-by: Kevin Gurney <[email protected]>
    Co-authored-by: Kevin Gurney <[email protected]>
    Co-authored-by: Sarah Gilmore <[email protected]>
    Signed-off-by: Kevin Gurney <[email protected]>
---
 matlab/src/cpp/arrow/matlab/error/error.h          |   2 +
 .../io/ipc/proxy/record_batch_stream_reader.cc     | 154 ++++++++++
 .../io/ipc/proxy/record_batch_stream_reader.h      |  44 +++
 matlab/src/cpp/arrow/matlab/proxy/factory.cc       |   2 +
 .../+arrow/+io/+ipc/RecordBatchStreamReader.m      |  83 +++++
 .../test/arrow/io/ipc/tRecordBatchStreamReader.m   | 336 +++++++++++++++++++++
 matlab/tools/cmake/BuildMatlabArrowInterface.cmake |   1 +
 7 files changed, 622 insertions(+)

diff --git a/matlab/src/cpp/arrow/matlab/error/error.h 
b/matlab/src/cpp/arrow/matlab/error/error.h
index e5a5df6f4b..425e089d9f 100644
--- a/matlab/src/cpp/arrow/matlab/error/error.h
+++ b/matlab/src/cpp/arrow/matlab/error/error.h
@@ -249,5 +249,7 @@ static const char* IPC_RECORD_BATCH_READER_OPEN_FAILED =
     "arrow:io:ipc:FailedToOpenRecordBatchReader";
 static const char* IPC_RECORD_BATCH_READ_INVALID_INDEX = 
"arrow:io:ipc:InvalidIndex";
 static const char* IPC_RECORD_BATCH_READ_FAILED = "arrow:io:ipc:ReadFailed";
+static const char* IPC_TABLE_READ_FAILED = "arrow:io:ipc:TableReadFailed";
+static const char* IPC_END_OF_STREAM = "arrow:io:ipc:EndOfStream";
 
 }  // namespace arrow::matlab::error
diff --git 
a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc 
b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc
new file mode 100644
index 0000000000..f3c833484d
--- /dev/null
+++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h"
+#include "arrow/io/file.h"
+#include "arrow/matlab/error/error.h"
+#include "arrow/matlab/tabular/proxy/record_batch.h"
+#include "arrow/matlab/tabular/proxy/schema.h"
+#include "arrow/matlab/tabular/proxy/table.h"
+#include "arrow/table.h"
+#include "arrow/util/utf8.h"
+
+#include "libmexclass/proxy/ProxyManager.h"
+
+namespace arrow::matlab::io::ipc::proxy {
+
+// Builds the proxy around an already-opened IPC stream reader and registers
+// the methods that the MATLAB side invokes by name.
+//
+// The reader is taken by non-const value on purpose: with a const parameter,
+// std::move would bind to the copy constructor and copy the shared_ptr
+// instead of transferring ownership.
+RecordBatchStreamReader::RecordBatchStreamReader(
+    std::shared_ptr<arrow::ipc::RecordBatchStreamReader> reader)
+    : reader{std::move(reader)} {
+  REGISTER_METHOD(RecordBatchStreamReader, getSchema);
+  REGISTER_METHOD(RecordBatchStreamReader, readRecordBatch);
+  REGISTER_METHOD(RecordBatchStreamReader, hasNextRecordBatch);
+  REGISTER_METHOD(RecordBatchStreamReader, readTable);
+}
+
+// Factory that creates the proxy from the arguments sent by MATLAB. The first
+// constructor argument is read as a struct with a "Filename" string field.
+// The file is opened for reading and wrapped in an IPC stream reader; each
+// step is reported under its own error identifier if it fails.
+libmexclass::proxy::MakeResult RecordBatchStreamReader::make(
+    const libmexclass::proxy::FunctionArguments& constructor_arguments) {
+  namespace mda = ::matlab::data;
+
+  const mda::StructArray options = constructor_arguments[0];
+  const mda::StringArray filename_array = options[0]["Filename"];
+
+  // Convert the UTF-16 filename to UTF-8 before handing it to Arrow.
+  const auto path_utf16 = std::u16string(filename_array[0]);
+  MATLAB_ASSIGN_OR_ERROR(const auto path_utf8,
+                         arrow::util::UTF16StringToUTF8(path_utf16),
+                         error::UNICODE_CONVERSION_ERROR_ID);
+
+  MATLAB_ASSIGN_OR_ERROR(auto file, arrow::io::ReadableFile::Open(path_utf8),
+                         error::FAILED_TO_OPEN_FILE_FOR_READ);
+
+  MATLAB_ASSIGN_OR_ERROR(auto stream_reader,
+                         arrow::ipc::RecordBatchStreamReader::Open(file),
+                         error::IPC_RECORD_BATCH_READER_OPEN_FAILED);
+
+  // Inside this member function the unqualified class name refers to the
+  // proxy class itself, not to arrow::ipc::RecordBatchStreamReader.
+  return std::make_shared<RecordBatchStreamReader>(std::move(stream_reader));
+}
+
+// Wraps the stream's schema in a Schema proxy, registers that proxy with the
+// ProxyManager, and returns the resulting proxy ID as the first output.
+void RecordBatchStreamReader::getSchema(libmexclass::proxy::method::Context& context) {
+  namespace mda = ::matlab::data;
+  using SchemaProxy = arrow::matlab::tabular::proxy::Schema;
+  using libmexclass::proxy::ProxyManager;
+
+  auto proxy = std::make_shared<SchemaProxy>(reader->schema());
+  const auto proxy_id = ProxyManager::manageProxy(proxy);
+
+  mda::ArrayFactory factory;
+  context.outputs[0] = factory.createScalar(proxy_id);
+}
+
+// Reads every record batch that has not yet been returned to MATLAB and
+// returns them as a single Table proxy ID in the first output.
+//
+// hasNextRecordBatch may already have pulled one batch out of the stream and
+// cached it in nextRecordBatch. That batch is no longer visible to the
+// underlying reader, so it has to be prepended here. Otherwise it would be
+// missing from the table and would later be handed out by readRecordBatch
+// after the stream had already been drained.
+//
+// On failure the error is reported under IPC_TABLE_READ_FAILED and the cached
+// batch (if any) is left untouched.
+void RecordBatchStreamReader::readTable(libmexclass::proxy::method::Context& context) {
+  namespace mda = ::matlab::data;
+  using TableProxy = arrow::matlab::tabular::proxy::Table;
+
+  // Drain whatever is still unread in the stream.
+  MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(auto record_batches, reader->ToRecordBatches(),
+                                      context, error::IPC_TABLE_READ_FAILED);
+  if (nextRecordBatch) {
+    // The cached batch precedes everything that was just read.
+    record_batches.insert(record_batches.begin(), nextRecordBatch);
+  }
+  MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(
+      auto table, arrow::Table::FromRecordBatches(reader->schema(), record_batches),
+      context, error::IPC_TABLE_READ_FAILED);
+  // Only drop the cached batch once it has been folded into the table.
+  nextRecordBatch = nullptr;
+
+  auto table_proxy = std::make_shared<TableProxy>(table);
+  const auto table_proxy_id = libmexclass::proxy::ProxyManager::manageProxy(table_proxy);
+
+  mda::ArrayFactory factory;
+  const auto table_proxy_id_mda = factory.createScalar(table_proxy_id);
+  context.outputs[0] = table_proxy_id_mda;
+}
+
+// Returns the proxy ID of the next record batch in the first output.
+//
+// A batch cached by an earlier hasNextRecordBatch call is used if present;
+// otherwise one is read from the stream. A read failure is reported under
+// IPC_RECORD_BATCH_READ_FAILED, and an exhausted stream under
+// IPC_END_OF_STREAM.
+void RecordBatchStreamReader::readRecordBatch(
+    libmexclass::proxy::method::Context& context) {
+  namespace mda = ::matlab::data;
+  using RecordBatchProxy = arrow::matlab::tabular::proxy::RecordBatch;
+  using libmexclass::proxy::ProxyManager;
+
+  // Nothing cached: pull the next batch from the IPC Stream.
+  const bool have_cached_batch = static_cast<bool>(nextRecordBatch);
+  if (!have_cached_batch) {
+    MATLAB_ASSIGN_OR_ERROR_WITH_CONTEXT(nextRecordBatch, reader->Next(), context,
+                                        error::IPC_RECORD_BATCH_READ_FAILED);
+  }
+
+  // A successful read can still yield a null batch, which marks the end of
+  // the stream.
+  if (nextRecordBatch == nullptr) {
+    context.error = libmexclass::error::Error{
+        error::IPC_END_OF_STREAM,
+        "Reached end of Arrow IPC Stream. No more record batches to read."};
+    return;
+  }
+
+  const auto proxy_id =
+      ProxyManager::manageProxy(std::make_shared<RecordBatchProxy>(nextRecordBatch));
+
+  // The batch has been consumed; clear the cache so that the next
+  // hasNextRecordBatch call goes back to the stream.
+  nextRecordBatch.reset();
+
+  mda::ArrayFactory factory;
+  context.outputs[0] = factory.createScalar(proxy_id);
+}
+
+// Writes a logical scalar to the first output: true if another record batch
+// is available, false otherwise.
+//
+// When nothing is cached, this reads one batch ahead and stores it in
+// nextRecordBatch so that readRecordBatch can return it later. A failed read
+// and a null batch (end of stream) are both reported as false.
+void RecordBatchStreamReader::hasNextRecordBatch(
+    libmexclass::proxy::method::Context& context) {
+  namespace mda = ::matlab::data;
+
+  if (!nextRecordBatch) {
+    auto next_result = reader->Next();
+    if (next_result.ok()) {
+      // May be null, which signals that the IPC stream is exhausted.
+      nextRecordBatch = *next_result;
+    }
+  }
+
+  // A non-null cached batch is the only case in which there is a next batch.
+  const bool batch_available = static_cast<bool>(nextRecordBatch);
+
+  mda::ArrayFactory factory;
+  context.outputs[0] = factory.createScalar(batch_available);
+}
+
+}  // namespace arrow::matlab::io::ipc::proxy
diff --git 
a/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h 
b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h
new file mode 100644
index 0000000000..56fb293987
--- /dev/null
+++ b/matlab/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/ipc/reader.h"
+#include "libmexclass/proxy/Proxy.h"
+
+namespace arrow::matlab::io::ipc::proxy {
+
+// libmexclass proxy that exposes an arrow::ipc::RecordBatchStreamReader to
+// MATLAB.
+class RecordBatchStreamReader : public libmexclass::proxy::Proxy {
+ public:
+  // Wraps an already-opened IPC stream reader.
+  RecordBatchStreamReader(std::shared_ptr<arrow::ipc::RecordBatchStreamReader> 
reader);
+
+  ~RecordBatchStreamReader() = default;
+
+  // Factory that builds the proxy from the constructor arguments sent by
+  // MATLAB.
+  static libmexclass::proxy::MakeResult make(
+      const libmexclass::proxy::FunctionArguments& constructor_arguments);
+
+ protected:
+  // Underlying Arrow IPC stream reader.
+  std::shared_ptr<arrow::ipc::RecordBatchStreamReader> reader;
+  // Record batch that has been read from the stream but not yet returned to
+  // MATLAB; null when nothing is cached.
+  std::shared_ptr<arrow::RecordBatch> nextRecordBatch;
+
+  // Methods invoked from MATLAB; results are written to context.outputs[0].
+  void getSchema(libmexclass::proxy::method::Context& context);
+  void readRecordBatch(libmexclass::proxy::method::Context& context);
+  void hasNextRecordBatch(libmexclass::proxy::method::Context& context);
+  void readTable(libmexclass::proxy::method::Context& context);
+};
+
+}  // namespace arrow::matlab::io::ipc::proxy
diff --git a/matlab/src/cpp/arrow/matlab/proxy/factory.cc 
b/matlab/src/cpp/arrow/matlab/proxy/factory.cc
index a08a7495c0..902546fd05 100644
--- a/matlab/src/cpp/arrow/matlab/proxy/factory.cc
+++ b/matlab/src/cpp/arrow/matlab/proxy/factory.cc
@@ -36,6 +36,7 @@
 #include "arrow/matlab/io/feather/proxy/writer.h"
 #include "arrow/matlab/io/ipc/proxy/record_batch_file_reader.h"
 #include "arrow/matlab/io/ipc/proxy/record_batch_file_writer.h"
+#include "arrow/matlab/io/ipc/proxy/record_batch_stream_reader.h"
 #include "arrow/matlab/io/ipc/proxy/record_batch_stream_writer.h"
 #include "arrow/matlab/tabular/proxy/record_batch.h"
 #include "arrow/matlab/tabular/proxy/schema.h"
@@ -113,6 +114,7 @@ libmexclass::proxy::MakeResult Factory::make_proxy(
   REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchFileReader , 
arrow::matlab::io::ipc::proxy::RecordBatchFileReader);
   REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchFileWriter , 
arrow::matlab::io::ipc::proxy::RecordBatchFileWriter);
   REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchStreamWriter , 
arrow::matlab::io::ipc::proxy::RecordBatchStreamWriter);
+  REGISTER_PROXY(arrow.io.ipc.proxy.RecordBatchStreamReader , 
arrow::matlab::io::ipc::proxy::RecordBatchStreamReader);
 
   // clang-format on
 
diff --git a/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m 
b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m
new file mode 100644
index 0000000000..60ca38eba9
--- /dev/null
+++ b/matlab/src/matlab/+arrow/+io/+ipc/RecordBatchStreamReader.m
@@ -0,0 +1,83 @@
+%RECORDBATCHSTREAMREADER Class for reading Arrow record batches from the
+% Arrow IPC Stream format.
+
+% Licensed to the Apache Software Foundation (ASF) under one or more
+% contributor license agreements.  See the NOTICE file distributed with
+% this work for additional information regarding copyright ownership.
+% The ASF licenses this file to you under the Apache License, Version
+% 2.0 (the "License"); you may not use this file except in compliance
+% with the License.  You may obtain a copy of the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+% implied.  See the License for the specific language governing
+% permissions and limitations under the License.
+
+classdef RecordBatchStreamReader < matlab.mixin.Scalar
+
+    properties(SetAccess=private, GetAccess=public, Hidden)
+        % Handle to the C++ proxy object that performs the actual reads.
+        Proxy
+    end
+
+    properties (Dependent, SetAccess=private, GetAccess=public)
+        % arrow.tabular.Schema of the stream. Fetched from the proxy on
+        % every access.
+        Schema
+    end
+
+    methods
+        function obj = RecordBatchStreamReader(filename)
+            % Create a reader for the Arrow IPC Stream file FILENAME.
+            % FILENAME must be a scalar string with nonzero length.
+            arguments
+                filename(1, 1) string {mustBeNonzeroLengthText}
+            end
+            args = struct(Filename=filename);
+            proxyName = "arrow.io.ipc.proxy.RecordBatchStreamReader";
+            obj.Proxy = arrow.internal.proxy.create(proxyName, args);
+        end
+
+        function schema = get.Schema(obj)
+            % Wrap the Schema proxy ID returned by the C++ layer in an
+            % arrow.tabular.Schema object.
+            proxyID = obj.Proxy.getSchema();
+            proxyName = "arrow.tabular.proxy.Schema";
+            proxy = libmexclass.proxy.Proxy(ID=proxyID, Name=proxyName);
+            schema = arrow.tabular.Schema(proxy);
+        end
+
+        function tf = hasnext(obj)
+            % Return true if another record batch can be read.
+            tf = obj.Proxy.hasNextRecordBatch();
+        end
+
+        function tf = done(obj)
+            % Logical negation of hasnext: true when no record batch
+            % remains to be read.
+            tf = ~obj.Proxy.hasNextRecordBatch();
+        end
+
+        function arrowRecordBatch = read(obj)
+            % NOTE: This function is a "convenience alias" for the
+            % readRecordBatch method, which has a longer name. This is the
+            % exact same implementation as readRecordBatch. Since this
+            % method might be called in a tight loop, it should be slightly
+            % more efficient to call the C++ code directly, rather than
+            % invoking obj.readRecordBatch indirectly. We are intentionally
+            % trading off code duplication for performance here.
+            proxyID = obj.Proxy.readRecordBatch();
+            proxyName = "arrow.tabular.proxy.RecordBatch";
+            proxy = libmexclass.proxy.Proxy(ID=proxyID, Name=proxyName);
+            arrowRecordBatch = arrow.tabular.RecordBatch(proxy);
+        end
+
+        function arrowRecordBatch = readRecordBatch(obj)
+            % Read the next record batch from the stream as an
+            % arrow.tabular.RecordBatch. Errors with the identifier
+            % arrow:io:ipc:EndOfStream when no record batch is left.
+            proxyID = obj.Proxy.readRecordBatch();
+            proxyName = "arrow.tabular.proxy.RecordBatch";
+            proxy = libmexclass.proxy.Proxy(ID=proxyID, Name=proxyName);
+            arrowRecordBatch = arrow.tabular.RecordBatch(proxy);
+        end
+
+        function arrowTable = readTable(obj)
+            % Read the record batches remaining in the stream into a
+            % single arrow.tabular.Table.
+            proxyID = obj.Proxy.readTable();
+            proxyName = "arrow.tabular.proxy.Table";
+            proxy = libmexclass.proxy.Proxy(ID=proxyID, Name=proxyName);
+            arrowTable = arrow.tabular.Table(proxy);
+        end
+
+    end
+
+end
diff --git a/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m 
b/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m
new file mode 100644
index 0000000000..6ca6719773
--- /dev/null
+++ b/matlab/test/arrow/io/ipc/tRecordBatchStreamReader.m
@@ -0,0 +1,336 @@
+%TRECORDBATCHSTREAMREADER Unit tests for arrow.io.ipc.RecordBatchStreamReader.
+
+% Licensed to the Apache Software Foundation (ASF) under one or more
+% contributor license agreements.  See the NOTICE file distributed with
+% this work for additional information regarding copyright ownership.
+% The ASF licenses this file to you under the Apache License, Version
+% 2.0 (the "License"); you may not use this file except in compliance
+% with the License.  You may obtain a copy of the License at
+%
+%   http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+% implied.  See the License for the specific language governing
+% permissions and limitations under the License.
+classdef tRecordBatchStreamReader < matlab.unittest.TestCase
+
+    properties
+        DataFolder
+        ZeroBatchStreamFile
+        OneBatchStreamFile
+        MultipleBatchStreamFile
+        RandomAccessFile
+    end
+
+    properties (TestParameter)
+        RecordBatchReadFcn = {@read, @readRecordBatch}
+    end
+
+    methods(TestClassSetup)
+
+        function setupDataFolder(testCase)
+            import matlab.unittest.fixtures.TemporaryFolderFixture
+            fixture = testCase.applyFixture(TemporaryFolderFixture);
+            testCase.DataFolder = string(fixture.Folder);
+        end
+
+        function setupRandomAccessFile(testCase)
+            fieldA = arrow.field("A", arrow.string());
+            fieldB = arrow.field("B", arrow.float32());
+            schema = arrow.schema([fieldA, fieldB]);
+            fname = fullfile(testCase.DataFolder, "RandomAccessFile.arrow");
+            writer = arrow.io.ipc.RecordBatchFileWriter(fname, schema);
+            writer.close();
+            testCase.RandomAccessFile = fname;
+        end
+
+        function setupZeroBatchStreamFile(testCase)
+            fieldA = arrow.field("A", arrow.string());
+            fieldB = arrow.field("B", arrow.float32());
+            schema = arrow.schema([fieldA, fieldB]);
+            fname = fullfile(testCase.DataFolder, 
"ZeroBatchStreamFile.arrows");
+            writer = arrow.io.ipc.RecordBatchStreamWriter(fname, schema);
+            writer.close();
+            testCase.ZeroBatchStreamFile = fname;
+        end
+
+        function setupOneBatchStreamFile(testCase)
+            t = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", 
"B"]);
+            recordBatch = arrow.recordBatch(t);
+            fname = fullfile(testCase.DataFolder, "OneBatchFile.arrows");
+            writer = arrow.io.ipc.RecordBatchStreamWriter(fname, 
recordBatch.Schema);
+            writer.writeRecordBatch(recordBatch);
+            writer.close();
+            testCase.OneBatchStreamFile = fname;
+        end
+
+        function setupMultipleBatchStreamFile(testCase)
+            t1 = table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", 
"B"]);
+            t2 = table(["Row3"; "Row4"], single([3; 4]), VariableNames=["A", 
"B"]);
+            recordBatch1 = arrow.recordBatch(t1);
+            recordBatch2 = arrow.recordBatch(t2);
+            fname = fullfile(testCase.DataFolder, 
"MultipleBatchStreamFile.arrows");
+            writer = arrow.io.ipc.RecordBatchStreamWriter(fname, 
recordBatch1.Schema);
+            writer.writeRecordBatch(recordBatch1);
+            writer.writeRecordBatch(recordBatch2);
+            writer.close();
+            testCase.MultipleBatchStreamFile = fname;
+        end
+    end
+
+    methods (Test)
+
+        function ZeroLengthFilenameError(testCase)
+            % Verify RecordBatchStreamReader throws an exception with the
+            % identifier MATLAB:validators:mustBeNonzeroLengthText if the
+            % filename input argument given is a zero length string.
+            fcn = @() arrow.io.ipc.RecordBatchStreamReader("");
+            testCase.verifyError(fcn, 
"MATLAB:validators:mustBeNonzeroLengthText");
+        end
+
+        function MissingStringFilenameError(testCase)
+            % Verify RecordBatchStreamReader throws an exception with the
+            % identifier MATLAB:validators:mustBeNonzeroLengthText if the
+            % filename input argument given is a missing string.
+            fcn = @() arrow.io.ipc.RecordBatchStreamReader(string(missing));
+            testCase.verifyError(fcn, 
"MATLAB:validators:mustBeNonzeroLengthText");
+        end
+
+        function FilenameInvalidTypeError(testCase)
+            % Verify RecordBatchStreamReader throws an exception with the
+            % identifier MATLAB:validation:UnableToConvert if the filename
+            % input argument is neither a scalar string nor a char vector.
+            fcn = @() arrow.io.ipc.RecordBatchStreamReader(table);
+            testCase.verifyError(fcn, "MATLAB:validation:UnableToConvert");
+        end
+
+        function Schema(testCase)
+            % Verify the getter method for Schema returns the
+            % expected value.
+            fieldA = arrow.field("A", arrow.string());
+            fieldB = arrow.field("B", arrow.float32());
+            expectedSchema = arrow.schema([fieldA fieldB]);
+
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile);
+            testCase.verifyEqual(reader.Schema, expectedSchema);
+
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile);
+            testCase.verifyEqual(reader.Schema, expectedSchema);
+
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile);
+            testCase.verifyEqual(reader.Schema, expectedSchema);
+        end
+
+        function SchemaNoSetter(testCase)
+            % Verify the Schema property is not settable.
+            fieldC = arrow.field("C", arrow.date32());
+            schema = arrow.schema(fieldC);
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile);
+            testCase.verifyError(@() setfield(reader, "Schema", schema), 
"MATLAB:class:SetProhibited");
+        end
+
+        function ReadErrorIfEndOfStream(testCase, RecordBatchReadFcn)
+            % Verify read throws an exception with the identifier 
arrow:io:ipc:EndOfStream
+            % on an Arrow IPC Stream file containing zero batches.
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile);
+            fcn = @() RecordBatchReadFcn(reader);
+            testCase.verifyError(fcn, "arrow:io:ipc:EndOfStream");
+        end
+
+        function ReadOneBatchStreamFile(testCase, RecordBatchReadFcn)
+            % Verify read can successfully read an Arrow IPC Stream file
+            % containing one batch.
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile);
+
+            expectedMatlabTable = table(["Row1"; "Row2"], single([1; 2]), 
VariableNames=["A", "B"]);
+            expected = arrow.recordBatch(expectedMatlabTable);
+            actual = RecordBatchReadFcn(reader);
+            testCase.verifyEqual(actual, expected);
+
+            fcn = @() RecordBatchReadFcn(reader);
+            testCase.verifyError(fcn, "arrow:io:ipc:EndOfStream");
+        end
+
+        function ReadMultipleBatchStreamFile(testCase, RecordBatchReadFcn)
+            % Verify read can successfully read an Arrow IPC Stream file
+            % containing multiple batches.
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile);
+
+            expectedMatlabTable1 = table(["Row1"; "Row2"], single([1; 2]), 
VariableNames=["A", "B"]);
+            expected1 = arrow.recordBatch(expectedMatlabTable1);
+            actual1 = RecordBatchReadFcn(reader);
+            testCase.verifyEqual(actual1, expected1);
+
+            expectedMatlabTable2 = table(["Row3"; "Row4"], single([3; 4]), 
VariableNames=["A", "B"]);
+            expected2 = arrow.recordBatch(expectedMatlabTable2);
+            actual2 = RecordBatchReadFcn(reader);
+            testCase.verifyEqual(actual2, expected2);
+
+            fcn = @() RecordBatchReadFcn(reader);
+            testCase.verifyError(fcn, "arrow:io:ipc:EndOfStream");
+        end
+
+        function HasNext(testCase, RecordBatchReadFcn)
+            % Verify that the hasnext method returns true the correct
+            % number of times depending on the number of record
+            % batches in an Arrow IPC Stream format.
+
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile);
+            % hasnext should return true 0 times for a 0 batch file.
+            iterations = 0;
+            while reader.hasnext()
+                RecordBatchReadFcn(reader);
+                iterations = iterations + 1;
+            end
+            testCase.verifyEqual(iterations, 0);
+
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile);
+            % hasnext should return true 1 time for a 1 batch file.
+            iterations = 0;
+            while reader.hasnext()
+                RecordBatchReadFcn(reader);
+                iterations = iterations + 1;
+            end
+            testCase.verifyEqual(iterations, 1);
+
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile);
+            % hasnext should return true 2 times for a 2 batch file.
+            iterations = 0;
+            while reader.hasnext()
+                RecordBatchReadFcn(reader);
+                iterations = iterations + 1;
+            end
+            testCase.verifyEqual(iterations, 2);
+        end
+
+        function Done(testCase, RecordBatchReadFcn)
+            % Verify that the done method returns false the correct
+            % number of times depending on the number of record
+            % batches in an Arrow IPC Stream format.
+
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile);
+            % done should return false 0 times for a 0 batch file.
+            iterations = 0;
+            while ~reader.done()
+                RecordBatchReadFcn(reader);
+                iterations = iterations + 1;
+            end
+            testCase.verifyEqual(iterations, 0);
+
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile);
+            % done should return false 1 time for a 1 batch file.
+            iterations = 0;
+            while ~reader.done()
+                RecordBatchReadFcn(reader);
+                iterations = iterations + 1;
+            end
+            testCase.verifyEqual(iterations, 1);
+
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile);
+            % done should return false 2 times for a 2 batch file.
+            iterations = 0;
+            while ~reader.done()
+                RecordBatchReadFcn(reader);
+                iterations = iterations + 1;
+            end
+            testCase.verifyEqual(iterations, 2);
+        end
+
+        function ReadTableZeroBatchStreamFile(testCase)
+            % Verify read can successfully read an Arrow IPC Stream file
+            % containing zero batches as an arrow.tabular.Table.
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.ZeroBatchStreamFile);
+            matlabTable = table('Size', [0, 2], 'VariableTypes', ["string", 
"single"], 'VariableNames', ["A", "B"]);
+            expected = arrow.table(matlabTable);
+            actual = reader.readTable();
+            testCase.verifyEqual(actual, expected);
+        end
+
+        function ReadTableOneBatchStreamFile(testCase)
+            % Verify read can successfully read an Arrow IPC Stream file
+            % containing one batch as an arrow.tabular.Table.
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.OneBatchStreamFile);
+            matlabTable = table(["Row1"; "Row2"], single([1; 2]), 
VariableNames=["A", "B"]);
+            expected = arrow.table(matlabTable);
+            actual = reader.readTable();
+            testCase.verifyEqual(actual, expected);
+        end
+
+        function ReadTableMultipleBatchStreamFile(testCase)
+            % Verify read can successfully read an Arrow IPC Stream file
+            % containing multiple batches as an arrow.tabular.Table.
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile);
+            matlabTable = table(["Row1"; "Row2"; "Row3"; "Row4"], single([1; 
2; 3; 4]), VariableNames=["A", "B"]);
+            expected = arrow.table(matlabTable);
+            actual = reader.readTable();
+            testCase.verifyEqual(actual, expected);
+        end
+
+        function ReadTableAfterReadRecordBatch(testCase, RecordBatchReadFcn)
+            % Verify readTable returns only the remaining record batches
+            % in an Arrow IPC Stream file after calling readRecordBatch first.
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile);
+
+            testCase.verifyTrue(reader.hasnext());
+            testCase.verifyFalse(reader.done());
+
+            expectedRecordBatch = arrow.recordBatch(...
+                table(["Row1"; "Row2"], single([1; 2]), VariableNames=["A", 
"B"]) ...
+            );
+            actualRecordBatch = RecordBatchReadFcn(reader);
+            testCase.verifyEqual(actualRecordBatch, expectedRecordBatch);
+
+            expectedTable = arrow.table(...
+                table(["Row3"; "Row4"], single([3; 4]), VariableNames=["A", 
"B"]) ...
+            );
+            actualTable = reader.readTable();
+            testCase.verifyEqual(actualTable, expectedTable);
+
+            testCase.verifyFalse(reader.hasnext());
+            testCase.verifyTrue(reader.done());
+        end
+
+        function ReadTableMultipleCalls(testCase)
+            % Verify readTable returns an empty table if it is called
+            % multiple times in a row.
+            reader = 
arrow.io.ipc.RecordBatchStreamReader(testCase.MultipleBatchStreamFile);
+
+            expected = arrow.table(...
+                table(["Row1"; "Row2"; "Row3"; "Row4"], single([1; 2; 3; 4]), 
VariableNames=["A", "B"]) ...
+            );
+            actual = reader.readTable();
+            testCase.verifyEqual(actual, expected);
+
+            testCase.verifyFalse(reader.hasnext());
+            testCase.verifyTrue(reader.done());
+
+            expectedEmpty = arrow.table(...
+                table('Size', [0, 2], 'VariableTypes', ["string", "single"], 
'VariableNames', ["A", "B"]) ...
+            );
+
+            actualEmpty = reader.readTable();
+            testCase.verifyEqual(actualEmpty, expectedEmpty);
+
+            testCase.verifyFalse(reader.hasnext());
+            testCase.verifyTrue(reader.done());
+
+            actualEmpty = reader.readTable();
+            testCase.verifyEqual(actualEmpty, expectedEmpty);
+
+            testCase.verifyFalse(reader.hasnext());
+            testCase.verifyTrue(reader.done());
+        end
+
+        function ErrorIfNotIpcStreamFile(testCase)
+            % Verify RecordBatchStreamReader throws an exception with the
+            % identifier arrow:io:ipc:FailedToOpenRecordBatchReader if
+            % the provided file is not an Arrow IPC Stream file.
+            fcn = @() 
arrow.io.ipc.RecordBatchStreamReader(testCase.RandomAccessFile);
+            testCase.verifyError(fcn, 
"arrow:io:ipc:FailedToOpenRecordBatchReader");
+        end
+
+    end
+
+end
diff --git a/matlab/tools/cmake/BuildMatlabArrowInterface.cmake 
b/matlab/tools/cmake/BuildMatlabArrowInterface.cmake
index 29a737a6ec..27af19676b 100644
--- a/matlab/tools/cmake/BuildMatlabArrowInterface.cmake
+++ b/matlab/tools/cmake/BuildMatlabArrowInterface.cmake
@@ -83,6 +83,7 @@ set(MATLAB_ARROW_LIBMEXCLASS_CLIENT_PROXY_SOURCES 
"${CMAKE_SOURCE_DIR}/src/cpp/a
                                                   
"${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_reader.cc"
                                                   
"${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_file_writer.cc"
                                                   
"${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_writer.cc"
+                                                  
"${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_reader.cc"
                                                   
"${CMAKE_SOURCE_DIR}/src/cpp/arrow/matlab/io/ipc/proxy/record_batch_stream_writer.cc")
 
 

Reply via email to