[
https://issues.apache.org/jira/browse/FLINK-39170?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39170:
-----------------------------------
Labels: pull-request-available (was: )
> [Table/SQL] Add async batch lookup join optimization for temporal table joins
> -----------------------------------------------------------------------------
>
> Key: FLINK-39170
> URL: https://issues.apache.org/jira/browse/FLINK-39170
> Project: Flink
> Issue Type: New Feature
> Components: Table SQL / API, Table SQL / Planner
> Reporter: featzhang
> Priority: Major
> Labels: pull-request-available
>
> h2. Overview
> This feature introduces an async batch lookup join optimization for temporal
> table joins in Flink's Table API & SQL. The optimization batches multiple
> lookup requests together to reduce network overhead and improve throughput,
> particularly beneficial for high-throughput scenarios with frequent dimension
> table lookups.
> h2. Motivation
> Currently, Flink's temporal table joins perform individual lookup requests
> for each input record, which can lead to:
> * *High network overhead*: Each lookup request requires a separate network
> round-trip
> * *Suboptimal throughput*: Network latency becomes a bottleneck in
> high-throughput scenarios
> * *Resource inefficiency*: Frequent small requests don't utilize network
> bandwidth effectively
> h2. Proposed Solution
> Implement an async batch lookup join that:
> * *Batches lookup requests*: Groups multiple lookup requests together before
> sending to the external system
> * *Configurable batch size*: Allows users to tune batch size based on their
> specific use case
> * *Timeout-based flushing*: Ensures low latency by flushing incomplete
> batches after a configurable interval
> * *Backward compatibility*: Maintains existing behavior when batch mode is
> disabled (default)
> h2. Use Cases
> * *Real-time analytics*: High-throughput streaming jobs that need to enrich
> data with dimension tables
> * *Event processing*: Applications processing millions of events per second
> with frequent lookups
> * *Data warehousing*: ETL pipelines that need to join streaming data with
> slowly changing dimensions
> h2. Expected Benefits
> * *Improved throughput*: 2-5x improvement in lookup throughput for
> high-volume scenarios
> * *Reduced network overhead*: Fewer network round-trips through batching
> * *Better resource utilization*: More efficient use of network bandwidth and
> external system resources
> * *Configurable performance*: Users can tune batch size and flush interval
> based on their latency/throughput requirements
> h2. Configuration Options
> h3. table.optimizer.dim-lookup-join.batch.enabled
> * *Type*: Boolean
> * *Default*: false
> * *Description*: Whether to enable the dim table batch lookup join
> optimization
> h3. table.optimizer.dim-lookup-join.batch.size
> * *Type*: Integer
> * *Default*: 100
> * *Description*: The batch size of the dim table lookup join. Controls how
> many lookup requests are batched together before they are sent as a single
> request.
> h3. table.optimizer.dim-lookup-join.batch.flush.millis
> * *Type*: Long
> * *Default*: 2000L
> * *Description*: The flush interval of the dim table lookup join in batch
> mode, in milliseconds. Controls the maximum time to wait before flushing an
> incomplete batch.
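As a usage sketch, the options above could be set per session from SQL. The option names are as proposed in this ticket (they may change during review), and the table and column names below are purely illustrative; note that the join syntax itself is unchanged:

```sql
-- Opt in to batch lookup joins for this session (proposed option names).
SET 'table.optimizer.dim-lookup-join.batch.enabled' = 'true';
SET 'table.optimizer.dim-lookup-join.batch.size' = '200';
SET 'table.optimizer.dim-lookup-join.batch.flush.millis' = '1000';

-- The temporal table join itself needs no new syntax (hypothetical tables):
SELECT o.order_id, o.amount, d.currency_rate
FROM Orders AS o
JOIN DimCurrency FOR SYSTEM_TIME AS OF o.proc_time AS d
  ON o.currency = d.currency;
```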
> h2. Implementation Details
> h3. Core Components
> # *AsyncBatchLookupJoinRunner*: Main runner for batch async lookup joins
> without calc
> # *AsyncBatchLookupJoinWithCalcRunner*: Runner for batch async lookup joins
> with post-lookup calculations
> # *BatchLookupFunctionWrapper*: Adapter that wraps single lookup functions
> for batch processing
> # *BatchResultFutureWrapper*: Wrapper for handling batch result futures
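To illustrate the adapter role described for *BatchLookupFunctionWrapper*, the sketch below (plain JDK, all names hypothetical and not Flink's actual classes) shows one way a per-key async lookup can be presented as a batch lookup: issue the per-key futures together and complete when every result is in, preserving input order.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
 * Illustrative sketch: adapts a single-key async lookup into a batch-capable
 * one. This is an assumption about how such a wrapper could work, not the
 * actual Flink implementation.
 */
class BatchLookupAdapter<K, V> {
    private final Function<K, CompletableFuture<V>> singleLookup;

    BatchLookupAdapter(Function<K, CompletableFuture<V>> singleLookup) {
        this.singleLookup = singleLookup;
    }

    /** Issue all per-key lookups concurrently; complete when all results are in. */
    CompletableFuture<List<V>> lookupBatch(List<K> keys) {
        List<CompletableFuture<V>> futures = keys.stream()
                .map(singleLookup)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        // All futures are complete here, so join() does not block.
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }
}
```

Results come back in the same order as the input keys, which makes distributing them to the corresponding input records straightforward.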
> h3. Architecture
> {noformat}
> Input Stream → Batch Accumulator → Batch Lookup Function → Result Processor → Output Stream
>                       ↓
>          Timeout Flusher (configurable interval)
> {noformat}
> h3. Batch Processing Flow
> # *Accumulation*: Input records are accumulated into batches up to the
> configured batch size
> # *Timeout Handling*: If batch size isn't reached within the flush interval,
> the batch is processed anyway
> # *Batch Lookup*: The entire batch is sent to the lookup function as a single
> request
> # *Result Distribution*: Results are distributed back to the corresponding
> input records
> # *Output*: Joined results are emitted to the output stream
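The accumulation and timeout-handling steps above can be sketched with plain JDK primitives. All names here (`BatchAccumulator`, `add`, `flush`) are illustrative, not the actual Flink classes; the point is the two flush triggers: batch size reached, or flush interval elapsed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Minimal sketch of size- and timeout-triggered batching, under the
 * assumption that the downstream batch lookup takes one List per request.
 */
class BatchAccumulator<T> {
    private final int batchSize;
    private final long flushMillis;
    private final Consumer<List<T>> batchLookup; // sends one batched request
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private List<T> buffer = new ArrayList<>();
    private ScheduledFuture<?> pendingFlush;

    BatchAccumulator(int batchSize, long flushMillis, Consumer<List<T>> batchLookup) {
        this.batchSize = batchSize;
        this.flushMillis = flushMillis;
        this.batchLookup = batchLookup;
    }

    /** Accumulate one record; flush when the configured batch size is reached. */
    synchronized void add(T record) {
        if (buffer.isEmpty()) {
            // Arm the timeout flusher when the first record of a batch arrives.
            pendingFlush = timer.schedule(this::flush, flushMillis, TimeUnit.MILLISECONDS);
        }
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Send the current batch, full or not, as a single lookup request. */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        if (pendingFlush != null) {
            pendingFlush.cancel(false);
        }
        List<T> batch = buffer;
        buffer = new ArrayList<>();
        batchLookup.accept(batch);
    }

    void close() {
        flush();
        timer.shutdown();
    }
}
```

The real operator would additionally track the result futures per input record so joined rows can be emitted in the right association, as the result-distribution step describes.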
> h2. Compatibility
> * *Backward Compatible*: Existing temporal table joins continue to work
> unchanged
> * *Opt-in Feature*: Must be explicitly enabled via configuration
> * *API Compatible*: No changes to existing Table API or SQL syntax
> h2. Testing Strategy
> * *Unit Tests*: Comprehensive tests for all batch processing components
> * *Integration Tests*: End-to-end tests with various batch sizes and flush
> intervals
> * *Performance Tests*: Benchmarks comparing batch vs non-batch performance
> * *Configuration Tests*: Validation of all configuration options
--
This message was sent by Atlassian Jira
(v8.20.10#820010)