Hi,
coming to think of it, I was thinking of just putting the batchids in
response topic , and use some other process like http source connector to
check if the batches completed and extract the output records.
This will simplify this a lot . if you guys are ok i can update the google
doc

On Mon, Nov 10, 2025 at 4:54 AM David Radley <[email protected]>
wrote:

> Hi, Rahul,
> Thanks for the proposal. A see in your pseudo code you have a polling loop
> with a 10 second wait. Can we use an asynchronous API with a future to be
> notified of completion of the batch job,
>  Kind regards, David.
>
> From: Shengkai Fang <[email protected]>
> Date: Monday, 10 November 2025 at 10:04
> To: [email protected] <[email protected]>
> Subject: [EXTERNAL] Re: [DISCUSS] FLIP-XXX: Native Batch Inference Support
> in Flink SQL's ML_PREDICT
>
> Hi, Rahul.
>
> +1 for this proposal. However, due to my current workload, I'll need until
> the end of this week to review it thoroughly.
>
> Best,
> Shengkai
>
> Rahul Bhattacharya <[email protected]> 于2025年11月9日周日 13:08写道:
>
> > Hi ,
> > i have created a draft FLIP for it
> > https://docs.google.com/document/d/1U-eSuKwi5vIgAPt6ZBvb-RcbcRJiRy0e/
> >
> > Please let me know your thoughts
> >
> > regards
> > Rahul
> >
> > On Sat, Nov 8, 2025 at 5:54 PM Rahul Bhattacharya <[email protected]>
> > wrote:
> >
> > > Hi,
> > > I actually thought of reworking my previous response. I want the table
> > api
> > > to create jsonl files and call openai/claude batch apis.
> > > The implementation I am doing is going to batch the records into a file
> > > and call the api with the file and then continuously poll the repose to
> > see
> > > the status of the batch and then use that to write the response
> records.
> > > The ML_Predict in its current form is not usable as people are not
> > looking
> > > for synchronous response which is twice as expensive as the
> asynchronous
> > > response.
> > > let me know you thoughts and i can create a FLIP for it
> > > regards
> > >
> > > On Sat, Nov 8, 2025 at 3:14 PM Rahul Bhattacharya <[email protected]
> >
> > > wrote:
> > >
> > >> Hi Flink Community,
> > >>
> > >> I'm interested in contributing an enhancement to Apache Flink's
> > >> ML_PREDICT
> > >> functionality for LLM interactions. I'd like to gauge community
> interest
> > >> and get
> > >> early feedback before proceeding with detailed design or a FLIP.
> > >>
> > >> ## Problem Statement
> > >>
> > >> Currently, when using Flink SQL's ML_PREDICT with LLM endpoints, each
> > >> record
> > >> triggers an individual API call. For a stream processing 1000
> > >> records/second,
> > >> this results in:
> > >>
> > >> - **1000 separate API calls per second**
> > >> - **High latency**: Each call has network overhead + API processing
> time
> > >> - **High cost**: Most LLM providers charge per token, and lack of
> > >> batching means
> > >>   no cost optimization
> > >> - **Rate limiting issues**: Hitting provider rate limits quickly
> > >> - **Poor throughput**: API calls are serialized per record
> > >>
> > >> ### Current Behavior (Inefficient)
> > >> ```sql
> > >> -- This makes 10 individual API calls
> > >> SELECT id, ML_PREDICT('llm_model', text) as result
> > >> FROM (VALUES
> > >>     (1, 'text1'), (2, 'text2'), ..., (10, 'text10')
> > >> ) AS t(id, text);
> > >> ```
> > >> **Result**: 10 separate HTTP requests, 10x latency, 10x overhead
> > >>
> > >> ## Proposed Solution: Application-Level Batching with Prompt
> Engineering
> > >>
> > >> Since most LLM APIs (OpenAI, Anthropic Claude, etc.) don't provide
> > native
> > >> batch
> > >> endpoints, we propose implementing batching at the application level
> by:
> > >>
> > >> 1. **Accumulating N records** into a single batch
> > >> 2. **Injecting records into a structured prompt** that instructs the
> LLM
> > >> to
> > >>    process multiple items
> > >> 3. **Parsing structured responses** to extract results for each record
> > >> 4. **Emitting individual results** back to the Flink pipeline
> > >>
> > >> ### How It Works
> > >>
> > >> **Step 1: Batch Accumulation**
> > >> Collect up to `batch.size` records or wait up to `batch.timeout.ms`
> > >>
> > >> **Step 2: Prompt Construction**
> > >>
> > >> System: You are a sentiment analyzer. Process each item and respond
> with
> > >> JSON.
> > >>
> > >> User: Analyze the sentiment of these texts. Return a JSON array with
> one
> > >> object per input containing "index" and "sentiment" fields.
> > >>
> > >> Input 1: "This product is amazing!" Input 2: "Terrible experience,
> very
> > >> disappointed" Input 3: "It's okay, nothing special" ... Input 10:
> "Best
> > >> purchase ever!"
> > >>
> > >> Respond with: [{"index": 1, "sentiment": "..."}, {"index": 2,
> > >> "sentiment": "..."}, ...]
> > >>
> > >> **Step 3: Response Parsing**
> > >> ```json
> > >> [
> > >>   {"index": 1, "sentiment": "positive"},
> > >>   {"index": 2, "sentiment": "negative"},
> > >>   {"index": 3, "sentiment": "neutral"},
> > >>   ...
> > >>   {"index": 10, "sentiment": "positive"}
> > >> ]
> > >> ```
> > >>
> > >> **Step 4: Result Distribution**
> > >> Parse JSON and emit individual results back to corresponding records
> > >>
> > >> ### Model Configuration (Defaults)
> > >> ```sql
> > >> CREATE MODEL llm_sentiment WITH (
> > >>     'provider' = 'openai',
> > >>     'model' = 'gpt-4',
> > >>     'api_key' = '${API_KEY}',
> > >>     'batch.size' = '20',
> > >>     'batch.timeout.ms' = '1000',
> > >>     'system.prompt' = 'You are a sentiment analyzer. Always respond
> with
> > >> valid JSON.',
> > >>     'batch.prompt.template' = 'Analyze sentiment for these texts.
> Return
> > >> JSON array: [{"index": <n>, "sentiment":
> > "<positive|negative|neutral>"}]',
> > >>     'response.format' = 'json',
> > >>     'response.path' = '$[*]',  -- JSONPath to extract array of results
> > >>     'response.index.field' = 'index',  -- Field containing record
> index
> > >>     'response.value.field' = 'sentiment'  -- Field containing result
> > >> );
> > >> ```
> > >>
> > >> ### Query Usage (Use Defaults)
> > >> ```sql
> > >> -- Uses batch_size=20 from model definition
> > >> SELECT id, text, ML_PREDICT('llm_sentiment', text) as sentiment
> > >> FROM customer_reviews;
> > >> ```
> > >>
> > >> ### Query Usage (Override for Custom Analysis)
> > >> ```sql
> > >> -- Override prompt and batch size for different use case
> > >> SELECT id, text, ML_PREDICT('llm_sentiment', text,
> > >>     MAP['batch.size', '50',
> > >>         'batch.prompt.template', 'Extract key entities. Return JSON:
> > >> [{"index": <n>, "entities": [...]}]',
> > >>         'response.value.field', 'entities']) as entities
> > >> FROM documents;
> > >> ```
> > >>
> > >> ## Performance and Cost Impact
> > >>
> > >> ### Example: Processing 10,000 customer reviews
> > >>
> > >> **Current (unbatched)**:
> > >> - 10,000 API calls
> > >> - ~10,000 x 200ms latency = 2,000 seconds total processing time
> > >> (serialized)
> > >> - ~10,000 x $0.002 = $20 in API costs
> > >> - High rate limit pressure
> > >>
> > >> **With batching (batch_size=20)**:
> > >> - 500 API calls (10,000 / 20)
> > >> - ~500 x 300ms latency = 150 seconds total processing time
> > >> - ~500 x $0.006 = $3 in API costs (slightly higher per call due to
> > larger
> > >> prompts,
> > >>   but still 85% cheaper overall)
> > >> - **20x fewer API calls**
> > >> - **13x faster processing**
> > >> - **85% cost reduction**
> > >>
> > >> ## Proposed Implementation
> > >>
> > >> ### Configuration Parameters
> > >>
> > >> **Model-level (defaults)**:
> > >> - `batch.size`: Maximum records per batch (default: 1 for backward
> > >> compatibility)
> > >> - `batch.timeout.ms`: Max time to wait before flushing incomplete
> batch
> > >> (default: 1000ms)
> > >> - `system.prompt`: System-level instruction for the LLM
> > >> - `batch.prompt.template`: Template explaining how to process batched
> > >> inputs
> > >> - `response.format`: Expected response format ('json', 'xml',
> > 'delimited')
> > >> - `response.path`: JSONPath or XPath to extract results array
> > >> - `response.index.field`: Field name containing the record index
> > >> - `response.value.field`: Field name containing the actual result
> > >> - `max.retries`: Retry attempts for failed batches (default: 3)
> > >> - `request.timeout.ms`: Timeout for API calls (default: 30000ms)
> > >>
> > >> **Query-level (overrides)**:
> > >> - Any of the above can be overridden via MAP parameter in ML_PREDICT
> > >> - Per-query customization for different analysis tasks
> > >>
> > >> ### Key Features
> > >> 1. **Prompt injection**: Automatically construct batch prompts with
> > >> indexed inputs
> > >> 2. **Structured response parsing**: Support JSON, XML, or delimited
> > >> formats
> > >> 3. **Index tracking**: Maintain record-to-result mapping through the
> > batch
> > >> 4. **Error handling**: Handle parsing failures, missing indices,
> > >> malformed responses
> > >> 5. **Fallback to individual calls**: If batch fails, optionally retry
> > >> records individually
> > >> 6. **Provider-agnostic**: Works with any LLM API (OpenAI, Anthropic,
> > >> Azure, self-hosted)
> > >> 7. **Async processing**: Non-blocking batch requests
> > >> 8. **Back-pressure**: Proper flow control when API is slow
> > >> 9. **Backward compatible**: batch.size=1 maintains current behavior
> > >>
> > >> ### Technical Approach
> > >> - Extend existing ML_PREDICT infrastructure
> > >> - Add batching buffer in the ML_PREDICT operator
> > >> - Implement prompt template engine for batch construction:
> > >>   - Inject record index + content into template
> > >>   - Support various templating formats (JSON, XML, plain text)
> > >> - Implement response parser:
> > >>   - Extract structured data (JSONPath, XPath, regex)
> > >>   - Map results back to original records by index
> > >>   - Handle missing or malformed responses
> > >> - Maintain record ordering and error attribution
> > >> - Support parameter override mechanism in ML_PREDICT function
> signature
> > >>
> > >> ### Response Parsing Strategy
> > >>
> > >> The implementation must handle:
> > >> 1. **Successful batch response**: Parse and distribute results
> > >> 2. **Partial failure**: Some records missing from response → emit
> errors
> > >> for those
> > >> 3. **Complete parse failure**: Optionally fallback to individual calls
> > >> 4. **Index mismatch**: Response indices don't match input → log
> warning
> > >> and best-effort match
> > >> 5. **Malformed JSON**: Retry with error handling
> > >>
> > >> Example error handling:
> > >> ```sql
> > >> -- Records that fail parsing get null results with error metadata
> > >> SELECT
> > >>     id,
> > >>     text,
> > >>     result.value as sentiment,
> > >>     result.error as error_msg
> > >> FROM source_table,
> > >> LATERAL TABLE(ML_PREDICT('llm_sentiment', text));
> > >> ```
> > >>
> > >> ## Limitations and Considerations
> > >>
> > >> 1. **LLM instruction following**: Depends on model's ability to follow
> > >> structured
> > >>    output instructions. GPT-4 and Claude are reliable; older models
> may
> > >> struggle.
> > >>
> > >> 2. **Prompt size limits**: Batching too many records may exceed
> context
> > >> windows
> > >>    - GPT-4: ~8K tokens input limit
> > >>    - Claude: ~200K tokens but practical batches smaller
> > >>    - Need configurable max batch size based on average record length
> > >>
> > >> 3. **Token cost trade-off**: Larger batches mean:
> > >>    - Fewer API calls (good)
> > >>    - But larger prompts with instructions/formatting (slight overhead)
> > >>    - Net savings still 80-90% in practice
> > >>
> > >> 4. **Parsing reliability**: Small risk of malformed responses
> > >>    - Mitigated by: clear instructions, JSON mode (GPT-4), retry logic
> > >>    - Fallback to individual calls if batch parsing fails repeatedly
> > >>
> > >> 5. **Latency characteristics**:
> > >>    - Individual records see slightly higher latency (waiting for
> batch)
> > >>    - Overall throughput dramatically improved
> > >>    - Use `batch.timeout.ms` to balance latency vs throughput
> > >>
> > >> ## Future Extensions
> > >>
> > >> This batching architecture would support:
> > >> 1. **Stateful chat sessions**: Batch multiple turns of a conversation
> > >> with
> > >>    maintained history per session key
> > >> 2. **Embedding generation**: Some providers (OpenAI) do have batch
> > >> embedding APIs
> > >> 3. **Multi-modal batching**: Batch image + text processing with
> > >> structured outputs
> > >>
> > >> ## Questions for the Community
> > >>
> > >> 1. **Architecture**: Should this extend ML_PREDICT or be a new
> function?
> > >>    (I propose extending ML_PREDICT for backward compatibility)
> > >>
> > >> 2. **FLIP Required?**: Does this enhancement warrant a FLIP?
> > >>
> > >> 3. **Existing Work**: Is anyone working on batching for ML_PREDICT or
> > >> similar
> > >>    functionality?
> > >>
> > >> 4. **Prompt Template Engine**: Should we:
> > >>    - Build a custom template engine?
> > >>    - Use existing library (e.g., StringTemplate, Mustache)?
> > >>    - Keep it simple with String.format initially?
> > >>
> > >> 5. **Response Parsing**: Preferred approach:
> > >>    - JSONPath library (flexible but adds dependency)
> > >>    - Simple JSON parsing with field names
> > >>    - Pluggable parser interface for extensibility?
> > >>
> > >> 6. **Error Handling**: If parsing fails for entire batch:
> > >>    - Fail all records in batch?
> > >>    - Retry batch once more?
> > >>    - Fallback to individual calls (with circuit breaker)?
> > >>    - Make strategy configurable?
> > >>
> > >> 7. **Batch Assembly**: Should batching happen:
> > >>    - Per parallel instance (each task maintains its own batch)?
> > >>    - Globally coordinated (shuffle to batch coordinator)?
> > >>    - I propose per-instance for simplicity and lower latency
> > >>
> > >> 8. **Compatibility**: Default batch.size=1 to maintain current
> behavior,
> > >> users
> > >>    opt-in to batching?
> > >>
> > >> ## Why This Matters
> > >>
> > >> LLM inference is becoming a critical part of real-time data pipelines.
> > >> Without
> > >> batching:
> > >> - Users face prohibitive costs for high-throughput workloads
> > >> - Rate limits block production deployments
> > >> - Latency makes real-time processing impractical
> > >>
> > >> While LLM providers don't offer native batch APIs, application-level
> > >> batching
> > >> through prompt engineering is a proven pattern used in production by
> > many
> > >> organizations. This proposal brings that capability natively into
> Flink.
> > >>
> > >> The hybrid configuration approach provides:
> > >> - **Sensible defaults** for common use cases (sentiment analysis,
> > >> classification)
> > >> - **Flexibility** to customize prompts and parsing for specific needs
> > >> - **Easy migration** for existing queries (batch.size=1 default)
> > >>
> > >> ## Next Steps
> > >>
> > >> If there's interest from the community, I'm happy to:
> > >> 1. Prepare a detailed design document with prompt templates and
> parsing
> > >> examples
> > >> 2. Create a JIRA ticket
> > >> 3. Develop a prototype demonstrating the batching and parsing logic
> > >> 4. Write a FLIP if required
> > >>
> > >> Looking forward to your feedback and guidance on how best to
> proceed!--
> > >> Thanks And Regards
> > >> Rahul
> > >>
> > >
> > >
> > > --
> > > Thanks And Regards
> > > Rahul
> > >
> >
> >
> > --
> > Thanks And Regards
> > Rahul
> >
>
> Unless otherwise stated above:
>
> IBM United Kingdom Limited
> Registered in England and Wales with number 741598
> Registered office: Building C, IBM Hursley Office, Hursley Park Road,
> Winchester, Hampshire SO21 2JN
>


-- 
Thanks And Regards
Rahul

Reply via email to