Hi ,
i have created a draft FLIP for it
https://docs.google.com/document/d/1U-eSuKwi5vIgAPt6ZBvb-RcbcRJiRy0e/

Please let me know your thoughts

regards
Rahul

On Sat, Nov 8, 2025 at 5:54 PM Rahul Bhattacharya <[email protected]>
wrote:

> Hi,
> I actually thought of reworking my previous response. I want the table api
> to create jsonl files and call openai/claude batch apis.
> The implementation I am doing is going to batch the records into a file
> and call the api with the file and then continuously poll the repose to see
> the status of the batch and then use that to write the response records.
> The ML_Predict in its current form is not usable as people are not looking
> for synchronous response which is twice as expensive as the asynchronous
> response.
> let me know you thoughts and i can create a FLIP for it
> regards
>
> On Sat, Nov 8, 2025 at 3:14 PM Rahul Bhattacharya <[email protected]>
> wrote:
>
>> Hi Flink Community,
>>
>> I'm interested in contributing an enhancement to Apache Flink's
>> ML_PREDICT
>> functionality for LLM interactions. I'd like to gauge community interest
>> and get
>> early feedback before proceeding with detailed design or a FLIP.
>>
>> ## Problem Statement
>>
>> Currently, when using Flink SQL's ML_PREDICT with LLM endpoints, each
>> record
>> triggers an individual API call. For a stream processing 1000
>> records/second,
>> this results in:
>>
>> - **1000 separate API calls per second**
>> - **High latency**: Each call has network overhead + API processing time
>> - **High cost**: Most LLM providers charge per token, and lack of
>> batching means
>>   no cost optimization
>> - **Rate limiting issues**: Hitting provider rate limits quickly
>> - **Poor throughput**: API calls are serialized per record
>>
>> ### Current Behavior (Inefficient)
>> ```sql
>> -- This makes 10 individual API calls
>> SELECT id, ML_PREDICT('llm_model', text) as result
>> FROM (VALUES
>>     (1, 'text1'), (2, 'text2'), ..., (10, 'text10')
>> ) AS t(id, text);
>> ```
>> **Result**: 10 separate HTTP requests, 10x latency, 10x overhead
>>
>> ## Proposed Solution: Application-Level Batching with Prompt Engineering
>>
>> Since most LLM APIs (OpenAI, Anthropic Claude, etc.) don't provide native
>> batch
>> endpoints, we propose implementing batching at the application level by:
>>
>> 1. **Accumulating N records** into a single batch
>> 2. **Injecting records into a structured prompt** that instructs the LLM
>> to
>>    process multiple items
>> 3. **Parsing structured responses** to extract results for each record
>> 4. **Emitting individual results** back to the Flink pipeline
>>
>> ### How It Works
>>
>> **Step 1: Batch Accumulation**
>> Collect up to `batch.size` records or wait up to `batch.timeout.ms`
>>
>> **Step 2: Prompt Construction**
>>
>> System: You are a sentiment analyzer. Process each item and respond with
>> JSON.
>>
>> User: Analyze the sentiment of these texts. Return a JSON array with one
>> object per input containing "index" and "sentiment" fields.
>>
>> Input 1: "This product is amazing!" Input 2: "Terrible experience, very
>> disappointed" Input 3: "It's okay, nothing special" ... Input 10: "Best
>> purchase ever!"
>>
>> Respond with: [{"index": 1, "sentiment": "..."}, {"index": 2,
>> "sentiment": "..."}, ...]
>>
>> **Step 3: Response Parsing**
>> ```json
>> [
>>   {"index": 1, "sentiment": "positive"},
>>   {"index": 2, "sentiment": "negative"},
>>   {"index": 3, "sentiment": "neutral"},
>>   ...
>>   {"index": 10, "sentiment": "positive"}
>> ]
>> ```
>>
>> **Step 4: Result Distribution**
>> Parse JSON and emit individual results back to corresponding records
>>
>> ### Model Configuration (Defaults)
>> ```sql
>> CREATE MODEL llm_sentiment WITH (
>>     'provider' = 'openai',
>>     'model' = 'gpt-4',
>>     'api_key' = '${API_KEY}',
>>     'batch.size' = '20',
>>     'batch.timeout.ms' = '1000',
>>     'system.prompt' = 'You are a sentiment analyzer. Always respond with
>> valid JSON.',
>>     'batch.prompt.template' = 'Analyze sentiment for these texts. Return
>> JSON array: [{"index": <n>, "sentiment": "<positive|negative|neutral>"}]',
>>     'response.format' = 'json',
>>     'response.path' = '$[*]',  -- JSONPath to extract array of results
>>     'response.index.field' = 'index',  -- Field containing record index
>>     'response.value.field' = 'sentiment'  -- Field containing result
>> );
>> ```
>>
>> ### Query Usage (Use Defaults)
>> ```sql
>> -- Uses batch_size=20 from model definition
>> SELECT id, text, ML_PREDICT('llm_sentiment', text) as sentiment
>> FROM customer_reviews;
>> ```
>>
>> ### Query Usage (Override for Custom Analysis)
>> ```sql
>> -- Override prompt and batch size for different use case
>> SELECT id, text, ML_PREDICT('llm_sentiment', text,
>>     MAP['batch.size', '50',
>>         'batch.prompt.template', 'Extract key entities. Return JSON:
>> [{"index": <n>, "entities": [...]}]',
>>         'response.value.field', 'entities']) as entities
>> FROM documents;
>> ```
>>
>> ## Performance and Cost Impact
>>
>> ### Example: Processing 10,000 customer reviews
>>
>> **Current (unbatched)**:
>> - 10,000 API calls
>> - ~10,000 x 200ms latency = 2,000 seconds total processing time
>> (serialized)
>> - ~10,000 x $0.002 = $20 in API costs
>> - High rate limit pressure
>>
>> **With batching (batch_size=20)**:
>> - 500 API calls (10,000 / 20)
>> - ~500 x 300ms latency = 150 seconds total processing time
>> - ~500 x $0.006 = $3 in API costs (slightly higher per call due to larger
>> prompts,
>>   but still 85% cheaper overall)
>> - **20x fewer API calls**
>> - **13x faster processing**
>> - **85% cost reduction**
>>
>> ## Proposed Implementation
>>
>> ### Configuration Parameters
>>
>> **Model-level (defaults)**:
>> - `batch.size`: Maximum records per batch (default: 1 for backward
>> compatibility)
>> - `batch.timeout.ms`: Max time to wait before flushing incomplete batch
>> (default: 1000ms)
>> - `system.prompt`: System-level instruction for the LLM
>> - `batch.prompt.template`: Template explaining how to process batched
>> inputs
>> - `response.format`: Expected response format ('json', 'xml', 'delimited')
>> - `response.path`: JSONPath or XPath to extract results array
>> - `response.index.field`: Field name containing the record index
>> - `response.value.field`: Field name containing the actual result
>> - `max.retries`: Retry attempts for failed batches (default: 3)
>> - `request.timeout.ms`: Timeout for API calls (default: 30000ms)
>>
>> **Query-level (overrides)**:
>> - Any of the above can be overridden via MAP parameter in ML_PREDICT
>> - Per-query customization for different analysis tasks
>>
>> ### Key Features
>> 1. **Prompt injection**: Automatically construct batch prompts with
>> indexed inputs
>> 2. **Structured response parsing**: Support JSON, XML, or delimited
>> formats
>> 3. **Index tracking**: Maintain record-to-result mapping through the batch
>> 4. **Error handling**: Handle parsing failures, missing indices,
>> malformed responses
>> 5. **Fallback to individual calls**: If batch fails, optionally retry
>> records individually
>> 6. **Provider-agnostic**: Works with any LLM API (OpenAI, Anthropic,
>> Azure, self-hosted)
>> 7. **Async processing**: Non-blocking batch requests
>> 8. **Back-pressure**: Proper flow control when API is slow
>> 9. **Backward compatible**: batch.size=1 maintains current behavior
>>
>> ### Technical Approach
>> - Extend existing ML_PREDICT infrastructure
>> - Add batching buffer in the ML_PREDICT operator
>> - Implement prompt template engine for batch construction:
>>   - Inject record index + content into template
>>   - Support various templating formats (JSON, XML, plain text)
>> - Implement response parser:
>>   - Extract structured data (JSONPath, XPath, regex)
>>   - Map results back to original records by index
>>   - Handle missing or malformed responses
>> - Maintain record ordering and error attribution
>> - Support parameter override mechanism in ML_PREDICT function signature
>>
>> ### Response Parsing Strategy
>>
>> The implementation must handle:
>> 1. **Successful batch response**: Parse and distribute results
>> 2. **Partial failure**: Some records missing from response → emit errors
>> for those
>> 3. **Complete parse failure**: Optionally fallback to individual calls
>> 4. **Index mismatch**: Response indices don't match input → log warning
>> and best-effort match
>> 5. **Malformed JSON**: Retry with error handling
>>
>> Example error handling:
>> ```sql
>> -- Records that fail parsing get null results with error metadata
>> SELECT
>>     id,
>>     text,
>>     result.value as sentiment,
>>     result.error as error_msg
>> FROM source_table,
>> LATERAL TABLE(ML_PREDICT('llm_sentiment', text));
>> ```
>>
>> ## Limitations and Considerations
>>
>> 1. **LLM instruction following**: Depends on model's ability to follow
>> structured
>>    output instructions. GPT-4 and Claude are reliable; older models may
>> struggle.
>>
>> 2. **Prompt size limits**: Batching too many records may exceed context
>> windows
>>    - GPT-4: ~8K tokens input limit
>>    - Claude: ~200K tokens but practical batches smaller
>>    - Need configurable max batch size based on average record length
>>
>> 3. **Token cost trade-off**: Larger batches mean:
>>    - Fewer API calls (good)
>>    - But larger prompts with instructions/formatting (slight overhead)
>>    - Net savings still 80-90% in practice
>>
>> 4. **Parsing reliability**: Small risk of malformed responses
>>    - Mitigated by: clear instructions, JSON mode (GPT-4), retry logic
>>    - Fallback to individual calls if batch parsing fails repeatedly
>>
>> 5. **Latency characteristics**:
>>    - Individual records see slightly higher latency (waiting for batch)
>>    - Overall throughput dramatically improved
>>    - Use `batch.timeout.ms` to balance latency vs throughput
>>
>> ## Future Extensions
>>
>> This batching architecture would support:
>> 1. **Stateful chat sessions**: Batch multiple turns of a conversation
>> with
>>    maintained history per session key
>> 2. **Embedding generation**: Some providers (OpenAI) do have batch
>> embedding APIs
>> 3. **Multi-modal batching**: Batch image + text processing with
>> structured outputs
>>
>> ## Questions for the Community
>>
>> 1. **Architecture**: Should this extend ML_PREDICT or be a new function?
>>    (I propose extending ML_PREDICT for backward compatibility)
>>
>> 2. **FLIP Required?**: Does this enhancement warrant a FLIP?
>>
>> 3. **Existing Work**: Is anyone working on batching for ML_PREDICT or
>> similar
>>    functionality?
>>
>> 4. **Prompt Template Engine**: Should we:
>>    - Build a custom template engine?
>>    - Use existing library (e.g., StringTemplate, Mustache)?
>>    - Keep it simple with String.format initially?
>>
>> 5. **Response Parsing**: Preferred approach:
>>    - JSONPath library (flexible but adds dependency)
>>    - Simple JSON parsing with field names
>>    - Pluggable parser interface for extensibility?
>>
>> 6. **Error Handling**: If parsing fails for entire batch:
>>    - Fail all records in batch?
>>    - Retry batch once more?
>>    - Fallback to individual calls (with circuit breaker)?
>>    - Make strategy configurable?
>>
>> 7. **Batch Assembly**: Should batching happen:
>>    - Per parallel instance (each task maintains its own batch)?
>>    - Globally coordinated (shuffle to batch coordinator)?
>>    - I propose per-instance for simplicity and lower latency
>>
>> 8. **Compatibility**: Default batch.size=1 to maintain current behavior,
>> users
>>    opt-in to batching?
>>
>> ## Why This Matters
>>
>> LLM inference is becoming a critical part of real-time data pipelines.
>> Without
>> batching:
>> - Users face prohibitive costs for high-throughput workloads
>> - Rate limits block production deployments
>> - Latency makes real-time processing impractical
>>
>> While LLM providers don't offer native batch APIs, application-level
>> batching
>> through prompt engineering is a proven pattern used in production by many
>> organizations. This proposal brings that capability natively into Flink.
>>
>> The hybrid configuration approach provides:
>> - **Sensible defaults** for common use cases (sentiment analysis,
>> classification)
>> - **Flexibility** to customize prompts and parsing for specific needs
>> - **Easy migration** for existing queries (batch.size=1 default)
>>
>> ## Next Steps
>>
>> If there's interest from the community, I'm happy to:
>> 1. Prepare a detailed design document with prompt templates and parsing
>> examples
>> 2. Create a JIRA ticket
>> 3. Develop a prototype demonstrating the batching and parsing logic
>> 4. Write a FLIP if required
>>
>> Looking forward to your feedback and guidance on how best to proceed!--
>> Thanks And Regards
>> Rahul
>>
>
>
> --
> Thanks And Regards
> Rahul
>


-- 
Thanks And Regards
Rahul

Reply via email to