[https://issues.apache.org/jira/browse/FLINK-39154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
featzhang updated FLINK-39154:
------------------------------
Description:
This issue introduces Async Batch Lookup Join for temporal table joins,
enabling batch-based asynchronous lookup of dimension tables.
Currently, async lookup join performs row-by-row asynchronous invocation, where
each left input row triggers one async request. This leads to:
* High RPC overhead at high throughput
* Inefficient utilization of remote dimension stores
* Increased latency and resource pressure
This improvement introduces a batch-based async execution model in which multiple
input rows are buffered and looked up with a single async request.
In addition, this change supports applying a Calc (projection/filter) on the
dimension table before evaluating the join condition.
*Motivation*
In many production scenarios:
* Dimension lookup backends support batch key queries
* Per-request overhead dominates total cost
* High-QPS streaming jobs generate an excessive number of external calls
Batching lookup requests:
* Reduces network round-trips
* Improves throughput
* Lowers CPU and serialization overhead
* Reduces pressure on external systems
*Proposed Changes*
*1. Runtime*
Introduce a new async runner:
{code:java}
AsyncBatchLookupJoinRunner
{code}
{*}Key behaviors{*}:
* Buffer left input rows together with their corresponding ResultFutures
* Flush the buffer when either the batch size reaches the configured threshold or the flush interval elapses
* Invoke the async fetcher once per batch with a List<RowData>
* Distribute the lookup results back to the corresponding left rows
* Support LEFT OUTER JOIN semantics
* Reuse ResultFuture instances to reduce allocation cost
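A minimal, self-contained sketch of the buffering and flush logic listed above, assuming a batch fetcher that returns one result list per key, in input order. BatchBuffer, add, onTimer and the explicit nowMillis clock are hypothetical names for illustration only, not the actual AsyncBatchLookupJoinRunner API; a real operator would use processing-time timers and Flink ResultFutures rather than CompletableFutures.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical, simplified model of the runner's buffering; not the Flink implementation.
final class BatchBuffer<K, V> {
    private final int maxBatchSize;
    private final long flushIntervalMillis;
    // Called once per flush; must return one result list per key, in the same order as the keys.
    private final Function<List<K>, CompletableFuture<List<List<V>>>> batchFetcher;

    private final List<K> pendingKeys = new ArrayList<>();
    private final List<CompletableFuture<List<V>>> pendingFutures = new ArrayList<>();
    private long oldestPendingMillis;
    private int fetchCalls;

    BatchBuffer(int maxBatchSize, long flushIntervalMillis,
                Function<List<K>, CompletableFuture<List<List<V>>>> batchFetcher) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        this.maxBatchSize = maxBatchSize;
        this.flushIntervalMillis = flushIntervalMillis;
        this.batchFetcher = batchFetcher;
    }

    // Buffers the key of one left row; flushes as soon as the size threshold is reached.
    CompletableFuture<List<V>> add(K key, long nowMillis) {
        if (pendingKeys.isEmpty()) {
            oldestPendingMillis = nowMillis; // start of the current flush interval
        }
        CompletableFuture<List<V>> future = new CompletableFuture<>();
        pendingKeys.add(key);
        pendingFutures.add(future);
        if (pendingKeys.size() >= maxBatchSize) {
            flush();
        }
        return future;
    }

    // Invoked periodically (e.g. by a timer); flushes a partial batch once the interval has elapsed.
    void onTimer(long nowMillis) {
        if (!pendingKeys.isEmpty() && nowMillis - oldestPendingMillis >= flushIntervalMillis) {
            flush();
        }
    }

    int fetchCalls() {
        return fetchCalls;
    }

    private void flush() {
        // Hand off a copy so new rows can be buffered while this batch is in flight.
        List<K> keys = new ArrayList<>(pendingKeys);
        List<CompletableFuture<List<V>>> futures = new ArrayList<>(pendingFutures);
        pendingKeys.clear();
        pendingFutures.clear();
        fetchCalls++;
        batchFetcher.apply(keys).whenComplete((results, error) -> {
            for (int i = 0; i < futures.size(); i++) {
                if (error != null) {
                    futures.get(i).completeExceptionally(error);
                } else if (results == null || results.size() != futures.size()) {
                    futures.get(i).completeExceptionally(
                            new IllegalStateException("expected one result list per key"));
                } else {
                    // Distribute the i-th result back to the i-th buffered left row.
                    futures.get(i).complete(results.get(i));
                }
            }
        });
    }
}
{code}

Passing the current time explicitly keeps the sketch deterministic. Flushing a copy of the buffer before calling the fetcher lets new rows accumulate while the previous batch is still in flight.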
If a Calc exists on the temporal table, the following runner is used instead:
{code:java}
AsyncBatchLookupJoinWithCalcRunner
{code}
which applies:
* Async fetch
* Convert to internal RowData
* Apply generated Calc (projection/filter)
* Apply join condition
* Produce joined results
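The per-row steps that follow the async fetch can be sketched roughly as below. This is a hypothetical simplification, not AsyncBatchLookupJoinWithCalcRunner itself: rows are plain Object[] arrays instead of RowData, the conversion to internal RowData is omitted, and CalcJoinPipeline with its calc and joinCondition functions stands in for the generated code.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Hypothetical model of the steps after the async fetch; rows are Object[] instead of RowData.
final class CalcJoinPipeline {
    // Generated Calc stand-in: empty = row filtered out, otherwise the projected dimension row.
    private final Function<Object[], Optional<Object[]>> calc;
    // Join condition evaluated on (left row, projected dimension row).
    private final BiPredicate<Object[], Object[]> joinCondition;
    private final boolean leftOuter;
    private final int projectedArity; // number of dimension columns after projection

    CalcJoinPipeline(Function<Object[], Optional<Object[]>> calc,
                     BiPredicate<Object[], Object[]> joinCondition,
                     boolean leftOuter, int projectedArity) {
        this.calc = calc;
        this.joinCondition = joinCondition;
        this.leftOuter = leftOuter;
        this.projectedArity = projectedArity;
    }

    // Produces the joined rows for one left row from the dimension rows fetched for its key.
    List<Object[]> join(Object[] left, List<Object[]> fetchedDimRows) {
        List<Object[]> out = new ArrayList<>();
        for (Object[] dim : fetchedDimRows) {
            Optional<Object[]> projected = calc.apply(dim); // apply Calc (projection/filter)
            if (projected.isPresent() && joinCondition.test(left, projected.get())) {
                out.add(concat(left, projected.get())); // join condition held: emit joined row
            }
        }
        if (out.isEmpty() && leftOuter) {
            out.add(concat(left, new Object[projectedArity])); // LEFT OUTER JOIN: pad with nulls
        }
        return out;
    }

    private static Object[] concat(Object[] a, Object[] b) {
        Object[] result = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, result, a.length, b.length);
        return result;
    }
}
{code}

Because the Calc runs before the join condition, filtered-out dimension rows never reach the condition, and a left row whose candidates are all filtered out is still emitted with nulls under LEFT OUTER JOIN semantics.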
*2. Planner & Code Generation*
* Extend LookupJoinCodeGenerator to support batch async mode
* Integrate with existing generated ResultFuture pipeline
* Support Calc push-down for the temporal table
* Maintain compatibility with join condition filtering
A new optimizer option is introduced:
{code:java}
table.optimizer.dim-lookup-join.batch-enabled
{code}
Default: false
When enabled, the planner generates the batch async lookup runner instead of the
row-based async runner.
*3. Tests*
Enhancements include:
* Extend the in-memory lookup source to support batch key lookup
* Add IT cases for async batch temporal join and async batch join with Calc push-down
Tests verify:
* Correct join semantics
* LEFT OUTER JOIN behavior
* Calc correctness
* Result ordering and consistency
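Batch key lookup on an in-memory test source might look roughly like the sketch below, assuming the batch result is aligned with the input key order (duplicate keys answered repeatedly, missing keys mapped to an empty list). InMemoryBatchLookup and lookupBatch are hypothetical names, not the existing Flink test utilities.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical in-memory dimension table with batch key lookup; not an existing Flink test class.
final class InMemoryBatchLookup {
    private final Map<Integer, List<String>> table;
    private int requests;

    InMemoryBatchLookup(Map<Integer, List<String>> table) {
        this.table = table;
    }

    // One simulated async request per call. Result i belongs to keys.get(i), so duplicate keys
    // are answered repeatedly and missing keys map to an empty list (no match).
    CompletableFuture<List<List<String>>> lookupBatch(List<Integer> keys) {
        requests++;
        List<List<String>> results = new ArrayList<>(keys.size());
        for (Integer key : keys) {
            results.add(table.getOrDefault(key, List.of()));
        }
        return CompletableFuture.completedFuture(results);
    }

    int requests() {
        return requests;
    }
}
{code}

Keeping results positionally aligned with the keys is what makes the "result ordering and consistency" checks straightforward: each left row can be matched to its result by index.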
*Compatibility & Migration*
The change is fully backward compatible:
* Disabled by default
* No change in SQL semantics
* No state format changes
* No public API changes
*Performance Impact*
Expected improvements:
* Reduced async invocation count
* Lower RPC overhead
* Improved throughput
* Better resource utilization
Particularly beneficial for:
* High-throughput streaming jobs
* Remote dimension stores (e.g., HTTP/KV-based lookups)
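* Latency-sensitive real-time pipelines, provided the flush interval is kept short (a buffered row may wait up to one flush interval before its lookup is sent)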
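A back-of-envelope helper for the reduced async invocation count, assuming every flush is triggered by the size threshold so that only the last batch may be partial. BatchingEstimate is a hypothetical helper for illustration, not part of Flink.

{code:java}
// Hypothetical back-of-envelope helper; not part of Flink.
final class BatchingEstimate {
    private BatchingEstimate() {
    }

    // Row-by-row async lookup: one async request per left row.
    static long rowByRowRequests(long rows) {
        return rows;
    }

    // Size-triggered batching: full batches plus at most one partial batch, i.e. ceil(rows / batchSize).
    static long batchedRequests(long rows, int batchSize) {
        if (rows < 0 || batchSize <= 0) {
            throw new IllegalArgumentException("rows must be >= 0 and batchSize > 0");
        }
        return (rows + batchSize - 1) / batchSize;
    }
}
{code}

For example, 1,000,000 left rows with a batch size of 100 need 10,000 batch requests instead of 1,000,000 row-by-row requests. At low input rates, interval-triggered flushes send smaller batches, so the actual reduction is lower.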
*Future Work*
* Code-generate a fully integrated JoinedRowResultFuture to simplify layering
* Adaptive batch size tuning
* Add metrics for batch flush and async latency
* Unify async batch logic across connectors
was:
The previous description contained the same text as the current one (from the introduction through *Future Work*), followed by this *Example* section:
*Example*
{code:sql}
SET 'table.optimizer.dim-lookup-join.batch-enabled' = 'true';
SELECT T.id, T.len, T.content, D.name
FROM src AS T
JOIN user_table FOR SYSTEM_TIME AS OF T.proctime AS D
ON T.id = D.id;
{code}
With Calc push-down (a filter on the dimension table):
{code:sql}
SELECT T.id, T.len, T.content, D.name
FROM src AS T
JOIN user_table FOR SYSTEM_TIME AS OF T.proctime AS D
ON T.id = D.id AND D.age > 20;
{code}
> [Table]Support Async Batch Lookup Join (with Calc) for Temporal Table Join
> --------------------------------------------------------------------------
>
> Key: FLINK-39154
> URL: https://issues.apache.org/jira/browse/FLINK-39154
> Project: Flink
> Issue Type: Improvement
> Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime
> Reporter: featzhang
> Priority: Major