abandy commented on code in PR #45029:
URL: https://github.com/apache/arrow/pull/45029#discussion_r2072408604


##########
swift/Arrow/Sources/Arrow/ArrowReader.swift:
##########
@@ -216,7 +216,76 @@ public class ArrowReader { // swiftlint:disable:this 
type_body_length
         return .success(RecordBatch(arrowSchema, columns: columns))
     }
 
-    public func fromStream( // swiftlint:disable:this function_body_length
+    /*
+     The Memory stream format is for reading the arrow streaming protocol.  
This
+     format is slightly different from the File format protocol as it doesn't 
contain
+     a header and footer
+     */
+    public func fromMemoryStream( // swiftlint:disable:this 
function_body_length
+        _ fileData: Data,
+        useUnalignedBuffers: Bool = false
+    ) -> Result<ArrowReaderResult, ArrowError> {
+        let result = ArrowReaderResult()
+        var offset: Int = 0
+        var length = getUInt32(fileData, offset: offset)
+        var streamData = fileData
+        var schemaMessage: org_apache_arrow_flatbuf_Schema?
+        while length != 0 {
+            if length == CONTINUATIONMARKER {
+                offset += Int(MemoryLayout<Int32>.size)
+                length = getUInt32(fileData, offset: offset)
+                if length == 0 {
+                    return .success(result)
+                }
+            }
+
+            offset += Int(MemoryLayout<Int32>.size)
+            streamData = fileData[offset...]
+            let dataBuffer = ByteBuffer(
+                data: streamData,
+                allowReadingUnalignedBuffers: true)
+            let message = 
org_apache_arrow_flatbuf_Message.getRootAsMessage(bb: dataBuffer)
+            switch message.headerType {
+            case .recordbatch:
+                do {
+                    let rbMessage = message.header(type: 
org_apache_arrow_flatbuf_RecordBatch.self)!
+                    offset += Int(message.bodyLength + Int64(length))
+                    let recordBatch = try loadRecordBatch(
+                        rbMessage,
+                        schema: schemaMessage!,
+                        arrowSchema: result.schema!,
+                        data: fileData,
+                        messageEndOffset: (message.bodyLength + 
Int64(length))).get()
+                    result.batches.append(recordBatch)
+                    length = getUInt32(fileData, offset: offset)
+                } catch let error as ArrowError {
+                    return .failure(error)
+                } catch {
+                    return .failure(.unknownError("Unexpected error: 
\(error)"))
+                }
+            case .schema:
+                schemaMessage = message.header(type: 
org_apache_arrow_flatbuf_Schema.self)!
+                let schemaResult = loadSchema(schemaMessage!)
+                switch schemaResult {
+                case .success(let schema):
+                    result.schema = schema
+                case .failure(let error):
+                    return .failure(error)
+                }
+                offset += Int(message.bodyLength + Int64(length))
+                length = getUInt32(fileData, offset: offset)
+            default:
+                return .failure(.unknownError("Unhandled header type: 
\(message.headerType)"))
+            }
+        }
+        return .success(result)
+    }
+
+    /*
+     The File stream format supports random accessing the data.  This format 
contains
+     a header and footer around the streaming format.
+     */
+    public func fromFileStream( // swiftlint:disable:this function_body_length

Review Comment:
   Gotcha, I will change the name to fromStream.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to