polestar1988 commented on issue #37976:
URL: https://github.com/apache/arrow/issues/37976#issuecomment-1743451223
@zeroshade Thanks for your response. Since I don't want a hard limit on my data size, I already split the record into smaller slices using `NewSlice`:
```
// Split the record into smaller slices and stream them out.
chunkSize := 4 * 1024 * 1024 // target chunk size in bytes
recordChunks := sliceRecordByBytes(transformedRecord, chunkSize)
// Slices share the parent record's schema.
chunkSchema := transformedRecord.Schema()

currentChunk := make([]arrow.Array, 0)
for _, rec := range recordChunks {
	// Collect this chunk's columns.
	for i := 0; i < int(rec.NumCols()); i++ {
		currentChunk = append(currentChunk, rec.Column(i))
	}
	// Create a Flight writer and send the chunk.
	writeChunkToStream(server, chunkSchema, currentChunk)
	currentChunk = nil
}
```
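Here `writeChunkToStream` creates a new Flight writer for every chunk. It's roughly this (a simplified sketch of my helper, not the exact code):
```
// Assumed imports; the module path version may differ in your setup.
import (
	"github.com/apache/arrow/go/v13/arrow"
	"github.com/apache/arrow/go/v13/arrow/array"
	"github.com/apache/arrow/go/v13/arrow/flight"
	"github.com/apache/arrow/go/v13/arrow/ipc"
)

// Simplified sketch of the helper: every call creates a fresh Flight
// writer, and a new writer begins its stream with a Schema message.
func writeChunkToStream(server flight.FlightService_DoGetServer, schema *arrow.Schema, cols []arrow.Array) error {
	w := flight.NewRecordWriter(server, ipc.WithSchema(schema))
	defer w.Close()

	rec := array.NewRecord(schema, cols, -1) // -1: infer row count from columns
	defer rec.Release()
	return w.Write(rec)
}
```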
```
// calculateSize returns an estimated size in bytes for one value of the
// given data type. Fixed-width types use their actual width;
// variable-width types use rough per-value estimates.
func calculateSize(dataType arrow.DataType) int {
	switch dt := dataType.(type) {
	case *arrow.Int64Type, *arrow.Float64Type, *arrow.Date64Type, *arrow.TimestampType:
		return 8
	case *arrow.Int16Type:
		return 2
	case *arrow.Time32Type:
		return 4
	case *arrow.FixedSizeBinaryType:
		return dt.ByteWidth
	case *arrow.BinaryType:
		return 16 // estimate; adjust based on your data
	case *arrow.StringType:
		return 32 // estimate; adjust based on your data
	default:
		return 0 // unknown types are not counted
	}
}
```
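With this estimate, a schema of `(Int64, Float64, String)` comes out to 8 + 8 + 32 = 48 bytes per row, so a 4 MiB chunk holds roughly 87,000 rows.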
```
// sliceRecordByBytes splits a record into zero-copy slices whose
// estimated size stays under the given byte limit.
func sliceRecordByBytes(record arrow.Record, size int) []arrow.Record {
	numRows := int(record.NumRows())
	slices := make([]arrow.Record, 0)

	// The estimated row size depends only on the column types,
	// so compute it once instead of per row.
	rowSize := 0
	for j := 0; j < int(record.NumCols()); j++ {
		rowSize += calculateSize(record.Column(j).DataType())
	}

	start := 0
	currentSize := 0
	for i := 0; i < numRows; i++ {
		// If adding this row would exceed the limit, cut a slice that
		// ends just before it (NewSlice's end index is exclusive).
		if currentSize+rowSize > size && i > start {
			slices = append(slices, record.NewSlice(int64(start), int64(i)))
			start = i
			currentSize = 0
		}
		currentSize += rowSize
	}
	// Slice off any remaining rows.
	if start < numRows {
		slices = append(slices, record.NewSlice(int64(start), int64(numRows)))
	}
	return slices
}
```
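Note that `Record.NewSlice(i, j)` takes a half-open interval, so `NewSlice(0, 3)` yields rows 0 through 2; the end index has to be one past the last row you want.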
On the client side:
```
recs := make([]arrow.Record, 0)
for reader.Next() {
	r := reader.Record()
	r.Retain()
	defer r.Release()
	recs = append(recs, r)
}
if err := reader.Err(); err != nil {
	log.Println("ERROR:", err)
}
tbl := array.NewTableFromRecords(reader.Schema(), recs)
defer tbl.Release()
```
And I get the error `arrow/ipc: invalid message type (got=Schema, want=RecordBatch)`. That's because every chunk is written with its own schema message, and writing the slices without a schema is not possible. How can I handle this?
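Would a single shared writer be the right approach, i.e. create the writer once with the schema and push every slice through it? Something like this sketch (same assumed imports as above; the handler signature is just for illustration):
```
// Sketch: one writer for the whole stream. The Schema message is sent
// once when the writer is created; every slice then goes out as a
// plain RecordBatch message.
func (s *server) DoGet(t *flight.Ticket, fs flight.FlightService_DoGetServer) error {
	w := flight.NewRecordWriter(fs, ipc.WithSchema(chunkSchema))
	defer w.Close()

	for _, rec := range recordChunks {
		if err := w.Write(rec); err != nil {
			return err
		}
	}
	return nil
}
```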