caldempsey commented on PR #449:
URL: https://github.com/apache/arrow-go/pull/449#issuecomment-3117354957
```
large_payload_50k_records:
  Records: 50000
  Target record size: 10240 bytes
  JSON Array size: 500.93 MB
  NDJSON size: 500.93 MB
  JSONReader (chunked):      1.267049792s (395.35 MB/s, 39462 rec/s)
  JSONReader (single chunk): 1.058549125s (473.22 MB/s, 47234 rec/s)
  RecordFromJSON:            1.043343541s (480.12 MB/s, 47923 rec/s)

  Speedup vs RecordFromJSON:
    JSONReader (chunked):      0.82x
    JSONReader (single chunk): 0.99x
--- PASS: TestFileLabelsPayloads/large_payload_50k_records (4.69s)
=== RUN   TestFileLabelsPayloads/4MB_payload_1000_records

4MB_payload_1000_records:
  Records: 1000
  Target record size: 4000000 bytes
  JSON Array size: 3814.94 MB
  NDJSON size: 3814.94 MB
  JSONReader (chunked):      9.64786775s (395.42 MB/s, 104 rec/s)
  JSONReader (single chunk): 8.009190083s (476.32 MB/s, 125 rec/s)
  RecordFromJSON:            7.984174833s (477.81 MB/s, 125 rec/s)

  Speedup vs RecordFromJSON:
    JSONReader (chunked):      0.83x
    JSONReader (single chunk): 1.00x
--- PASS: TestFileLabelsPayloads/4MB_payload_1000_records (35.29s)
--- PASS: TestFileLabelsPayloads (39.97s)
PASS
```

Ran a test using data that's not quite production data, but closer to it. The use case is ingesting transcripts of audio files, which can be quite large (up to 4MB each), via Spark Connect. We stream this data into a data lake through Spark Connect rather than traditional Spark jobs, which lets us get synchronous responses at the call site and support coordination patterns (GET/POST) from standard REST consumers (which is _so_ much better for error handling). Hence the massive strings. Feel free to use this benchmark in your own tests; it should be a fairly comparable setup (there's a short note on running it after the code below).

```go
package main_test

import (
    "bytes"
    "encoding/json"
    "fmt"
    "strings"
    "testing"
    "time"

    "github.com/apache/arrow-go/v18/arrow"
    "github.com/apache/arrow-go/v18/arrow/array"
    "github.com/apache/arrow-go/v18/arrow/memory"
    "github.com/google/uuid"
)

// FileLabel represents the structure from the Python benchmark
type FileLabel struct {
    FilePath    string  `json:"file_path"`
    SomeLabel   string  `json:"some_label"`
    Confidence  float64 `json:"confidence"`
    ModelName   string  `json:"model_name"`
    ProcessedAt string  `json:"processed_at"`
    BatchID     string  `json:"batch_id"`
    Metadata    string  `json:"metadata"` // JSON string
}

// MetadataContent represents the metadata structure
type MetadataContent struct {
    ProcessingTimeMs int    `json:"processing_time_ms"`
    Version          string `json:"version"`
    Padding          string `json:"padding"`
    RecordID         int    `json:"record_id"`
    ExtraField1      string `json:"extra_field_1"`
    ExtraField2      string `json:"extra_field_2"`
    ExtraField3      string `json:"extra_field_3"`
}

// makePadding creates a padding string of the specified size
func makePadding(size int) string {
    alphabet := "abcdefghijklmnopqrstuvwxyz"
    repeats := size/len(alphabet) + 1
    return strings.Repeat(alphabet, repeats)[:size]
}

// generateLargePayload generates the same payload structure as the Python benchmark
func generateLargePayload(numRecords int, recordSizeBytes int) []FileLabel {
    batchID := uuid.New().String()
    baseRecordSize := 200
    metadataSize := recordSizeBytes - baseRecordSize
    if metadataSize < 100 {
        metadataSize = 100
    }
    padding := makePadding(metadataSize)
    nowISO := time.Now().UTC().Format(time.RFC3339)

    payload := make([]FileLabel, numRecords)
    for i := 0; i < numRecords; i++ {
        metadata := MetadataContent{
            ProcessingTimeMs: 150 + (i % 100),
            Version:          "1.0",
            Padding:          padding,
            RecordID:         i,
            ExtraField1:      fmt.Sprintf("extra_value_%d", i),
            ExtraField2:      fmt.Sprintf("extra_value_%d", i*2),
            ExtraField3:      fmt.Sprintf("extra_value_%d", i*3),
        }
        metadataJSON, _ := json.Marshal(metadata)
        payload[i] = FileLabel{
            FilePath:    fmt.Sprintf("s3://test-bucket/batch-%s/file-%d.jpg", batchID, i),
            SomeLabel:   fmt.Sprintf("label_%d", i%10),
            Confidence:  0.85 + float64(i%15)/100.0,
            ModelName:   fmt.Sprintf("model_v%d", (i%5)+1),
            ProcessedAt: nowISO,
            BatchID:     batchID,
            Metadata:    string(metadataJSON),
        }
    }
    return payload
}

// payloadToNDJSON converts payload to newline-delimited JSON bytes
func payloadToNDJSON(payload []FileLabel) []byte {
    var buf bytes.Buffer
    encoder := json.NewEncoder(&buf)
    for _, record := range payload {
        encoder.Encode(record)
    }
    return buf.Bytes()
}

// payloadToJSONArray converts payload to JSON array format
func payloadToJSONArray(payload []FileLabel) []byte {
    data, _ := json.Marshal(payload)
    return data
}

// getFileLabelsSchema defines the Arrow schema matching the FileLabel structure
func getFileLabelsSchema() *arrow.Schema {
    return arrow.NewSchema([]arrow.Field{
        {Name: "file_path", Type: arrow.BinaryTypes.String},
        {Name: "some_label", Type: arrow.BinaryTypes.String},
        {Name: "confidence", Type: arrow.PrimitiveTypes.Float64},
        {Name: "model_name", Type: arrow.BinaryTypes.String},
        {Name: "processed_at", Type: arrow.BinaryTypes.String},
        {Name: "batch_id", Type: arrow.BinaryTypes.String},
        {Name: "metadata", Type: arrow.BinaryTypes.String},
    }, nil)
}

func benchmarkRecordFromJSON(data []byte, schema *arrow.Schema) (time.Duration, int64, error) {
    pool := memory.NewGoAllocator()

    start := time.Now()
    record, _, err := array.RecordFromJSON(pool, schema, bytes.NewReader(data))
    duration := time.Since(start)
    if err != nil {
        return duration, 0, err
    }
    numRows := record.NumRows()
    record.Release()
    return duration, numRows, nil
}

func benchmarkJSONReader(ndjsonData []byte, schema *arrow.Schema) (time.Duration, int64, error) {
    pool := memory.NewGoAllocator()

    start := time.Now()
    rdr := array.NewJSONReader(bytes.NewReader(ndjsonData), schema, array.WithAllocator(pool))
    defer rdr.Release()

    var totalRows int64
    for rdr.Next() {
        rec := rdr.Record()
        totalRows += rec.NumRows()
    }
    if err := rdr.Err(); err != nil {
        return time.Since(start), totalRows, err
    }
    duration := time.Since(start)
    return duration, totalRows, nil
}

func benchmarkJSONReaderSingleChunk(ndjsonData []byte, schema *arrow.Schema) (time.Duration, int64, error) {
    pool := memory.NewGoAllocator()

    start := time.Now()
    rdr := array.NewJSONReader(bytes.NewReader(ndjsonData), schema, array.WithAllocator(pool), array.WithChunk(-1))
    defer rdr.Release()

    if !rdr.Next() {
        return time.Since(start), 0, fmt.Errorf("no record found")
    }
    rec := rdr.Record()
    numRows := rec.NumRows()
    duration := time.Since(start)
    return duration, numRows, nil
}

// BenchmarkScenario represents a test scenario
type BenchmarkScenario struct {
    Name            string
    NumRecords      int
    RecordSizeBytes int
}

func TestFileLabelsPayloads(t *testing.T) {
    scenarios := []BenchmarkScenario{
        {
            Name:            "large_payload_50k_records",
            NumRecords:      50_000,
            RecordSizeBytes: 10_240,
        },
        {
            Name:            "4MB_payload_1000_records",
            NumRecords:      1000,
            RecordSizeBytes: 4000000,
        },
    }

    schema := getFileLabelsSchema()

    fmt.Println("Spark Connect API Payload Benchmarks")
    fmt.Println("==================================")

    for _, scenario := range scenarios {
        t.Run(scenario.Name, func(t *testing.T) {
            // Generate payload
            payload := generateLargePayload(scenario.NumRecords, scenario.RecordSizeBytes)

            // Convert to both formats
            jsonArrayData := payloadToJSONArray(payload)
            ndjsonData := payloadToNDJSON(payload)

            jsonArraySizeMB := float64(len(jsonArrayData)) / (1024 * 1024)
            ndjsonSizeMB := float64(len(ndjsonData)) / (1024 * 1024)
            fmt.Printf("\n%s:\n", scenario.Name)
            fmt.Printf("  Records: %d\n", scenario.NumRecords)
            fmt.Printf("  Target record size: %d bytes\n", scenario.RecordSizeBytes)
            fmt.Printf("  JSON Array size: %.2f MB\n", jsonArraySizeMB)
            fmt.Printf("  NDJSON size: %.2f MB\n", ndjsonSizeMB)

            // Benchmark JSONReader with NDJSON (default chunking)
            duration2, rows2, err2 := benchmarkJSONReader(ndjsonData, schema)
            if err2 != nil {
                t.Errorf("JSONReader (chunked) failed: %v", err2)
            } else {
                throughput2 := ndjsonSizeMB / duration2.Seconds()
                recordsPerSec2 := float64(rows2) / duration2.Seconds()
                fmt.Printf("  JSONReader (chunked):      %12v (%.2f MB/s, %.0f rec/s)\n", duration2, throughput2, recordsPerSec2)
            }

            // Benchmark JSONReader with NDJSON (single chunk)
            duration3, rows3, err3 := benchmarkJSONReaderSingleChunk(ndjsonData, schema)
            if err3 != nil {
                t.Errorf("JSONReader (single chunk) failed: %v", err3)
            } else {
                throughput3 := ndjsonSizeMB / duration3.Seconds()
                recordsPerSec3 := float64(rows3) / duration3.Seconds()
                fmt.Printf("  JSONReader (single chunk): %12v (%.2f MB/s, %.0f rec/s)\n", duration3, throughput3, recordsPerSec3)
            }

            // Benchmark RecordFromJSON (expects JSON array)
            duration1, rows1, err1 := benchmarkRecordFromJSON(jsonArrayData, schema)
            if err1 != nil {
                t.Errorf("RecordFromJSON failed: %v", err1)
            } else {
                throughput1 := jsonArraySizeMB / duration1.Seconds()
                recordsPerSec1 := float64(rows1) / duration1.Seconds()
                fmt.Printf("  RecordFromJSON:            %12v (%.2f MB/s, %.0f rec/s)\n", duration1, throughput1, recordsPerSec1)
            }

            if err1 == nil && err2 == nil && err3 == nil {
                fmt.Printf("\n  Speedup vs RecordFromJSON:\n")
                fmt.Printf("    JSONReader (chunked):      %.2fx\n", float64(duration1)/float64(duration2))
                fmt.Printf("    JSONReader (single chunk): %.2fx\n", float64(duration1)/float64(duration3))
            }
        })
    }
}
```

LGTM 🚀
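For anyone who wants to reproduce this locally: the file above is self-contained apart from module setup, so something along these lines should work. The module name is just a placeholder (not from this PR), and you may need to adjust the `package main_test` clause to match wherever you drop the file.

```
go mod init jsonbench   # placeholder module name
go mod tidy             # resolves the arrow-go v18 and google/uuid imports
go test -v -run TestFileLabelsPayloads
```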