davidzollo commented on issue #10305:
URL: https://github.com/apache/seatunnel/issues/10305#issuecomment-3831097005

   # Implementation: Stain Trace for End-to-End Performance Analysis
   
   Hi team, I've implemented a **Stain Trace** system that addresses the 
traffic dyeing/sampling requirements discussed in this issue, with a focus on 
end-to-end performance analysis and bottleneck identification.
   
   ## ๐ŸŽฏ What We Built
   
   A framework-level data tracing system that tracks sampled records through 
the entire pipeline (Source โ†’ Queue โ†’ Transform โ†’ Sink) with minimal overhead, 
enabling precise performance bottleneck identification without requiring any 
connector modifications.
   
   **Pull Request**: #10434
   
   ---
   
   ## ๐Ÿ—๏ธ Architecture Overview
   
   ### Core Concept: Sample-Based One-Shot Reporting
   
   Instead of reporting at every stage (which causes event storms), we:
   1. **Mark** a small fraction of records at the Source with a trace payload
   2. **Propagate** the payload through the Engine's internal flow
   3. **Report** once at the Sink with complete timing breakdown
   
   This keeps reporting volume strictly proportional to the sampling rate, with each sampled record generating exactly **1 event**.
   
   ### Trace Stages
   
   | Stage | Meaning | Framework Location |
   |-------|---------|-------------------|
   | `S0` | Source emits record | `SeaTunnelSourceCollector.collect()` |
   | `Q+` | Queue enqueue start | `IntermediateBlockingQueue.received()` / `RecordEventProducer` |
   | `Q-` | Queue dequeue complete | `IntermediateBlockingQueue.collect()` / `RecordEventHandler` |
   | `T+` | Transform receives record | `TransformFlowLifeCycle.received()` |
   | `T-` | Transform outputs record | Before `collector.collect(output)` |
   | `W!` | Sink write complete | After `writer.write()` in `SinkFlowLifeCycle` |
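
   For orientation, the stage codes can be pictured as a small enum. The sketch below uses illustrative names and byte values, not necessarily the types used in the PR:

   ```java
   /** Stage markers appended to the trace payload as single-byte codes (values are illustrative). */
   public enum StainTraceStage {
       SOURCE_EMIT((byte) 0, "S0"),    // SeaTunnelSourceCollector.collect()
       QUEUE_ENQUEUE((byte) 1, "Q+"),  // IntermediateBlockingQueue.received() / RecordEventProducer
       QUEUE_DEQUEUE((byte) 2, "Q-"),  // IntermediateBlockingQueue.collect() / RecordEventHandler
       TRANSFORM_IN((byte) 3, "T+"),   // TransformFlowLifeCycle.received()
       TRANSFORM_OUT((byte) 4, "T-"),  // before collector.collect(output)
       SINK_WRITE((byte) 5, "W!");     // after writer.write() in SinkFlowLifeCycle

       private final byte code;
       private final String label;

       StainTraceStage(byte code, String label) {
           this.code = code;
           this.label = label;
       }

       public byte code() {
           return code;
       }

       public String label() {
           return label;
       }
   }
   ```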
   
   ---
   
   ## ๐Ÿ“ฆ Implementation Details
   
   ### 1. Trace Payload Protocol (Cross-Node Transmission)
   
   We extend `RecordSerializer` to transmit a compact binary payload:
   
   ```
   MAGIC(4)  = 0x53545452  // 'STTR'
   VER(2)    = 1
   TRACE_ID(8)
   START_TS_MS(8)
   COUNT(2)  // number of entries
   ENTRIES:
     repeat COUNT times:
       STAGE(1)     // stage code (S0, Q+, Q-, T+, T-, W!)
       TASK_ID(8)   // TaskLocation.getTaskID()
       TS_MS(8)     // System.currentTimeMillis()
   ```
   
   - **Storage**: `SeaTunnelRow.options["__st_trace_payload"] = byte[]`
   - **Serialization**: Extended `RecordSerializer` with backward compatibility (data written in the old format remains readable)
   - **Size limit**: Configurable max entries per trace (default: 32)
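
   A minimal encoding sketch of that layout, assuming big-endian byte order and hypothetical class/constant names (the PR's actual serializer code may differ):

   ```java
   import java.nio.ByteBuffer;

   /** Illustrative encoder/validator for the stain-trace payload layout shown above. */
   public final class StainTracePayloadCodec {

       static final int MAGIC = 0x53545452; // 'STTR'
       static final short VERSION = 1;
       static final int HEADER_BYTES = 4 + 2 + 8 + 8 + 2; // MAGIC + VER + TRACE_ID + START_TS_MS + COUNT
       static final int ENTRY_BYTES = 1 + 8 + 8;          // STAGE + TASK_ID + TS_MS

       /** Serializes a header plus parallel (stage, taskId, tsMs) arrays into the binary layout. */
       public static byte[] encode(long traceId, long startTsMs, byte[] stages, long[] taskIds, long[] tsMs) {
           int count = stages.length; // caller guarantees the three arrays have equal length
           ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + count * ENTRY_BYTES);
           buf.putInt(MAGIC);
           buf.putShort(VERSION);
           buf.putLong(traceId);
           buf.putLong(startTsMs);
           buf.putShort((short) count);
           for (int i = 0; i < count; i++) {
               buf.put(stages[i]);
               buf.putLong(taskIds[i]);
               buf.putLong(tsMs[i]);
           }
           return buf.array();
       }

       /** Returns true if the byte array starts with the expected magic and version. */
       public static boolean isStainTracePayload(byte[] payload) {
           if (payload == null || payload.length < HEADER_BYTES) {
               return false;
           }
           ByteBuffer buf = ByteBuffer.wrap(payload);
           return buf.getInt() == MAGIC && buf.getShort() == VERSION;
       }
   }
   ```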
   
   ### 2. Sampling & Budget Control
   
   **Deterministic Sampling** (at Source only):
   - Each Source subtask maintains a sequence counter
   - Sample when `seq % sampleRate == 0`
   - No random number generation overhead
   
   **Strict Budget Control** (per Worker per second):
   - `stain-trace-max-traces-per-second-per-worker = X`
   - When the limit is reached, stop creating new samples (existing ones continue to propagate)
   - Guarantees an upper bound on event volume
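
   Putting those two rules together, a minimal per-subtask sketch (class and field names are illustrative; the PR enforces the per-second budget per Worker, which a real implementation would share across subtasks):

   ```java
   /** Illustrative sampling decision: deterministic 1-in-N sampling plus a per-second cap. */
   public class StainTraceSampler {

       private final int sampleRate;          // e.g. 100000 -> sample 1 in 100k records
       private final int maxTracesPerSecond;  // budget: new traces allowed per second

       private long sequence;                 // per-subtask record counter
       private long windowStartMs;            // start of the current 1-second window
       private int tracesInWindow;            // traces created in the current window

       public StainTraceSampler(int sampleRate, int maxTracesPerSecond) {
           this.sampleRate = sampleRate;
           this.maxTracesPerSecond = maxTracesPerSecond;
       }

       /** Returns true if the current record should start a new trace. */
       public boolean shouldSample() {
           // Deterministic sampling: no random number generation overhead.
           if (sequence++ % sampleRate != 0) {
               return false;
           }
           // Strict budget: once the per-second limit is reached, stop creating
           // new samples (already-marked records keep propagating downstream).
           long now = System.currentTimeMillis();
           if (now - windowStartMs >= 1000) {
               windowStartMs = now;
               tracesInWindow = 0;
           }
           return ++tracesInWindow <= maxTracesPerSecond;
       }
   }
   ```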
   
   ### 3. Framework Modifications (Connector-Agnostic)
   
   All changes are in **SeaTunnel Engine framework layer**:
   
   | Component | File | Change |
   |-----------|------|--------|
   | Config | `EngineConfig` | Add stain-trace config fields |
   | Serialization | `RecordSerializer` | Serialize/deserialize trace payload |
   | Source | `SeaTunnelSourceCollector` | Create payload, append S0 |
   | Queue | `IntermediateBlockingQueue` / Disruptor | Append Q+/Q- |
   | Transform | `TransformFlowLifeCycle` | Append T+/T- (with 1-to-N handling) |
   | Sink | `SinkFlowLifeCycle` | Append W!, report event |
   
   **Zero connector changes required** - all connectors automatically get 
tracing capability.
   
   ### 4. Event Reporting
   
   **New Event Type**: `StainTraceEvent` with fields:
   - `jobId`, `traceId`, `sinkTaskId`, `tableId`
   - `payload` (byte[], contains all timing entries)
   - `createdTime`
   
   **Reporting Path**:
   - Sink โ†’ Worker's `EventService` โ†’ Master โ†’ HTTP POST (JSON)
   - Reuses existing event infrastructure (`JobEventHttpReportHandler`)
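
   Roughly, the event carries the identifiers plus the opaque payload; the sketch below uses assumed field types and omits getters:

   ```java
   /** One-shot event emitted at the Sink for each sampled record (field types assumed). */
   public class StainTraceEvent {
       private final long jobId;
       private final long traceId;
       private final long sinkTaskId;
       private final String tableId;
       private final byte[] payload;    // full binary trace payload (all timing entries)
       private final long createdTime;

       public StainTraceEvent(long jobId, long traceId, long sinkTaskId,
                              String tableId, byte[] payload, long createdTime) {
           this.jobId = jobId;
           this.traceId = traceId;
           this.sinkTaskId = sinkTaskId;
           this.tableId = tableId;
           this.payload = payload;
           this.createdTime = createdTime;
       }

       // getters omitted for brevity
   }
   ```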
   
   ---
   
   ## ๐Ÿ“Š Trace Collector Service
   
   We also built a standalone **Trace Collector** HTTP service:
   
   **Features**:
   - **Multi-database storage**: PostgreSQL, MySQL, ClickHouse
   - **REST APIs**: `/ingest` (receive events), `/traces` (query), `/health`, 
`/metrics`
   - **Task mapping cache**: Enriches traces with readable task names
   - **Built-in metrics**: Ingestion rate, errors, query performance
   
   **Quick Start**: See `seatunnel-trace/STAIN_TRACE_QUICKSTART.md`
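
   For a quick sanity check of the collector, an event batch can be POSTed to `/ingest` by hand. In the real deployment the Master's `JobEventHttpReportHandler` issues this POST; the JSON field names, port, and base64 payload below are assumptions for illustration only:

   ```java
   import java.net.URI;
   import java.net.http.HttpClient;
   import java.net.http.HttpRequest;
   import java.net.http.HttpResponse;

   /** Posts one hand-written event to a locally running Trace Collector. */
   public class IngestExample {
       public static void main(String[] args) throws Exception {
           // Hypothetical event JSON; "U1RUUg==" is just the base64-encoded 'STTR' magic as a placeholder.
           String body = """
                   [{"jobId": 123456, "traceId": 42, "sinkTaskId": 3,
                     "tableId": "default.orders", "payload": "U1RUUg==", "createdTime": 1700000000000}]
                   """;
           HttpRequest request = HttpRequest.newBuilder()
                   .uri(URI.create("http://localhost:8080/ingest")) // hypothetical collector address
                   .header("Content-Type", "application/json")
                   .POST(HttpRequest.BodyPublishers.ofString(body))
                   .build();
           HttpResponse<String> response = HttpClient.newHttpClient()
                   .send(request, HttpResponse.BodyHandlers.ofString());
           System.out.println(response.statusCode() + " " + response.body());
       }
   }
   ```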
   
   ---
   
   ## โš™๏ธ Configuration
   
   Enable in `seatunnel.yaml`:
   
   ```yaml
   seatunnel:
     engine:
       stain-trace-enabled: true
       stain-trace-sample-rate: 100000        # 1 in 100k records
       stain-trace-max-traces-per-second-per-worker: 50
       stain-trace-max-entries-per-trace: 32
   ```
   
   **Production-Safe Defaults**:
   - `enabled: false` (opt-in)
   - Near-zero overhead when disabled (a single boolean check)
   - Roughly 0.1-1% overhead when enabled at 1% sampling
   
   ---
   
   ## ๐Ÿ” Use Cases & Analysis
   
   ### Bottleneck Identification
   
   From trace payload, calculate:
   - **End-to-end latency**: `W!.ts - S0.ts`
   - **Queue wait time**: `Q-.ts - Q+.ts`
   - **Transform processing**: `T-.ts - T+.ts`
   - **Sink write time**: `W!.ts - (previous stage's ts, e.g. T-.ts)` (see the sketch below)
   
   Then aggregate:
   - P50/P95/P99 of e2e latency
   - P95 queue wait time (identifies backpressure)
   - P95 sink write time (identifies storage bottlenecks)
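
   A sketch of the per-record arithmetic on a decoded payload, using a hypothetical entry type and made-up timestamps, before the results are loaded into the OLAP store:

   ```java
   import java.util.List;

   /** Computes the latency breakdown for one sampled record from its decoded trace entries. */
   public final class TraceBreakdown {

       /** A decoded payload entry: stage label, task id, and wall-clock timestamp. */
       public record Entry(String stage, long taskId, long tsMs) {}

       /** Returns the timestamp of the first entry for the given stage, or -1 if absent. */
       public static long stageTs(List<Entry> entries, String stage) {
           return entries.stream()
                   .filter(e -> e.stage().equals(stage))
                   .mapToLong(Entry::tsMs)
                   .findFirst()
                   .orElse(-1L);
       }

       public static void main(String[] args) {
           // Example entries for one record (timestamps in milliseconds).
           List<Entry> entries = List.of(
                   new Entry("S0", 1L, 1_000L),
                   new Entry("Q+", 1L, 1_002L),
                   new Entry("Q-", 2L, 1_010L),
                   new Entry("T+", 2L, 1_011L),
                   new Entry("T-", 2L, 1_015L),
                   new Entry("W!", 3L, 1_040L));

           long e2eMs = stageTs(entries, "W!") - stageTs(entries, "S0");        // 40 ms
           long queueWaitMs = stageTs(entries, "Q-") - stageTs(entries, "Q+");  // 8 ms
           long transformMs = stageTs(entries, "T-") - stageTs(entries, "T+");  // 4 ms
           long sinkWriteMs = stageTs(entries, "W!") - stageTs(entries, "T-");  // 25 ms

           System.out.printf("e2e=%dms queue=%dms transform=%dms sink=%dms%n",
                   e2eMs, queueWaitMs, transformMs, sinkWriteMs);
       }
   }
   ```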
   
   ### Example Query (ClickHouse)
   
   ```sql
   SELECT 
     job_id,
     quantile(0.95)(e2e_ms) as p95_e2e,
     quantile(0.95)(queue_wait_ms) as p95_queue_wait,
     quantile(0.95)(sink_write_ms) as p95_sink_write
   FROM trace_records
   WHERE job_id = '123456'
   GROUP BY job_id
   ```
   
   ---
   
   ## ๐Ÿงช Testing & Validation
   
   **Unit Tests**:
   - `RecordSerializerTest`: Backward compatibility (old format โ†’ new reader)
   - `StainTracePayloadTest`: Encoding/decoding, size limits
   - `StainTraceSamplerTest`: Sampling logic, budget control
   
   **Integration Tests**:
   - `StainTraceFlowIT`: End-to-end trace through Source โ†’ Transform โ†’ Sink
   - `TransformFlowLifeCycleStainTraceTest`: 1-to-N (FlatMap), 1-to-0 (Filter) 
scenarios
   
   **Performance Regression**:
   - Baseline TPS/CPU/GC with tracing disabled
   - Verify negligible impact with production-safe sampling
   
   ---
   
   ## ๐Ÿš€ Deployment Strategy
   
   **Non-Rolling Upgrade** (all nodes same version):
   1. Stop all jobs
   2. Stop Master/Worker nodes
   3. Replace Engine jars (includes serialization protocol change)
   4. Start Master โ†’ Workers
   5. Enable on canary job first
   6. Validate events, then expand
   
   **Rollback**:
   - Stop cluster, revert to old jars, restart
   - **Critical**: If using Hazelcast persistence, persisted data must be cleared (the old version cannot read the new serialization format)
   
   ---
   
   ## ๐Ÿ“‹ Differences from Original Proposal
   
   This issue proposed:
   - **Flags in `SeaTunnelRow`**: We use a dedicated binary payload instead 
(cleaner serialization)
   - **ThreadLocal context**: We serialize payload with the record (simpler, no 
async complexity)
   - **Metrics integration**: We focus on **tracing events** rather than 
metrics aggregation
   
   Our approach prioritizes:
   - **End-to-end visibility** over per-stage metrics
   - **Minimal serialization overhead** over ThreadLocal management
   - **External analysis** (OLAP) over in-process metric aggregation
   
   ---
   
   ## ๐Ÿ”— Resources
   
   - **PR #10434**: https://github.com/apache/seatunnel/pull/10434
   - **Quick Start Guide**: `seatunnel-trace/STAIN_TRACE_QUICKSTART.md` (in PR)
   - **Files Changed**: 84 files, +6824/-42 lines
   
   ---
   
   ## ๐Ÿ’ญ Open for Discussion
   
   This implementation satisfies the "traffic dyeing/sampling" requirement but 
focuses on **performance analysis** rather than **metrics segmentation**. 
   
   If the primary goal is to split metrics by sampled/non-sampled traffic (as 
originally proposed), we could:
   1. Reuse the stain trace payload mechanism for flag propagation
   2. Add a metrics proxy layer that reads the payload
   3. Route metrics to separate counters based on flags
   
   Would love feedback on whether this tracing-first approach meets the needs, 
or if we should extend it for metrics segmentation.
   
   ---
   
   **Status**: Ready for review in #10434. All tests passing, documentation 
complete.

