Hi Rahul,

Thanks for the detailed explanation! I agree that option 1 is more concrete.
For this option, I share the same concern as Shengkai. For example, suppose
there are 2 Kafka sources and we union them as the input of the Batch_Inference
operator. When restoring from a checkpoint, Flink cannot guarantee that the
Batch_Inference operator receives the input data from the 2 sources in the same
order as in the last run. As a result, the stored `batch_id` in the state can
lead to wrong results.
Another case that came to my mind is that the `batch_id` is generated and
maintained by LLM services like OpenAI or Gemini, so the `batch_id` could
become invalid; for example, Gemini provides an API [1] to delete a batch.
Given these correctness and controllability concerns, perhaps we need to
rethink how the state is used.

[1] https://ai.google.dev/gemini-api/docs/batch-api#delete-batch-job

Best,
Biao Geng


> On Nov 9, 2025, at 13:07, Rahul Bhattacharya <[email protected]> wrote:
> 
> Hi,
> I have created a draft FLIP for it
> https://docs.google.com/document/d/1U-eSuKwi5vIgAPt6ZBvb-RcbcRJiRy0e/
> 
> Please let me know your thoughts
> 
> regards
> Rahul
> 
> On Sat, Nov 8, 2025 at 5:54 PM Rahul Bhattacharya <[email protected]>
> wrote:
> 
>> Hi,
>> I actually thought of reworking my previous response. I want the Table API
>> to create JSONL files and call the OpenAI/Claude batch APIs.
>> The implementation I am working on batches the records into a file, calls
>> the API with the file, then continuously polls the response to check the
>> status of the batch, and uses that to write the response records.
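>> As a rough sketch of that flow (the BatchClient interface and its method
>> names are hypothetical placeholders, not an existing client; the real code
>> would call the provider's file-upload and batch endpoints):
>> 
>> ```java
>> import java.nio.charset.StandardCharsets;
>> import java.nio.file.Files;
>> import java.nio.file.Path;
>> import java.util.List;
>> import java.util.concurrent.TimeUnit;
>> 
>> // Hypothetical provider-agnostic client; method names are placeholders.
>> interface BatchClient {
>>     String submitBatch(Path jsonlFile) throws Exception;        // returns a batch id
>>     String getStatus(String batchId) throws Exception;          // e.g. "in_progress", "completed"
>>     List<String> fetchResults(String batchId) throws Exception; // one JSON line per record
>> }
>> 
>> final class BatchRunner {
>>     List<String> run(BatchClient client, List<String> requestJsonLines) throws Exception {
>>         // 1. Write the accumulated records as a JSONL file.
>>         Path file = Files.createTempFile("flink-llm-batch-", ".jsonl");
>>         Files.write(file, requestJsonLines, StandardCharsets.UTF_8);
>> 
>>         // 2. Submit the file and poll until the batch finishes.
>>         String batchId = client.submitBatch(file);
>>         while (!"completed".equals(client.getStatus(batchId))) {
>>             TimeUnit.SECONDS.sleep(30);
>>         }
>> 
>>         // 3. Read back the per-record responses for downstream writing.
>>         return client.fetchResults(batchId);
>>     }
>> }
>> ```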
>> ML_PREDICT in its current form is not usable, as people are not looking for
>> a synchronous response, which is twice as expensive as the asynchronous one.
>> Let me know your thoughts and I can create a FLIP for it.
>> regards
>> 
>> On Sat, Nov 8, 2025 at 3:14 PM Rahul Bhattacharya <[email protected]>
>> wrote:
>> 
>>> Hi Flink Community,
>>> 
>>> I'm interested in contributing an enhancement to Apache Flink's
>>> ML_PREDICT
>>> functionality for LLM interactions. I'd like to gauge community interest
>>> and get
>>> early feedback before proceeding with detailed design or a FLIP.
>>> 
>>> ## Problem Statement
>>> 
>>> Currently, when using Flink SQL's ML_PREDICT with LLM endpoints, each
>>> record
>>> triggers an individual API call. For a stream processing 1000
>>> records/second,
>>> this results in:
>>> 
>>> - **1000 separate API calls per second**
>>> - **High latency**: Each call has network overhead + API processing time
>>> - **High cost**: Most LLM providers charge per token, and lack of
>>> batching means
>>>  no cost optimization
>>> - **Rate limiting issues**: Hitting provider rate limits quickly
>>> - **Poor throughput**: API calls are serialized per record
>>> 
>>> ### Current Behavior (Inefficient)
>>> ```sql
>>> -- This makes 10 individual API calls
>>> SELECT id, ML_PREDICT('llm_model', text) as result
>>> FROM (VALUES
>>>    (1, 'text1'), (2, 'text2'), ..., (10, 'text10')
>>> ) AS t(id, text);
>>> ```
>>> **Result**: 10 separate HTTP requests, 10x latency, 10x overhead
>>> 
>>> ## Proposed Solution: Application-Level Batching with Prompt Engineering
>>> 
>>> Since most LLM APIs (OpenAI, Anthropic Claude, etc.) don't provide native
>>> batch
>>> endpoints, we propose implementing batching at the application level by:
>>> 
>>> 1. **Accumulating N records** into a single batch
>>> 2. **Injecting records into a structured prompt** that instructs the LLM
>>> to
>>>   process multiple items
>>> 3. **Parsing structured responses** to extract results for each record
>>> 4. **Emitting individual results** back to the Flink pipeline
>>> 
>>> ### How It Works
>>> 
>>> **Step 1: Batch Accumulation**
>>> Collect up to `batch.size` records or wait up to `batch.timeout.ms`
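>>> 
>>> As a rough illustration of this step, a per-instance buffer could look like
>>> the following (class and field names are made up for this sketch and are
>>> not existing Flink code):
>>> ```java
>>> import java.util.ArrayList;
>>> import java.util.List;
>>> 
>>> // Sketch only: accumulates records until 'batch.size' is reached or
>>> // 'batch.timeout.ms' has elapsed since the first buffered record.
>>> final class BatchBuffer<T> {
>>>     private final int maxSize;      // maps to 'batch.size'
>>>     private final long timeoutMs;   // maps to 'batch.timeout.ms'
>>>     private final List<T> pending = new ArrayList<>();
>>>     private long firstElementTime = -1;
>>> 
>>>     BatchBuffer(int maxSize, long timeoutMs) {
>>>         this.maxSize = maxSize;
>>>         this.timeoutMs = timeoutMs;
>>>     }
>>> 
>>>     /** Adds a record; returns a full batch to flush, or null if not ready yet. */
>>>     List<T> add(T record, long nowMs) {
>>>         if (pending.isEmpty()) {
>>>             firstElementTime = nowMs;
>>>         }
>>>         pending.add(record);
>>>         return shouldFlush(nowMs) ? drain() : null;
>>>     }
>>> 
>>>     /** The operator would also check this from a processing-time timer. */
>>>     boolean shouldFlush(long nowMs) {
>>>         return pending.size() >= maxSize
>>>                 || (!pending.isEmpty() && nowMs - firstElementTime >= timeoutMs);
>>>     }
>>> 
>>>     List<T> drain() {
>>>         List<T> batch = new ArrayList<>(pending);
>>>         pending.clear();
>>>         firstElementTime = -1;
>>>         return batch;
>>>     }
>>> }
>>> ```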
>>> 
>>> **Step 2: Prompt Construction**
>>> 
>>> System: You are a sentiment analyzer. Process each item and respond with
>>> JSON.
>>> 
>>> User: Analyze the sentiment of these texts. Return a JSON array with one
>>> object per input containing "index" and "sentiment" fields.
>>> 
>>> Input 1: "This product is amazing!"
>>> Input 2: "Terrible experience, very disappointed"
>>> Input 3: "It's okay, nothing special"
>>> ...
>>> Input 10: "Best purchase ever!"
>>> 
>>> Respond with: [{"index": 1, "sentiment": "..."}, {"index": 2,
>>> "sentiment": "..."}, ...]
>>> 
>>> **Step 3: Response Parsing**
>>> ```json
>>> [
>>>  {"index": 1, "sentiment": "positive"},
>>>  {"index": 2, "sentiment": "negative"},
>>>  {"index": 3, "sentiment": "neutral"},
>>>  ...
>>>  {"index": 10, "sentiment": "positive"}
>>> ]
>>> ```
>>> 
>>> **Step 4: Result Distribution**
>>> Parse JSON and emit individual results back to corresponding records
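>>> 
>>> Steps 3 and 4 could be sketched together as follows, using Jackson for the
>>> JSON parsing (the helper class is illustrative; the field names come from
>>> the 'response.index.field' / 'response.value.field' options proposed below):
>>> ```java
>>> import com.fasterxml.jackson.databind.JsonNode;
>>> import com.fasterxml.jackson.databind.ObjectMapper;
>>> 
>>> import java.util.HashMap;
>>> import java.util.Map;
>>> 
>>> final class BatchResponseParser {
>>>     private static final ObjectMapper MAPPER = new ObjectMapper();
>>> 
>>>     /** Returns a map from the 1-based input index to the extracted result value. */
>>>     static Map<Integer, String> parse(String response, String indexField, String valueField)
>>>             throws Exception {
>>>         JsonNode array = MAPPER.readTree(response);
>>>         Map<Integer, String> resultsByIndex = new HashMap<>();
>>>         for (JsonNode item : array) {
>>>             JsonNode idx = item.get(indexField);
>>>             JsonNode val = item.get(valueField);
>>>             if (idx != null && val != null) {
>>>                 resultsByIndex.put(idx.asInt(), val.asText());
>>>             }
>>>         }
>>>         return resultsByIndex;
>>>     }
>>> }
>>> ```
>>> The operator would then walk the buffered records in order and emit
>>> `resultsByIndex.get(i + 1)` for record `i`, or an error for missing indices.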
>>> 
>>> ### Model Configuration (Defaults)
>>> ```sql
>>> CREATE MODEL llm_sentiment WITH (
>>>    'provider' = 'openai',
>>>    'model' = 'gpt-4',
>>>    'api_key' = '${API_KEY}',
>>>    'batch.size' = '20',
>>>    'batch.timeout.ms' = '1000',
>>>    'system.prompt' = 'You are a sentiment analyzer. Always respond with
>>> valid JSON.',
>>>    'batch.prompt.template' = 'Analyze sentiment for these texts. Return
>>> JSON array: [{"index": <n>, "sentiment": "<positive|negative|neutral>"}]',
>>>    'response.format' = 'json',
>>>    'response.path' = '$[*]',  -- JSONPath to extract array of results
>>>    'response.index.field' = 'index',  -- Field containing record index
>>>    'response.value.field' = 'sentiment'  -- Field containing result
>>> );
>>> ```
>>> 
>>> ### Query Usage (Use Defaults)
>>> ```sql
>>> -- Uses batch_size=20 from model definition
>>> SELECT id, text, ML_PREDICT('llm_sentiment', text) as sentiment
>>> FROM customer_reviews;
>>> ```
>>> 
>>> ### Query Usage (Override for Custom Analysis)
>>> ```sql
>>> -- Override prompt and batch size for different use case
>>> SELECT id, text, ML_PREDICT('llm_sentiment', text,
>>>    MAP['batch.size', '50',
>>>        'batch.prompt.template', 'Extract key entities. Return JSON:
>>> [{"index": <n>, "entities": [...]}]',
>>>        'response.value.field', 'entities']) as entities
>>> FROM documents;
>>> ```
>>> 
>>> ## Performance and Cost Impact
>>> 
>>> ### Example: Processing 10,000 customer reviews
>>> 
>>> **Current (unbatched)**:
>>> - 10,000 API calls
>>> - ~10,000 x 200ms latency = 2,000 seconds total processing time
>>> (serialized)
>>> - ~10,000 x $0.002 = $20 in API costs
>>> - High rate limit pressure
>>> 
>>> **With batching (batch_size=20)**:
>>> - 500 API calls (10,000 / 20)
>>> - ~500 x 300ms latency = 150 seconds total processing time
>>> - ~500 x $0.006 = $3 in API costs (slightly higher per call due to larger
>>> prompts,
>>>  but still 85% cheaper overall)
>>> - **20x fewer API calls**
>>> - **13x faster processing**
>>> - **85% cost reduction**
>>> 
>>> ## Proposed Implementation
>>> 
>>> ### Configuration Parameters
>>> 
>>> **Model-level (defaults)**:
>>> - `batch.size`: Maximum records per batch (default: 1 for backward
>>> compatibility)
>>> - `batch.timeout.ms`: Max time to wait before flushing incomplete batch
>>> (default: 1000ms)
>>> - `system.prompt`: System-level instruction for the LLM
>>> - `batch.prompt.template`: Template explaining how to process batched
>>> inputs
>>> - `response.format`: Expected response format ('json', 'xml', 'delimited')
>>> - `response.path`: JSONPath or XPath to extract results array
>>> - `response.index.field`: Field name containing the record index
>>> - `response.value.field`: Field name containing the actual result
>>> - `max.retries`: Retry attempts for failed batches (default: 3)
>>> - `request.timeout.ms`: Timeout for API calls (default: 30000ms)
>>> 
>>> **Query-level (overrides)**:
>>> - Any of the above can be overridden via MAP parameter in ML_PREDICT
>>> - Per-query customization for different analysis tasks
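>>> 
>>> For illustration only, the model-level defaults above could be declared with
>>> Flink's ConfigOptions builder (the class name and the exact option set are
>>> not finalized; this is just a sketch):
>>> ```java
>>> import org.apache.flink.configuration.ConfigOption;
>>> import org.apache.flink.configuration.ConfigOptions;
>>> 
>>> // Sketch of the proposed options; names mirror the list above.
>>> public final class BatchPredictOptions {
>>>     public static final ConfigOption<Integer> BATCH_SIZE =
>>>             ConfigOptions.key("batch.size").intType().defaultValue(1);
>>> 
>>>     public static final ConfigOption<Long> BATCH_TIMEOUT_MS =
>>>             ConfigOptions.key("batch.timeout.ms").longType().defaultValue(1000L);
>>> 
>>>     public static final ConfigOption<String> BATCH_PROMPT_TEMPLATE =
>>>             ConfigOptions.key("batch.prompt.template").stringType().noDefaultValue();
>>> 
>>>     public static final ConfigOption<Integer> MAX_RETRIES =
>>>             ConfigOptions.key("max.retries").intType().defaultValue(3);
>>> }
>>> ```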
>>> 
>>> ### Key Features
>>> 1. **Prompt injection**: Automatically construct batch prompts with
>>> indexed inputs
>>> 2. **Structured response parsing**: Support JSON, XML, or delimited
>>> formats
>>> 3. **Index tracking**: Maintain record-to-result mapping through the batch
>>> 4. **Error handling**: Handle parsing failures, missing indices,
>>> malformed responses
>>> 5. **Fallback to individual calls**: If batch fails, optionally retry
>>> records individually
>>> 6. **Provider-agnostic**: Works with any LLM API (OpenAI, Anthropic,
>>> Azure, self-hosted)
>>> 7. **Async processing**: Non-blocking batch requests
>>> 8. **Back-pressure**: Proper flow control when API is slow
>>> 9. **Backward compatible**: batch.size=1 maintains current behavior
>>> 
>>> ### Technical Approach
>>> - Extend existing ML_PREDICT infrastructure
>>> - Add batching buffer in the ML_PREDICT operator
>>> - Implement prompt template engine for batch construction:
>>>  - Inject record index + content into template
>>>  - Support various templating formats (JSON, XML, plain text)
>>> - Implement response parser:
>>>  - Extract structured data (JSONPath, XPath, regex)
>>>  - Map results back to original records by index
>>>  - Handle missing or malformed responses
>>> - Maintain record ordering and error attribution
>>> - Support parameter override mechanism in ML_PREDICT function signature
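>>> 
>>> Putting the pieces together, the per-batch request could be wired up
>>> asynchronously roughly as below, reusing the BatchPromptBuilder and
>>> BatchResponseParser sketches from earlier (the LlmClient interface is a
>>> placeholder, not an existing API):
>>> ```java
>>> import java.util.List;
>>> import java.util.Map;
>>> import java.util.concurrent.CompletableFuture;
>>> 
>>> // Placeholder for whatever HTTP client the connector ends up using.
>>> interface LlmClient {
>>>     CompletableFuture<String> complete(String systemPrompt, String userPrompt);
>>> }
>>> 
>>> final class BatchInvoker {
>>>     /** Sends one batched prompt and resolves to per-record results keyed by index. */
>>>     static CompletableFuture<Map<Integer, String>> invoke(
>>>             LlmClient client,
>>>             String systemPrompt,
>>>             String template,
>>>             List<String> inputs,
>>>             String indexField,
>>>             String valueField) {
>>>         String prompt = BatchPromptBuilder.buildPrompt(template, inputs);
>>>         return client.complete(systemPrompt, prompt)
>>>                 .thenApply(response -> {
>>>                     try {
>>>                         return BatchResponseParser.parse(response, indexField, valueField);
>>>                     } catch (Exception e) {
>>>                         // Complete parse failure: hand off to the retry / fallback logic.
>>>                         throw new RuntimeException("Failed to parse batch response", e);
>>>                     }
>>>                 });
>>>     }
>>> }
>>> ```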
>>> 
>>> ### Response Parsing Strategy
>>> 
>>> The implementation must handle:
>>> 1. **Successful batch response**: Parse and distribute results
>>> 2. **Partial failure**: Some records missing from response → emit errors
>>> for those
>>> 3. **Complete parse failure**: Optionally fallback to individual calls
>>> 4. **Index mismatch**: Response indices don't match input → log warning
>>> and best-effort match
>>> 5. **Malformed JSON**: Retry with error handling
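>>> 
>>> Cases like 2 and 4 above could be attributed back to individual records
>>> along these lines (the Outcome type is illustrative only):
>>> ```java
>>> import java.util.ArrayList;
>>> import java.util.List;
>>> import java.util.Map;
>>> 
>>> final class ResultDistributor {
>>>     /** Illustrative per-record outcome: either a value or an error message. */
>>>     static final class Outcome {
>>>         final String value;
>>>         final String error;
>>>         Outcome(String value, String error) { this.value = value; this.error = error; }
>>>     }
>>> 
>>>     /** Maps parsed results back to the buffered inputs; missing indices become errors. */
>>>     static List<Outcome> distribute(List<String> inputs, Map<Integer, String> resultsByIndex) {
>>>         List<Outcome> outcomes = new ArrayList<>(inputs.size());
>>>         for (int i = 0; i < inputs.size(); i++) {
>>>             String value = resultsByIndex.get(i + 1); // prompt indices are 1-based
>>>             if (value != null) {
>>>                 outcomes.add(new Outcome(value, null));
>>>             } else {
>>>                 // Partial failure: no entry for this record in the response.
>>>                 outcomes.add(new Outcome(null, "missing index " + (i + 1) + " in batch response"));
>>>             }
>>>         }
>>>         return outcomes;
>>>     }
>>> }
>>> ```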
>>> 
>>> Example error handling:
>>> ```sql
>>> -- Records that fail parsing get null results with error metadata
>>> SELECT
>>>    id,
>>>    text,
>>>    result.value as sentiment,
>>>    result.error as error_msg
>>> FROM source_table,
>>> LATERAL TABLE(ML_PREDICT('llm_sentiment', text));
>>> ```
>>> 
>>> ## Limitations and Considerations
>>> 
>>> 1. **LLM instruction following**: Depends on model's ability to follow
>>> structured
>>>   output instructions. GPT-4 and Claude are reliable; older models may
>>> struggle.
>>> 
>>> 2. **Prompt size limits**: Batching too many records may exceed context
>>> windows
>>>   - GPT-4: ~8K tokens input limit
>>>   - Claude: ~200K tokens but practical batches smaller
>>>   - Need configurable max batch size based on average record length
>>> 
>>> 3. **Token cost trade-off**: Larger batches mean:
>>>   - Fewer API calls (good)
>>>   - But larger prompts with instructions/formatting (slight overhead)
>>>   - Net savings still 80-90% in practice
>>> 
>>> 4. **Parsing reliability**: Small risk of malformed responses
>>>   - Mitigated by: clear instructions, JSON mode (GPT-4), retry logic
>>>   - Fallback to individual calls if batch parsing fails repeatedly
>>> 
>>> 5. **Latency characteristics**:
>>>   - Individual records see slightly higher latency (waiting for batch)
>>>   - Overall throughput dramatically improved
>>>   - Use `batch.timeout.ms` to balance latency vs throughput
>>> 
>>> ## Future Extensions
>>> 
>>> This batching architecture would support:
>>> 1. **Stateful chat sessions**: Batch multiple turns of a conversation
>>> with
>>>   maintained history per session key
>>> 2. **Embedding generation**: Some providers (OpenAI) do have batch
>>> embedding APIs
>>> 3. **Multi-modal batching**: Batch image + text processing with
>>> structured outputs
>>> 
>>> ## Questions for the Community
>>> 
>>> 1. **Architecture**: Should this extend ML_PREDICT or be a new function?
>>>   (I propose extending ML_PREDICT for backward compatibility)
>>> 
>>> 2. **FLIP Required?**: Does this enhancement warrant a FLIP?
>>> 
>>> 3. **Existing Work**: Is anyone working on batching for ML_PREDICT or
>>> similar
>>>   functionality?
>>> 
>>> 4. **Prompt Template Engine**: Should we:
>>>   - Build a custom template engine?
>>>   - Use existing library (e.g., StringTemplate, Mustache)?
>>>   - Keep it simple with String.format initially?
>>> 
>>> 5. **Response Parsing**: Preferred approach:
>>>   - JSONPath library (flexible but adds dependency)
>>>   - Simple JSON parsing with field names
>>>   - Pluggable parser interface for extensibility?
>>> 
>>> 6. **Error Handling**: If parsing fails for entire batch:
>>>   - Fail all records in batch?
>>>   - Retry batch once more?
>>>   - Fallback to individual calls (with circuit breaker)?
>>>   - Make strategy configurable?
>>> 
>>> 7. **Batch Assembly**: Should batching happen:
>>>   - Per parallel instance (each task maintains its own batch)?
>>>   - Globally coordinated (shuffle to batch coordinator)?
>>>   - I propose per-instance for simplicity and lower latency
>>> 
>>> 8. **Compatibility**: Default batch.size=1 to maintain current behavior,
>>> users
>>>   opt-in to batching?
>>> 
>>> ## Why This Matters
>>> 
>>> LLM inference is becoming a critical part of real-time data pipelines.
>>> Without
>>> batching:
>>> - Users face prohibitive costs for high-throughput workloads
>>> - Rate limits block production deployments
>>> - Latency makes real-time processing impractical
>>> 
>>> While LLM providers don't offer native batch APIs, application-level
>>> batching
>>> through prompt engineering is a proven pattern used in production by many
>>> organizations. This proposal brings that capability natively into Flink.
>>> 
>>> The hybrid configuration approach provides:
>>> - **Sensible defaults** for common use cases (sentiment analysis,
>>> classification)
>>> - **Flexibility** to customize prompts and parsing for specific needs
>>> - **Easy migration** for existing queries (batch.size=1 default)
>>> 
>>> ## Next Steps
>>> 
>>> If there's interest from the community, I'm happy to:
>>> 1. Prepare a detailed design document with prompt templates and parsing
>>> examples
>>> 2. Create a JIRA ticket
>>> 3. Develop a prototype demonstrating the batching and parsing logic
>>> 4. Write a FLIP if required
>>> 
>>> Looking forward to your feedback and guidance on how best to proceed!
>>> 
>>> --
>>> Thanks And Regards
>>> Rahul
>>> 
>> 
>> 
>> --
>> Thanks And Regards
>> Rahul
>> 
> 
> 
> -- 
> Thanks And Regards
> Rahul
