[ https://issues.apache.org/jira/browse/FLINK-39170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated FLINK-39170:
-----------------------------------
    Labels: pull-request-available  (was: )

> [Table/SQL] Add async batch lookup join optimization for temporal table joins
> -----------------------------------------------------------------------------
>
>                 Key: FLINK-39170
>                 URL: https://issues.apache.org/jira/browse/FLINK-39170
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API, Table SQL / Planner
>            Reporter: featzhang
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Overview
> This feature introduces an async batch lookup join optimization for temporal 
> table joins in Flink's Table API & SQL. The optimization batches multiple 
> lookup requests together to reduce network overhead and improve throughput, 
> particularly beneficial for high-throughput scenarios with frequent dimension 
> table lookups.
> h2. Motivation
> Currently, Flink's temporal table joins issue a separate lookup request for 
> each input record, which can lead to:
> * *High network overhead*: Each lookup request requires a separate network 
> round-trip
> * *Suboptimal throughput*: Network latency becomes a bottleneck in 
> high-throughput scenarios  
> * *Resource inefficiency*: Frequent small requests don't utilize network 
> bandwidth effectively
> h2. Proposed Solution
> Implement an async batch lookup join that:
> * *Batches lookup requests*: Groups multiple lookup requests together before 
> sending to the external system
> * *Configurable batch size*: Allows users to tune batch size based on their 
> specific use case
> * *Timeout-based flushing*: Ensures low latency by flushing incomplete 
> batches after a configurable interval
> * *Backward compatibility*: Maintains existing behavior when batch mode is 
> disabled (default)
> h2. Use Cases
> * *Real-time analytics*: High-throughput streaming jobs that need to enrich 
> data with dimension tables
> * *Event processing*: Applications processing millions of events per second 
> with frequent lookups
> * *Data warehousing*: ETL pipelines that need to join streaming data with 
> slowly changing dimensions
> h2. Expected Benefits
> * *Improved throughput*: 2-5x improvement in lookup throughput for 
> high-volume scenarios
> * *Reduced network overhead*: Fewer network round-trips through batching
> * *Better resource utilization*: More efficient use of network bandwidth and 
> external system resources
> * *Configurable performance*: Users can tune batch size and flush interval 
> based on their latency/throughput requirements
> h2. Configuration Options
> h3. table.optimizer.dim-lookup-join.batch.enabled
> * *Type*: Boolean
> * *Default*: false
> *            Description*: Whether to enable the batch lookup join 
> optimization for dimension tables
> h3. table.optimizer.dim-lookup-join.batch.size
> * *Type*: Integer
> * *Default*: 100
> *            Description*: The maximum number of lookup requests accumulated 
> into a single batch before the batch is sent to the external system.
> h3. table.optimizer.dim-lookup-join.batch.flush.millis
> * *Type*: Long
> * *Default*: 2000L
> *            Description*: The flush interval of the batch-mode lookup join, 
> in milliseconds. An incomplete batch is flushed after waiting at most this long.
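> The options above can be set like any other configuration option, e.g. via 
> Flink SQL's standard {{SET}} syntax (the option keys are those proposed in 
> this issue):
> {code:sql}
> -- Opt in to batch lookup joins and tune batching for this job
> SET 'table.optimizer.dim-lookup-join.batch.enabled' = 'true';
> SET 'table.optimizer.dim-lookup-join.batch.size' = '200';
> SET 'table.optimizer.dim-lookup-join.batch.flush.millis' = '1000';
> {code}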
> h2. Implementation Details
> h3. Core Components
> # *AsyncBatchLookupJoinRunner*: Main runner for batch async lookup joins 
> without a post-lookup calc (projection/filter)
> # *AsyncBatchLookupJoinWithCalcRunner*: Runner for batch async lookup joins 
> with post-lookup calculations
> # *BatchLookupFunctionWrapper*: Adapter that wraps single lookup functions 
> for batch processing
> # *BatchResultFutureWrapper*: Wrapper for handling batch result futures
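> In isolation, the single-to-batch adaptation performed by such a wrapper 
> might look roughly as follows (a stand-alone sketch with 
> {{CompletableFuture}}; the class and method names are illustrative, not the 
> proposed Flink classes):
> {code:java}
> import java.util.List;
> import java.util.concurrent.CompletableFuture;
> import java.util.function.Function;
> import java.util.stream.Collectors;
> 
> /** Illustrative adapter: lifts a single-key async lookup into a batch lookup. */
> class BatchLookupAdapter<K, V> {
>     private final Function<K, CompletableFuture<V>> singleLookup;
> 
>     BatchLookupAdapter(Function<K, CompletableFuture<V>> singleLookup) {
>         this.singleLookup = singleLookup;
>     }
> 
>     /** Fan the per-key lookup out over the batch, join into one batch future. */
>     CompletableFuture<List<V>> lookupBatch(List<K> keys) {
>         List<CompletableFuture<V>> futures =
>                 keys.stream().map(singleLookup).collect(Collectors.toList());
>         return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
>                 .thenApply(ignored -> futures.stream()
>                         .map(CompletableFuture::join)
>                         .collect(Collectors.toList()));
>     }
> }
> {code}
> Results come back in input order, which is what lets a result distributor map 
> each value back to the record that requested it.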
> h3. Architecture
> {noformat}
> Input Stream → Batch Accumulator → Batch Lookup Function → Result Processor → Output Stream
>                      ↓
>               Timeout Flusher (configurable interval)
> {noformat}
> h3. Batch Processing Flow
> # *Accumulation*: Input records are accumulated into batches up to the 
> configured batch size
> # *Timeout Handling*: If batch size isn't reached within the flush interval, 
> the batch is processed anyway
> # *Batch Lookup*: The entire batch is sent to the lookup function as a single 
> request
> # *Result Distribution*: Results are distributed back to the corresponding 
> input records
> # *Output*: Joined results are emitted to the output stream
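> The accumulate/flush cycle above could be modeled in plain Java roughly as 
> follows (an illustrative stand-alone sketch, not the proposed operator 
> implementation; names are hypothetical):
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
> import java.util.function.Consumer;
> 
> /** Illustrative model of the accumulate/flush cycle described above. */
> class BatchAccumulator<T> {
>     private final int batchSize;            // cf. ...batch.size
>     private final long flushIntervalMillis; // cf. ...batch.flush.millis
>     private final Consumer<List<T>> batchLookup;
>     private final List<T> buffer = new ArrayList<>();
>     private long lastFlushMillis = System.currentTimeMillis();
> 
>     BatchAccumulator(int batchSize, long flushIntervalMillis,
>                      Consumer<List<T>> batchLookup) {
>         this.batchSize = batchSize;
>         this.flushIntervalMillis = flushIntervalMillis;
>         this.batchLookup = batchLookup;
>     }
> 
>     /** Accumulation: buffer the record; flush once the batch size is reached. */
>     void add(T record) {
>         buffer.add(record);
>         if (buffer.size() >= batchSize) {
>             flush();
>         }
>     }
> 
>     /** Timeout handling: a periodic timer flushes an incomplete batch. */
>     void onTimer(long nowMillis) {
>         if (!buffer.isEmpty() && nowMillis - lastFlushMillis >= flushIntervalMillis) {
>             flush();
>         }
>     }
> 
>     /** Batch lookup: hand the whole batch to the lookup function as one request. */
>     private void flush() {
>         batchLookup.accept(new ArrayList<>(buffer));
>         buffer.clear();
>         lastFlushMillis = System.currentTimeMillis();
>     }
> }
> {code}
> In the real operator the timer would be a Flink processing-time timer and the 
> lookup asynchronous, but the size/timeout flush conditions are the same.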
> h2. Compatibility
> * *Backward Compatible*: Existing temporal table joins continue to work 
> unchanged
> * *Opt-in Feature*: Must be explicitly enabled via configuration
> * *API Compatible*: No changes to existing Table API or SQL syntax
> h2. Testing Strategy
> * *Unit Tests*: Comprehensive tests for all batch processing components
> * *Integration Tests*: End-to-end tests with various batch sizes and flush 
> intervals
> * *Performance Tests*: Benchmarks comparing batch vs non-batch performance
> * *Configuration Tests*: Validation of all configuration options



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
