caldempsey commented on PR #449:
URL: https://github.com/apache/arrow-go/pull/449#issuecomment-3117354957

   ```
   large_payload_50k_records:
     Records: 50000
     Target record size: 10240 bytes
     JSON Array size: 500.93 MB
     NDJSON size: 500.93 MB
     JSONReader (chunked):      1.267049792s (395.35 MB/s, 39462 rec/s)
     JSONReader (single chunk): 1.058549125s (473.22 MB/s, 47234 rec/s)
     RecordFromJSON:            1.043343541s (480.12 MB/s, 47923 rec/s)
   
     Speedup vs RecordFromJSON:
       JSONReader (chunked):      0.82x
       JSONReader (single chunk): 0.99x
   --- PASS: TestFileLabelsPayloads/large_payload_50k_records (4.69s)
   === RUN   TestFileLabelsPayloads/4MB_payload_1000_records
   
   4MB_payload_1000_records:
     Records: 1000
     Target record size: 4000000 bytes
     JSON Array size: 3814.94 MB
     NDJSON size: 3814.94 MB
     JSONReader (chunked):       9.64786775s (395.42 MB/s, 104 rec/s)
     JSONReader (single chunk): 8.009190083s (476.32 MB/s, 125 rec/s)
     RecordFromJSON:            7.984174833s (477.81 MB/s, 125 rec/s)
   
     Speedup vs RecordFromJSON:
       JSONReader (chunked):      0.83x
       JSONReader (single chunk): 1.00x
   --- PASS: TestFileLabelsPayloads/4MB_payload_1000_records (35.29s)
   --- PASS: TestFileLabelsPayloads (39.97s)
   PASS
   ```
   
   Ran a test using data that's not quite production data, but close to it. The use case involves ingesting transcripts of audio files via Spark Connect. These transcripts can be quite large, up to 4MB each. We're streaming this data into a data lake using Spark Connect instead of traditional Spark jobs, which allows us to receive synchronous responses at the call site and support coordination patterns (GET/POST) using standard REST consumers (which is _so_ much better for error handling). Hence the massive strings. Feel free to use this benchmark in your own tests — it should be a fairly comparable setup.
   
   ```go
   package main_test
   
   import (
        "bytes"
        "encoding/json"
        "fmt"
        "strings"
        "testing"
        "time"
   
        "github.com/apache/arrow-go/v18/arrow"
        "github.com/apache/arrow-go/v18/arrow/array"
        "github.com/apache/arrow-go/v18/arrow/memory"
        "github.com/google/uuid"
   )
   
   // FileLabel represents the structure from the Python benchmark
   type FileLabel struct {
        FilePath    string  `json:"file_path"`
        SomeLabel   string  `json:"some_label"`
        Confidence  float64 `json:"confidence"`
        ModelName   string  `json:"model_name"`
        ProcessedAt string  `json:"processed_at"`
        BatchID     string  `json:"batch_id"`
        Metadata    string  `json:"metadata"` // JSON string
   }
   
   // MetadataContent represents the metadata structure
   type MetadataContent struct {
        ProcessingTimeMs int    `json:"processing_time_ms"`
        Version          string `json:"version"`
        Padding          string `json:"padding"`
        RecordID         int    `json:"record_id"`
        ExtraField1      string `json:"extra_field_1"`
        ExtraField2      string `json:"extra_field_2"`
        ExtraField3      string `json:"extra_field_3"`
   }
   
   // makePadding creates a padding string of the specified size
   func makePadding(size int) string {
        alphabet := "abcdefghijklmnopqrstuvwxyz"
        repeats := size/len(alphabet) + 1
        return strings.Repeat(alphabet, repeats)[:size]
   }
   
   // generateLargePayload generates the same payload structure as the Python benchmark
   func generateLargePayload(numRecords int, recordSizeBytes int) []FileLabel {
        batchID := uuid.New().String()
        baseRecordSize := 200
        metadataSize := recordSizeBytes - baseRecordSize
        if metadataSize < 100 {
                metadataSize = 100
        }
        padding := makePadding(metadataSize)
        nowISO := time.Now().UTC().Format(time.RFC3339)
   
        payload := make([]FileLabel, numRecords)
        for i := 0; i < numRecords; i++ {
                metadata := MetadataContent{
                        ProcessingTimeMs: 150 + (i % 100),
                        Version:          "1.0",
                        Padding:          padding,
                        RecordID:         i,
                        ExtraField1:      fmt.Sprintf("extra_value_%d", i),
                        ExtraField2:      fmt.Sprintf("extra_value_%d", i*2),
                        ExtraField3:      fmt.Sprintf("extra_value_%d", i*3),
                }
   
                metadataJSON, _ := json.Marshal(metadata)
   
                payload[i] = FileLabel{
                        FilePath:    fmt.Sprintf("s3://test-bucket/batch-%s/file-%d.jpg", batchID, i),
                        SomeLabel:   fmt.Sprintf("label_%d", i%10),
                        Confidence:  0.85 + float64(i%15)/100.0,
                        ModelName:   fmt.Sprintf("model_v%d", (i%5)+1),
                        ProcessedAt: nowISO,
                        BatchID:     batchID,
                        Metadata:    string(metadataJSON),
                }
        }
        return payload
   }
   
   // payloadToNDJSON converts payload to newline-delimited JSON bytes
   func payloadToNDJSON(payload []FileLabel) []byte {
        var buf bytes.Buffer
        encoder := json.NewEncoder(&buf)
        for _, record := range payload {
                // Encoding these flat structs cannot realistically fail, so the error is deliberately dropped.
                _ = encoder.Encode(record)
        }
        return buf.Bytes()
   }
   
   // payloadToJSONArray converts payload to JSON array format
   func payloadToJSONArray(payload []FileLabel) []byte {
        data, _ := json.Marshal(payload)
        return data
   }
   
   // Define Arrow schema matching the FileLabel structure
   func getFileLabelsSchema() *arrow.Schema {
        return arrow.NewSchema([]arrow.Field{
                {Name: "file_path", Type: arrow.BinaryTypes.String},
                {Name: "some_label", Type: arrow.BinaryTypes.String},
                {Name: "confidence", Type: arrow.PrimitiveTypes.Float64},
                {Name: "model_name", Type: arrow.BinaryTypes.String},
                {Name: "processed_at", Type: arrow.BinaryTypes.String},
                {Name: "batch_id", Type: arrow.BinaryTypes.String},
                {Name: "metadata", Type: arrow.BinaryTypes.String},
        }, nil)
   }
   
   func benchmarkRecordFromJSON(data []byte, schema *arrow.Schema) (time.Duration, int64, error) {
        pool := memory.NewGoAllocator()
   
        start := time.Now()
        record, _, err := array.RecordFromJSON(pool, schema, bytes.NewReader(data))
        duration := time.Since(start)
   
        if err != nil {
                return duration, 0, err
        }
   
        numRows := record.NumRows()
        record.Release()
   
        return duration, numRows, nil
   }
   
   func benchmarkJSONReader(ndjsonData []byte, schema *arrow.Schema) (time.Duration, int64, error) {
        pool := memory.NewGoAllocator()
   
        start := time.Now()
   
        rdr := array.NewJSONReader(bytes.NewReader(ndjsonData), schema,
                array.WithAllocator(pool))
        defer rdr.Release()
   
        var totalRows int64
        for rdr.Next() {
                rec := rdr.Record()
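                // Record() only lends the current batch until the next call to Next(); counting rows needs no Retain.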
                totalRows += rec.NumRows()
        }
   
        if err := rdr.Err(); err != nil {
                return time.Since(start), totalRows, err
        }
   
        duration := time.Since(start)
        return duration, totalRows, nil
   }
   
   func benchmarkJSONReaderSingleChunk(ndjsonData []byte, schema *arrow.Schema) (time.Duration, int64, error) {
        pool := memory.NewGoAllocator()
   
        start := time.Now()
   
        rdr := array.NewJSONReader(bytes.NewReader(ndjsonData), schema,
                array.WithAllocator(pool),
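                // A negative chunk size tells the reader to decode the whole input into a single record batch.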
                array.WithChunk(-1))
        defer rdr.Release()
   
        if !rdr.Next() {
                if err := rdr.Err(); err != nil {
                        return time.Since(start), 0, err
                }
                return time.Since(start), 0, fmt.Errorf("no record found")
        }
   
        rec := rdr.Record()
        numRows := rec.NumRows()
   
        duration := time.Since(start)
        return duration, numRows, nil
   }
   
   // BenchmarkScenario represents a test scenario
   type BenchmarkScenario struct {
        Name            string
        NumRecords      int
        RecordSizeBytes int
   }
   
   func TestFileLabelsPayloads(t *testing.T) {
        scenarios := []BenchmarkScenario{
                {
                        Name:            "large_payload_50k_records",
                        NumRecords:      50_000,
                        RecordSizeBytes: 10_240,
                },
                {
                        Name:            "4MB_payload_1000_records",
                        NumRecords:      1_000,
                        RecordSizeBytes: 4_000_000,
                },
        }
   
        schema := getFileLabelsSchema()
   
        fmt.Println("Spark Connect API Payload Benchmarks")
        fmt.Println("==================================")
   
        for _, scenario := range scenarios {
                t.Run(scenario.Name, func(t *testing.T) {
                        // Generate payload
                        payload := generateLargePayload(scenario.NumRecords, scenario.RecordSizeBytes)
   
                        // Convert to both formats
                        jsonArrayData := payloadToJSONArray(payload)
                        ndjsonData := payloadToNDJSON(payload)
   
                        jsonArraySizeMB := float64(len(jsonArrayData)) / (1024 * 1024)
                        ndjsonSizeMB := float64(len(ndjsonData)) / (1024 * 1024)
   
                        fmt.Printf("\n%s:\n", scenario.Name)
                        fmt.Printf("  Records: %d\n", scenario.NumRecords)
                        fmt.Printf("  Target record size: %d bytes\n", 
scenario.RecordSizeBytes)
                        fmt.Printf("  JSON Array size: %.2f MB\n", 
jsonArraySizeMB)
                        fmt.Printf("  NDJSON size: %.2f MB\n", ndjsonSizeMB)
   
                        // Benchmark JSONReader with NDJSON (default chunking)
                        duration2, rows2, err2 := benchmarkJSONReader(ndjsonData, schema)
                        if err2 != nil {
                                t.Errorf("JSONReader (chunked) failed: %v", err2)
                        } else {
                                throughput2 := ndjsonSizeMB / duration2.Seconds()
                                recordsPerSec2 := float64(rows2) / duration2.Seconds()
                                fmt.Printf("  JSONReader (chunked):      %12v (%.2f MB/s, %.0f rec/s)\n",
                                        duration2, throughput2, recordsPerSec2)
                        }
   
                        // Benchmark JSONReader with NDJSON (single chunk)
                        duration3, rows3, err3 := benchmarkJSONReaderSingleChunk(ndjsonData, schema)
                        if err3 != nil {
                                t.Errorf("JSONReader (single chunk) failed: %v", err3)
                        } else {
                                throughput3 := ndjsonSizeMB / duration3.Seconds()
                                recordsPerSec3 := float64(rows3) / duration3.Seconds()
                                fmt.Printf("  JSONReader (single chunk): %12v (%.2f MB/s, %.0f rec/s)\n",
                                        duration3, throughput3, recordsPerSec3)
                        }

                        // Benchmark RecordFromJSON (expects JSON array)
                        duration1, rows1, err1 := benchmarkRecordFromJSON(jsonArrayData, schema)
                        if err1 != nil {
                                t.Errorf("RecordFromJSON failed: %v", err1)
                        } else {
                                throughput1 := jsonArraySizeMB / duration1.Seconds()
                                recordsPerSec1 := float64(rows1) / duration1.Seconds()
                                fmt.Printf("  RecordFromJSON:            %12v (%.2f MB/s, %.0f rec/s)\n",
                                        duration1, throughput1, recordsPerSec1)
                        }
   
                        if err1 == nil && err2 == nil && err3 == nil {
                                fmt.Printf("\n  Speedup vs RecordFromJSON:\n")
                                fmt.Printf("    JSONReader (chunked):      
%.2fx\n", float64(duration1)/float64(duration2))
                                fmt.Printf("    JSONReader (single chunk): 
%.2fx\n", float64(duration1)/float64(duration3))
                        }
                })
        }
   }
   ```
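   To try it yourself, drop this into a `_test.go` file in a module that already pulls in `github.com/apache/arrow-go/v18` and `github.com/google/uuid`, then run:
   
   ```
   go test -v -run TestFileLabelsPayloads
   ```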
   
   LGTM 🚀 

