[ 
https://issues.apache.org/jira/browse/FLINK-39154?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

featzhang updated FLINK-39154:
------------------------------
    Description: 
This issue introduces Async Batch Lookup Join for temporal table joins, 
enabling batch-based asynchronous lookup of dimension tables.

Currently, async lookup join performs row-by-row asynchronous invocation: each 
left input row triggers one async request. This leads to:
 * High RPC overhead at high throughput
 * Inefficient utilization of remote dimension stores
 * Increased latency and resource pressure

This improvement introduces a batch-based async execution model, where multiple 
input rows are buffered and sent in a single async request.

In addition, this change supports applying a Calc (projection/filter) on the 
dimension table before evaluating the join condition.

*Motivation*

In many production scenarios:
 * Dimension lookup backends support batch key queries
 * Per-request overhead dominates total cost
 * High QPS streaming jobs create excessive external calls

Batching lookup requests:
 * Reduces network round-trips
 * Improves throughput
 * Lowers CPU and serialization overhead
 * Reduces pressure on external systems

*Proposed Changes*
*1. Runtime*

Introduce a new async runner:
{code:java}
AsyncBatchLookupJoinRunner
{code}
*Key behaviors*:
 * Buffer left input rows and corresponding ResultFutures
 * Trigger a flush when the batch size reaches the configured threshold, or when the flush interval elapses
 * Invoke async fetcher with List<RowData>
 * Distribute lookup results back to corresponding left rows
 * Support LEFT OUTER JOIN semantics
 * Reuse ResultFuture instances to reduce allocation cost
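
The buffering and flush behavior can be sketched with plain JDK types (a hypothetical, standalone illustration; the class name, generic shape, and fetcher signature below are assumptions for this sketch, not the actual AsyncBatchLookupJoinRunner API):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Minimal sketch of size- or time-triggered batching for async lookups. */
public class BatchingBuffer<K, V> {
    private final int maxBatchSize;
    private final long flushIntervalMs;
    // One async request per batch; results are assumed positional (1:1 with keys).
    private final Function<List<K>, CompletableFuture<List<V>>> batchFetcher;
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r);
                t.setDaemon(true);
                return t;
            });

    private List<K> keys = new ArrayList<>();
    private List<CompletableFuture<V>> futures = new ArrayList<>();
    private ScheduledFuture<?> pendingFlush;

    public BatchingBuffer(int maxBatchSize, long flushIntervalMs,
                          Function<List<K>, CompletableFuture<List<V>>> batchFetcher) {
        this.maxBatchSize = maxBatchSize;
        this.flushIntervalMs = flushIntervalMs;
        this.batchFetcher = batchFetcher;
    }

    /** Buffer one key; the returned future completes when its batch is answered. */
    public synchronized CompletableFuture<V> lookup(K key) {
        CompletableFuture<V> result = new CompletableFuture<>();
        keys.add(key);
        futures.add(result);
        if (keys.size() >= maxBatchSize) {
            flush();                          // size threshold reached
        } else if (pendingFlush == null) {    // arm the flush-interval timer
            pendingFlush = timer.schedule(
                    this::flushSafely, flushIntervalMs, TimeUnit.MILLISECONDS);
        }
        return result;
    }

    private synchronized void flushSafely() { flush(); }

    private void flush() {
        if (keys.isEmpty()) return;
        if (pendingFlush != null) { pendingFlush.cancel(false); pendingFlush = null; }
        List<K> batchKeys = keys;
        List<CompletableFuture<V>> batchFutures = futures;
        keys = new ArrayList<>();
        futures = new ArrayList<>();
        // Single async request for the whole batch;
        // distribute results back to the per-row futures.
        batchFetcher.apply(batchKeys).whenComplete((values, err) -> {
            for (int i = 0; i < batchFutures.size(); i++) {
                if (err != null) batchFutures.get(i).completeExceptionally(err);
                else batchFutures.get(i).complete(values.get(i));
            }
        });
    }
}
{code}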

If a Calc exists on the temporal table, use:
{code:java}
AsyncBatchLookupJoinWithCalcRunner
{code}
which:
 * Performs the async fetch
 * Converts results to internal RowData
 * Applies the generated Calc (projection/filter)
 * Applies the join condition
 * Produces joined results
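
The per-row pipeline applied after a batch returns can be sketched as follows (plain JDK, with {{String[]}} standing in for RowData; the class and method names are illustrative assumptions, not Flink APIs):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/** Sketch of the Calc-then-condition pipeline over fetched dimension rows. */
public class CalcPipeline {
    /**
     * @param fetched       dimension rows returned by the async batch fetch
     * @param calc          generated Calc (projection/filter); a null result
     *                      means the row was filtered out
     * @param joinCondition join condition evaluated on the Calc'ed row
     */
    public static List<String[]> process(
            List<String[]> fetched,
            Function<String[], String[]> calc,
            Predicate<String[]> joinCondition) {
        List<String[]> out = new ArrayList<>();
        for (String[] row : fetched) {
            String[] projected = calc.apply(row);   // apply generated Calc
            if (projected == null) continue;        // row filtered out by Calc
            if (joinCondition.test(projected)) {    // apply join condition
                out.add(projected);                 // produce joined result
            }
        }
        return out;
    }
}
{code}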

*2. Planner & Code Generation*
 * Extend LookupJoinCodeGenerator to support batch async mode
 * Integrate with existing generated ResultFuture pipeline
 * Support Calc push-down for temporal table
 * Maintain compatibility with join condition filtering

A new optimizer option is introduced:
{code:java}
table.optimizer.dim-lookup-join.batch-enabled
{code}
Default: false

When enabled, the planner generates the batch async lookup runner instead of the 
row-based async runner.
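
The option could also be set programmatically (a hypothetical usage sketch; the option key comes from this proposal and is not available in any released Flink version):

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
// Opt in to the proposed batch async lookup runner (default: false).
tEnv.getConfig().set("table.optimizer.dim-lookup-join.batch-enabled", "true");
{code}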

*3. Tests*

Enhancements include:
 * Extend in-memory lookup source to support batch key lookup
 * Add IT cases: Async batch temporal join, Async batch join with Calc push-down

Tests verify:
 * Correct join semantics
 * LEFT OUTER JOIN behavior
 * Calc correctness
 * Result ordering and consistency

*Compatibility & Migration*

Fully backward compatible:
 * Disabled by default
 * No change in SQL semantics
 * No state format changes
 * No public API changes

*Performance Impact*

Expected improvements:
 * Reduced async invocation count
 * Lower RPC overhead
 * Improved throughput
 * Better resource utilization

Particularly beneficial for:
 * High-throughput streaming jobs
 * Remote dimension stores (e.g., HTTP/KV-based lookups)
 * Latency-sensitive real-time pipelines

*Future Work*
 * Code-generate a fully integrated JoinedRowResultFuture to simplify layering
 * Adaptive batch size tuning
 * Add metrics for batch flush and async latency
 * Unify async batch logic across connectors

*Example*


{code:sql}
SET 'table.optimizer.dim-lookup-join.batch-enabled' = 'true';

SELECT T.id, T.len, T.content, D.name
FROM src AS T
JOIN user_table FOR SYSTEM_TIME AS OF T.proctime AS D
ON T.id = D.id
{code}

With push-down:


{code:sql}
SELECT T.id, T.len, T.content, D.name
FROM src AS T
JOIN user_table FOR SYSTEM_TIME AS OF T.proctime AS D
ON T.id = D.id AND D.age > 20
{code}




> [Table]Support Async Batch Lookup Join (with Calc) for Temporal Table Join
> --------------------------------------------------------------------------
>
>                 Key: FLINK-39154
>                 URL: https://issues.apache.org/jira/browse/FLINK-39154
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / API, Table SQL / Planner, Table SQL / Runtime
>            Reporter: featzhang
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
