Hi Rahul,

Thanks for the detailed explanation! I agree that option 1 is more concrete, but for this option I share the same concern as Shengkai. For example, suppose there are 2 Kafka sources and we union them as the input of the Batch_Inference operator. When restoring from a checkpoint, Flink cannot guarantee that the Batch_Inference operator receives the input data from the 2 sources in the same order as in the last run. As a result, the stored `batch_id` in the state can lead to wrong results. Another case that came to my mind is that the `batch_id` is generated and maintained by LLM services like OpenAI or Gemini, so it could become invalid; for example, Gemini provides an API [1] to delete a batch. Given these correctness and controllability concerns, perhaps we need to rethink how the state is used.
[1] https://ai.google.dev/gemini-api/docs/batch-api#delete-batch-job

Best,
Biao Geng

> On Nov 9, 2025, at 13:07, Rahul Bhattacharya <[email protected]> wrote:
>
> Hi,
> I have created a draft FLIP for it:
> https://docs.google.com/document/d/1U-eSuKwi5vIgAPt6ZBvb-RcbcRJiRy0e/
>
> Please let me know your thoughts.
>
> regards
> Rahul
>
> On Sat, Nov 8, 2025 at 5:54 PM Rahul Bhattacharya <[email protected]> wrote:
>
>> Hi,
>> I actually thought of reworking my previous response. I want the Table API
>> to create JSONL files and call the OpenAI/Claude batch APIs.
>> The implementation I am working on batches the records into a file, calls
>> the API with that file, continuously polls the response to check the status
>> of the batch, and then uses that to write the response records.
>> ML_PREDICT in its current form is not usable, as people are not looking for
>> a synchronous response that is twice as expensive as the asynchronous one.
>> Let me know your thoughts and I can create a FLIP for it.
>> regards
>>
>> On Sat, Nov 8, 2025 at 3:14 PM Rahul Bhattacharya <[email protected]> wrote:
>>
>>> Hi Flink Community,
>>>
>>> I'm interested in contributing an enhancement to Apache Flink's ML_PREDICT
>>> functionality for LLM interactions. I'd like to gauge community interest
>>> and get early feedback before proceeding with a detailed design or a FLIP.
>>>
>>> ## Problem Statement
>>>
>>> Currently, when using Flink SQL's ML_PREDICT with LLM endpoints, each
>>> record triggers an individual API call. For a stream processing 1000
>>> records/second, this results in:
>>>
>>> - **1000 separate API calls per second**
>>> - **High latency**: Each call has network overhead + API processing time
>>> - **High cost**: Most LLM providers charge per token, and the lack of
>>>   batching means no cost optimization
>>> - **Rate limiting issues**: Hitting provider rate limits quickly
>>> - **Poor throughput**: API calls are serialized per record
>>>
>>> ### Current Behavior (Inefficient)
>>> ```sql
>>> -- This makes 10 individual API calls
>>> SELECT id, ML_PREDICT('llm_model', text) as result
>>> FROM (VALUES
>>>   (1, 'text1'), (2, 'text2'), ..., (10, 'text10')
>>> ) AS t(id, text);
>>> ```
>>> **Result**: 10 separate HTTP requests, 10x latency, 10x overhead
>>>
>>> ## Proposed Solution: Application-Level Batching with Prompt Engineering
>>>
>>> Since most LLM APIs (OpenAI, Anthropic Claude, etc.) don't provide native
>>> batch endpoints, we propose implementing batching at the application level by:
>>>
>>> 1. **Accumulating N records** into a single batch
>>> 2. **Injecting records into a structured prompt** that instructs the LLM
>>>    to process multiple items
>>> 3. **Parsing structured responses** to extract results for each record
>>> 4. **Emitting individual results** back to the Flink pipeline
>>>
>>> ### How It Works
>>>
>>> **Step 1: Batch Accumulation**
>>> Collect up to `batch.size` records or wait up to `batch.timeout.ms`.
>>>
>>> **Step 2: Prompt Construction**
>>>
>>> System: You are a sentiment analyzer. Process each item and respond with JSON.
>>>
>>> User: Analyze the sentiment of these texts. Return a JSON array with one
>>> object per input containing "index" and "sentiment" fields.
>>>
>>> Input 1: "This product is amazing!"
>>> Input 2: "Terrible experience, very disappointed"
>>> Input 3: "It's okay, nothing special"
>>> ...
>>> Input 10: "Best purchase ever!"
>>>
>>> Respond with: [{"index": 1, "sentiment": "..."}, {"index": 2, "sentiment": "..."}, ...]
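To make the prompt construction in Step 2 above concrete, here is a minimal Java sketch of how a batch prompt could be assembled from buffered records. The class name, the hard-coded template, and the `main` demo are hypothetical illustrations, not part of the proposal; in practice the instruction text would come from `batch.prompt.template`.

```java
import java.util.List;

/** Illustrative sketch: build one batched prompt from several buffered input records. */
public class BatchPromptBuilder {

    // Hypothetical default template; the proposal would read this from 'batch.prompt.template'.
    private static final String INSTRUCTION =
        "Analyze the sentiment of these texts. Return a JSON array with one "
            + "object per input containing \"index\" and \"sentiment\" fields.";

    public static String buildUserPrompt(List<String> batch) {
        StringBuilder prompt = new StringBuilder(INSTRUCTION).append("\n\n");
        // Number inputs starting at 1 so the model can echo the index back in its response.
        for (int i = 0; i < batch.size(); i++) {
            prompt.append("Input ").append(i + 1).append(": \"")
                  .append(batch.get(i)).append("\"\n");
        }
        prompt.append("\nRespond with: [{\"index\": 1, \"sentiment\": \"...\"}, ...]");
        return prompt.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildUserPrompt(List.of(
            "This product is amazing!",
            "Terrible experience, very disappointed")));
    }
}
```

Keeping the numbering 1-based is what lets the parsing step map results back to the original records.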
>>>
>>> **Step 3: Response Parsing**
>>> ```json
>>> [
>>>   {"index": 1, "sentiment": "positive"},
>>>   {"index": 2, "sentiment": "negative"},
>>>   {"index": 3, "sentiment": "neutral"},
>>>   ...
>>>   {"index": 10, "sentiment": "positive"}
>>> ]
>>> ```
>>>
>>> **Step 4: Result Distribution**
>>> Parse the JSON and emit individual results back to the corresponding records.
>>>
>>> ### Model Configuration (Defaults)
>>> ```sql
>>> CREATE MODEL llm_sentiment WITH (
>>>   'provider' = 'openai',
>>>   'model' = 'gpt-4',
>>>   'api_key' = '${API_KEY}',
>>>   'batch.size' = '20',
>>>   'batch.timeout.ms' = '1000',
>>>   'system.prompt' = 'You are a sentiment analyzer. Always respond with valid JSON.',
>>>   'batch.prompt.template' = 'Analyze sentiment for these texts. Return JSON array: [{"index": <n>, "sentiment": "<positive|negative|neutral>"}]',
>>>   'response.format' = 'json',
>>>   'response.path' = '$[*]',            -- JSONPath to extract array of results
>>>   'response.index.field' = 'index',    -- Field containing record index
>>>   'response.value.field' = 'sentiment' -- Field containing result
>>> );
>>> ```
>>>
>>> ### Query Usage (Use Defaults)
>>> ```sql
>>> -- Uses batch.size=20 from the model definition
>>> SELECT id, text, ML_PREDICT('llm_sentiment', text) as sentiment
>>> FROM customer_reviews;
>>> ```
>>>
>>> ### Query Usage (Override for Custom Analysis)
>>> ```sql
>>> -- Override prompt and batch size for a different use case
>>> SELECT id, text, ML_PREDICT('llm_sentiment', text,
>>>   MAP['batch.size', '50',
>>>       'batch.prompt.template', 'Extract key entities. Return JSON: [{"index": <n>, "entities": [...]}]',
>>>       'response.value.field', 'entities']) as entities
>>> FROM documents;
>>> ```
>>>
>>> ## Performance and Cost Impact
>>>
>>> ### Example: Processing 10,000 customer reviews
>>>
>>> **Current (unbatched)**:
>>> - 10,000 API calls
>>> - ~10,000 x 200ms latency = 2,000 seconds total processing time (serialized)
>>> - ~10,000 x $0.002 = $20 in API costs
>>> - High rate limit pressure
>>>
>>> **With batching (batch.size=20)**:
>>> - 500 API calls (10,000 / 20)
>>> - ~500 x 300ms latency = 150 seconds total processing time
>>> - ~500 x $0.006 = $3 in API costs (slightly higher per call due to larger
>>>   prompts, but still 85% cheaper overall)
>>> - **20x fewer API calls**
>>> - **13x faster processing**
>>> - **85% cost reduction**
>>>
>>> ## Proposed Implementation
>>>
>>> ### Configuration Parameters
>>>
>>> **Model-level (defaults)**:
>>> - `batch.size`: Maximum records per batch (default: 1 for backward compatibility)
>>> - `batch.timeout.ms`: Max time to wait before flushing an incomplete batch (default: 1000ms)
>>> - `system.prompt`: System-level instruction for the LLM
>>> - `batch.prompt.template`: Template explaining how to process batched inputs
>>> - `response.format`: Expected response format ('json', 'xml', 'delimited')
>>> - `response.path`: JSONPath or XPath to extract the results array
>>> - `response.index.field`: Field name containing the record index
>>> - `response.value.field`: Field name containing the actual result
>>> - `max.retries`: Retry attempts for failed batches (default: 3)
>>> - `request.timeout.ms`: Timeout for API calls (default: 30000ms)
>>>
>>> **Query-level (overrides)**:
>>> - Any of the above can be overridden via the MAP parameter in ML_PREDICT
>>> - Per-query customization for different analysis tasks
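To illustrate how `batch.size` and `batch.timeout.ms` would interact, here is a minimal sketch of the flush policy as a plain Java buffer. This is an assumption-laden illustration only: the class name is hypothetical, and a real implementation would live inside the ML_PREDICT operator and use Flink's timer service instead of caller-supplied timestamps.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch of the size/timeout flush policy behind 'batch.size'
 * and 'batch.timeout.ms'. Not the proposed operator code.
 */
public class BatchBuffer<T> {

    private final int maxSize;        // corresponds to 'batch.size'
    private final long timeoutMillis; // corresponds to 'batch.timeout.ms'
    private final List<T> buffer = new ArrayList<>();
    private long firstElementTime = -1L;

    public BatchBuffer(int maxSize, long timeoutMillis) {
        this.maxSize = maxSize;
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds a record and returns a full batch to flush, or null while still accumulating. */
    public List<T> add(T record, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementTime = nowMillis;
        }
        buffer.add(record);
        return shouldFlush(nowMillis) ? drain() : null;
    }

    /** Flush when the batch is full or the oldest buffered record has waited too long. */
    public boolean shouldFlush(long nowMillis) {
        return buffer.size() >= maxSize
            || (!buffer.isEmpty() && nowMillis - firstElementTime >= timeoutMillis);
    }

    private List<T> drain() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        firstElementTime = -1L;
        return batch;
    }
}
```

With the defaults above, `new BatchBuffer<String>(20, 1000)` would flush on the 20th record or one second after the first buffered record, whichever comes first.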
>>> ### Key Features
>>> 1. **Prompt injection**: Automatically construct batch prompts with indexed inputs
>>> 2. **Structured response parsing**: Support JSON, XML, or delimited formats
>>> 3. **Index tracking**: Maintain the record-to-result mapping through the batch
>>> 4. **Error handling**: Handle parsing failures, missing indices, and malformed responses
>>> 5. **Fallback to individual calls**: If a batch fails, optionally retry records individually
>>> 6. **Provider-agnostic**: Works with any LLM API (OpenAI, Anthropic, Azure, self-hosted)
>>> 7. **Async processing**: Non-blocking batch requests
>>> 8. **Back-pressure**: Proper flow control when the API is slow
>>> 9. **Backward compatible**: batch.size=1 maintains current behavior
>>>
>>> ### Technical Approach
>>> - Extend the existing ML_PREDICT infrastructure
>>> - Add a batching buffer in the ML_PREDICT operator
>>> - Implement a prompt template engine for batch construction:
>>>   - Inject record index + content into the template
>>>   - Support various templating formats (JSON, XML, plain text)
>>> - Implement a response parser:
>>>   - Extract structured data (JSONPath, XPath, regex)
>>>   - Map results back to the original records by index
>>>   - Handle missing or malformed responses
>>> - Maintain record ordering and error attribution
>>> - Support a parameter override mechanism in the ML_PREDICT function signature
>>>
>>> ### Response Parsing Strategy
>>>
>>> The implementation must handle:
>>> 1. **Successful batch response**: Parse and distribute results
>>> 2. **Partial failure**: Some records missing from the response → emit errors for those
>>> 3. **Complete parse failure**: Optionally fall back to individual calls
>>> 4. **Index mismatch**: Response indices don't match the input → log a warning and best-effort match
>>> 5. **Malformed JSON**: Retry with error handling
>>>
>>> Example error handling:
>>> ```sql
>>> -- Records that fail parsing get null results with error metadata
>>> SELECT
>>>   id,
>>>   text,
>>>   result.value as sentiment,
>>>   result.error as error_msg
>>> FROM source_table,
>>> LATERAL TABLE(ML_PREDICT('llm_sentiment', text));
>>> ```
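As a rough illustration of the index-based result distribution described in the parsing strategy above, here is a hypothetical Java sketch that assumes a Jackson dependency and the default `index`/`sentiment` field names from the example model configuration. The class and method names are illustrative, not the proposed API.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch of mapping a batched LLM response back to input records
 * by index, assuming Jackson for JSON parsing.
 */
public class BatchResponseParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Maps each 1-based input index to its parsed result. Indices missing from
     * the response simply stay absent so the caller can emit per-record errors.
     */
    public static Map<Integer, String> parse(String responseJson, int expectedCount) throws Exception {
        JsonNode root = MAPPER.readTree(responseJson);
        if (!root.isArray()) {
            throw new IllegalArgumentException("Expected a JSON array of results");
        }
        Map<Integer, String> resultsByIndex = new HashMap<>();
        for (JsonNode item : root) {
            int index = item.path("index").asInt(-1);
            if (index >= 1 && index <= expectedCount) {
                resultsByIndex.put(index, item.path("sentiment").asText(null));
            }
            // Out-of-range or missing indices are ignored here; a real operator
            // would log a warning and attribute an error to the affected record.
        }
        return resultsByIndex;
    }
}
```

Records whose index never appears in the returned map would then be emitted with a null result and error metadata, matching the partial-failure case above.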
>>> ## Limitations and Considerations
>>>
>>> 1. **LLM instruction following**: Depends on the model's ability to follow
>>>    structured output instructions. GPT-4 and Claude are reliable; older
>>>    models may struggle.
>>>
>>> 2. **Prompt size limits**: Batching too many records may exceed context windows
>>>    - GPT-4: ~8K tokens input limit
>>>    - Claude: ~200K tokens, but practical batches are smaller
>>>    - Need a configurable max batch size based on average record length
>>>
>>> 3. **Token cost trade-off**: Larger batches mean:
>>>    - Fewer API calls (good)
>>>    - But larger prompts with instructions/formatting (slight overhead)
>>>    - Net savings still 80-90% in practice
>>>
>>> 4. **Parsing reliability**: Small risk of malformed responses
>>>    - Mitigated by: clear instructions, JSON mode (GPT-4), retry logic
>>>    - Fallback to individual calls if batch parsing fails repeatedly
>>>
>>> 5. **Latency characteristics**:
>>>    - Individual records see slightly higher latency (waiting for the batch)
>>>    - Overall throughput is dramatically improved
>>>    - Use `batch.timeout.ms` to balance latency vs. throughput
>>>
>>> ## Future Extensions
>>>
>>> This batching architecture would support:
>>> 1. **Stateful chat sessions**: Batch multiple turns of a conversation with
>>>    maintained history per session key
>>> 2. **Embedding generation**: Some providers (OpenAI) do have batch embedding APIs
>>> 3. **Multi-modal batching**: Batch image + text processing with structured outputs
>>>
>>> ## Questions for the Community
>>>
>>> 1. **Architecture**: Should this extend ML_PREDICT or be a new function?
>>>    (I propose extending ML_PREDICT for backward compatibility)
>>>
>>> 2. **FLIP Required?**: Does this enhancement warrant a FLIP?
>>>
>>> 3. **Existing Work**: Is anyone working on batching for ML_PREDICT or
>>>    similar functionality?
>>>
>>> 4. **Prompt Template Engine**: Should we:
>>>    - Build a custom template engine?
>>>    - Use an existing library (e.g., StringTemplate, Mustache)?
>>>    - Keep it simple with String.format initially?
>>>
>>> 5. **Response Parsing**: Preferred approach:
>>>    - JSONPath library (flexible but adds a dependency)
>>>    - Simple JSON parsing with field names
>>>    - Pluggable parser interface for extensibility?
>>>
>>> 6. **Error Handling**: If parsing fails for an entire batch:
>>>    - Fail all records in the batch?
>>>    - Retry the batch once more?
>>>    - Fall back to individual calls (with a circuit breaker)?
>>>    - Make the strategy configurable?
>>>
>>> 7. **Batch Assembly**: Should batching happen:
>>>    - Per parallel instance (each task maintains its own batch)?
>>>    - Globally coordinated (shuffle to a batch coordinator)?
>>>    - I propose per-instance for simplicity and lower latency
>>>
>>> 8. **Compatibility**: Default batch.size=1 to maintain current behavior, with
>>>    users opting in to batching?
>>>
>>> ## Why This Matters
>>>
>>> LLM inference is becoming a critical part of real-time data pipelines.
>>> Without batching:
>>> - Users face prohibitive costs for high-throughput workloads
>>> - Rate limits block production deployments
>>> - Latency makes real-time processing impractical
>>>
>>> While most LLM providers don't offer native batch APIs, application-level
>>> batching through prompt engineering is a proven pattern used in production
>>> by many organizations. This proposal brings that capability natively into Flink.
>>>
>>> The hybrid configuration approach provides:
>>> - **Sensible defaults** for common use cases (sentiment analysis, classification)
>>> - **Flexibility** to customize prompts and parsing for specific needs
>>> - **Easy migration** for existing queries (batch.size=1 default)
>>>
>>> ## Next Steps
>>>
>>> If there's interest from the community, I'm happy to:
>>> 1. Prepare a detailed design document with prompt templates and parsing examples
>>> 2. Create a JIRA ticket
>>> 3. Develop a prototype demonstrating the batching and parsing logic
>>> 4. Write a FLIP if required
>>>
>>> Looking forward to your feedback and guidance on how best to proceed!
>>>
>>> --
>>> Thanks And Regards
>>> Rahul
>>
>>
>> --
>> Thanks And Regards
>> Rahul
>
>
> --
> Thanks And Regards
> Rahul
