Hi Rahul, Thanks for the proposal. From some offline discussion, the endpoint you have in mind to support is OpenAI batch API [1]. The doc states that "Each batch completes within 24 hours (and often more quickly)". With this context, I have some questions:
1. For design option 1, does the operator always wait for batch response until processing next batch? This can take 24 hours which isn't feasible for streaming job I think. 2. For design option 2, why it loses exact-once and have higher latency compared to 1? 3. Also for the public interface section, are the parameters in `ml_predict` config or in options when `create model`? Thanks, Hao [1] https://platform.openai.com/docs/guides/batch/batch-api On Mon, Nov 10, 2025 at 10:19 AM Asimansu Bera <[email protected]> wrote: > +1 > > This proposal is needed for optimizing network calls and processing asyc. > > Thanks > Asimansu > > On Mon, Nov 10, 2025 at 4:04 AM Shengkai Fang <[email protected]> wrote: > > > Hi, Rahul. > > > > +1 for this proposal. However, due to my current workload, I'll need > until > > the end of this week to review it thoroughly. > > > > Best, > > Shengkai > > > > Rahul Bhattacharya <[email protected]> 于2025年11月9日周日 13:08写道: > > > > > Hi , > > > i have created a draft FLIP for it > > > https://docs.google.com/document/d/1U-eSuKwi5vIgAPt6ZBvb-RcbcRJiRy0e/ > > > > > > Please let me know your thoughts > > > > > > regards > > > Rahul > > > > > > On Sat, Nov 8, 2025 at 5:54 PM Rahul Bhattacharya <[email protected] > > > > > wrote: > > > > > > > Hi, > > > > I actually thought of reworking my previous response. I want the > table > > > api > > > > to create jsonl files and call openai/claude batch apis. > > > > The implementation I am doing is going to batch the records into a > file > > > > and call the api with the file and then continuously poll the repose > to > > > see > > > > the status of the batch and then use that to write the response > > records. > > > > The ML_Predict in its current form is not usable as people are not > > > looking > > > > for synchronous response which is twice as expensive as the > > asynchronous > > > > response. > > > > let me know you thoughts and i can create a FLIP for it > > > > regards > > > > > > > > On Sat, Nov 8, 2025 at 3:14 PM Rahul Bhattacharya < > [email protected] > > > > > > > wrote: > > > > > > > >> Hi Flink Community, > > > >> > > > >> I'm interested in contributing an enhancement to Apache Flink's > > > >> ML_PREDICT > > > >> functionality for LLM interactions. I'd like to gauge community > > interest > > > >> and get > > > >> early feedback before proceeding with detailed design or a FLIP. > > > >> > > > >> ## Problem Statement > > > >> > > > >> Currently, when using Flink SQL's ML_PREDICT with LLM endpoints, > each > > > >> record > > > >> triggers an individual API call. For a stream processing 1000 > > > >> records/second, > > > >> this results in: > > > >> > > > >> - **1000 separate API calls per second** > > > >> - **High latency**: Each call has network overhead + API processing > > time > > > >> - **High cost**: Most LLM providers charge per token, and lack of > > > >> batching means > > > >> no cost optimization > > > >> - **Rate limiting issues**: Hitting provider rate limits quickly > > > >> - **Poor throughput**: API calls are serialized per record > > > >> > > > >> ### Current Behavior (Inefficient) > > > >> ```sql > > > >> -- This makes 10 individual API calls > > > >> SELECT id, ML_PREDICT('llm_model', text) as result > > > >> FROM (VALUES > > > >> (1, 'text1'), (2, 'text2'), ..., (10, 'text10') > > > >> ) AS t(id, text); > > > >> ``` > > > >> **Result**: 10 separate HTTP requests, 10x latency, 10x overhead > > > >> > > > >> ## Proposed Solution: Application-Level Batching with Prompt > > Engineering > > > >> > > > >> Since most LLM APIs (OpenAI, Anthropic Claude, etc.) don't provide > > > native > > > >> batch > > > >> endpoints, we propose implementing batching at the application level > > by: > > > >> > > > >> 1. **Accumulating N records** into a single batch > > > >> 2. **Injecting records into a structured prompt** that instructs the > > LLM > > > >> to > > > >> process multiple items > > > >> 3. **Parsing structured responses** to extract results for each > record > > > >> 4. **Emitting individual results** back to the Flink pipeline > > > >> > > > >> ### How It Works > > > >> > > > >> **Step 1: Batch Accumulation** > > > >> Collect up to `batch.size` records or wait up to `batch.timeout.ms` > > > >> > > > >> **Step 2: Prompt Construction** > > > >> > > > >> System: You are a sentiment analyzer. Process each item and respond > > with > > > >> JSON. > > > >> > > > >> User: Analyze the sentiment of these texts. Return a JSON array with > > one > > > >> object per input containing "index" and "sentiment" fields. > > > >> > > > >> Input 1: "This product is amazing!" Input 2: "Terrible experience, > > very > > > >> disappointed" Input 3: "It's okay, nothing special" ... Input 10: > > "Best > > > >> purchase ever!" > > > >> > > > >> Respond with: [{"index": 1, "sentiment": "..."}, {"index": 2, > > > >> "sentiment": "..."}, ...] > > > >> > > > >> **Step 3: Response Parsing** > > > >> ```json > > > >> [ > > > >> {"index": 1, "sentiment": "positive"}, > > > >> {"index": 2, "sentiment": "negative"}, > > > >> {"index": 3, "sentiment": "neutral"}, > > > >> ... > > > >> {"index": 10, "sentiment": "positive"} > > > >> ] > > > >> ``` > > > >> > > > >> **Step 4: Result Distribution** > > > >> Parse JSON and emit individual results back to corresponding records > > > >> > > > >> ### Model Configuration (Defaults) > > > >> ```sql > > > >> CREATE MODEL llm_sentiment WITH ( > > > >> 'provider' = 'openai', > > > >> 'model' = 'gpt-4', > > > >> 'api_key' = '${API_KEY}', > > > >> 'batch.size' = '20', > > > >> 'batch.timeout.ms' = '1000', > > > >> 'system.prompt' = 'You are a sentiment analyzer. Always respond > > with > > > >> valid JSON.', > > > >> 'batch.prompt.template' = 'Analyze sentiment for these texts. > > Return > > > >> JSON array: [{"index": <n>, "sentiment": > > > "<positive|negative|neutral>"}]', > > > >> 'response.format' = 'json', > > > >> 'response.path' = '$[*]', -- JSONPath to extract array of > results > > > >> 'response.index.field' = 'index', -- Field containing record > > index > > > >> 'response.value.field' = 'sentiment' -- Field containing result > > > >> ); > > > >> ``` > > > >> > > > >> ### Query Usage (Use Defaults) > > > >> ```sql > > > >> -- Uses batch_size=20 from model definition > > > >> SELECT id, text, ML_PREDICT('llm_sentiment', text) as sentiment > > > >> FROM customer_reviews; > > > >> ``` > > > >> > > > >> ### Query Usage (Override for Custom Analysis) > > > >> ```sql > > > >> -- Override prompt and batch size for different use case > > > >> SELECT id, text, ML_PREDICT('llm_sentiment', text, > > > >> MAP['batch.size', '50', > > > >> 'batch.prompt.template', 'Extract key entities. Return JSON: > > > >> [{"index": <n>, "entities": [...]}]', > > > >> 'response.value.field', 'entities']) as entities > > > >> FROM documents; > > > >> ``` > > > >> > > > >> ## Performance and Cost Impact > > > >> > > > >> ### Example: Processing 10,000 customer reviews > > > >> > > > >> **Current (unbatched)**: > > > >> - 10,000 API calls > > > >> - ~10,000 x 200ms latency = 2,000 seconds total processing time > > > >> (serialized) > > > >> - ~10,000 x $0.002 = $20 in API costs > > > >> - High rate limit pressure > > > >> > > > >> **With batching (batch_size=20)**: > > > >> - 500 API calls (10,000 / 20) > > > >> - ~500 x 300ms latency = 150 seconds total processing time > > > >> - ~500 x $0.006 = $3 in API costs (slightly higher per call due to > > > larger > > > >> prompts, > > > >> but still 85% cheaper overall) > > > >> - **20x fewer API calls** > > > >> - **13x faster processing** > > > >> - **85% cost reduction** > > > >> > > > >> ## Proposed Implementation > > > >> > > > >> ### Configuration Parameters > > > >> > > > >> **Model-level (defaults)**: > > > >> - `batch.size`: Maximum records per batch (default: 1 for backward > > > >> compatibility) > > > >> - `batch.timeout.ms`: Max time to wait before flushing incomplete > > batch > > > >> (default: 1000ms) > > > >> - `system.prompt`: System-level instruction for the LLM > > > >> - `batch.prompt.template`: Template explaining how to process > batched > > > >> inputs > > > >> - `response.format`: Expected response format ('json', 'xml', > > > 'delimited') > > > >> - `response.path`: JSONPath or XPath to extract results array > > > >> - `response.index.field`: Field name containing the record index > > > >> - `response.value.field`: Field name containing the actual result > > > >> - `max.retries`: Retry attempts for failed batches (default: 3) > > > >> - `request.timeout.ms`: Timeout for API calls (default: 30000ms) > > > >> > > > >> **Query-level (overrides)**: > > > >> - Any of the above can be overridden via MAP parameter in ML_PREDICT > > > >> - Per-query customization for different analysis tasks > > > >> > > > >> ### Key Features > > > >> 1. **Prompt injection**: Automatically construct batch prompts with > > > >> indexed inputs > > > >> 2. **Structured response parsing**: Support JSON, XML, or delimited > > > >> formats > > > >> 3. **Index tracking**: Maintain record-to-result mapping through the > > > batch > > > >> 4. **Error handling**: Handle parsing failures, missing indices, > > > >> malformed responses > > > >> 5. **Fallback to individual calls**: If batch fails, optionally > retry > > > >> records individually > > > >> 6. **Provider-agnostic**: Works with any LLM API (OpenAI, Anthropic, > > > >> Azure, self-hosted) > > > >> 7. **Async processing**: Non-blocking batch requests > > > >> 8. **Back-pressure**: Proper flow control when API is slow > > > >> 9. **Backward compatible**: batch.size=1 maintains current behavior > > > >> > > > >> ### Technical Approach > > > >> - Extend existing ML_PREDICT infrastructure > > > >> - Add batching buffer in the ML_PREDICT operator > > > >> - Implement prompt template engine for batch construction: > > > >> - Inject record index + content into template > > > >> - Support various templating formats (JSON, XML, plain text) > > > >> - Implement response parser: > > > >> - Extract structured data (JSONPath, XPath, regex) > > > >> - Map results back to original records by index > > > >> - Handle missing or malformed responses > > > >> - Maintain record ordering and error attribution > > > >> - Support parameter override mechanism in ML_PREDICT function > > signature > > > >> > > > >> ### Response Parsing Strategy > > > >> > > > >> The implementation must handle: > > > >> 1. **Successful batch response**: Parse and distribute results > > > >> 2. **Partial failure**: Some records missing from response → emit > > errors > > > >> for those > > > >> 3. **Complete parse failure**: Optionally fallback to individual > calls > > > >> 4. **Index mismatch**: Response indices don't match input → log > > warning > > > >> and best-effort match > > > >> 5. **Malformed JSON**: Retry with error handling > > > >> > > > >> Example error handling: > > > >> ```sql > > > >> -- Records that fail parsing get null results with error metadata > > > >> SELECT > > > >> id, > > > >> text, > > > >> result.value as sentiment, > > > >> result.error as error_msg > > > >> FROM source_table, > > > >> LATERAL TABLE(ML_PREDICT('llm_sentiment', text)); > > > >> ``` > > > >> > > > >> ## Limitations and Considerations > > > >> > > > >> 1. **LLM instruction following**: Depends on model's ability to > follow > > > >> structured > > > >> output instructions. GPT-4 and Claude are reliable; older models > > may > > > >> struggle. > > > >> > > > >> 2. **Prompt size limits**: Batching too many records may exceed > > context > > > >> windows > > > >> - GPT-4: ~8K tokens input limit > > > >> - Claude: ~200K tokens but practical batches smaller > > > >> - Need configurable max batch size based on average record length > > > >> > > > >> 3. **Token cost trade-off**: Larger batches mean: > > > >> - Fewer API calls (good) > > > >> - But larger prompts with instructions/formatting (slight > overhead) > > > >> - Net savings still 80-90% in practice > > > >> > > > >> 4. **Parsing reliability**: Small risk of malformed responses > > > >> - Mitigated by: clear instructions, JSON mode (GPT-4), retry > logic > > > >> - Fallback to individual calls if batch parsing fails repeatedly > > > >> > > > >> 5. **Latency characteristics**: > > > >> - Individual records see slightly higher latency (waiting for > > batch) > > > >> - Overall throughput dramatically improved > > > >> - Use `batch.timeout.ms` to balance latency vs throughput > > > >> > > > >> ## Future Extensions > > > >> > > > >> This batching architecture would support: > > > >> 1. **Stateful chat sessions**: Batch multiple turns of a > conversation > > > >> with > > > >> maintained history per session key > > > >> 2. **Embedding generation**: Some providers (OpenAI) do have batch > > > >> embedding APIs > > > >> 3. **Multi-modal batching**: Batch image + text processing with > > > >> structured outputs > > > >> > > > >> ## Questions for the Community > > > >> > > > >> 1. **Architecture**: Should this extend ML_PREDICT or be a new > > function? > > > >> (I propose extending ML_PREDICT for backward compatibility) > > > >> > > > >> 2. **FLIP Required?**: Does this enhancement warrant a FLIP? > > > >> > > > >> 3. **Existing Work**: Is anyone working on batching for ML_PREDICT > or > > > >> similar > > > >> functionality? > > > >> > > > >> 4. **Prompt Template Engine**: Should we: > > > >> - Build a custom template engine? > > > >> - Use existing library (e.g., StringTemplate, Mustache)? > > > >> - Keep it simple with String.format initially? > > > >> > > > >> 5. **Response Parsing**: Preferred approach: > > > >> - JSONPath library (flexible but adds dependency) > > > >> - Simple JSON parsing with field names > > > >> - Pluggable parser interface for extensibility? > > > >> > > > >> 6. **Error Handling**: If parsing fails for entire batch: > > > >> - Fail all records in batch? > > > >> - Retry batch once more? > > > >> - Fallback to individual calls (with circuit breaker)? > > > >> - Make strategy configurable? > > > >> > > > >> 7. **Batch Assembly**: Should batching happen: > > > >> - Per parallel instance (each task maintains its own batch)? > > > >> - Globally coordinated (shuffle to batch coordinator)? > > > >> - I propose per-instance for simplicity and lower latency > > > >> > > > >> 8. **Compatibility**: Default batch.size=1 to maintain current > > behavior, > > > >> users > > > >> opt-in to batching? > > > >> > > > >> ## Why This Matters > > > >> > > > >> LLM inference is becoming a critical part of real-time data > pipelines. > > > >> Without > > > >> batching: > > > >> - Users face prohibitive costs for high-throughput workloads > > > >> - Rate limits block production deployments > > > >> - Latency makes real-time processing impractical > > > >> > > > >> While LLM providers don't offer native batch APIs, application-level > > > >> batching > > > >> through prompt engineering is a proven pattern used in production by > > > many > > > >> organizations. This proposal brings that capability natively into > > Flink. > > > >> > > > >> The hybrid configuration approach provides: > > > >> - **Sensible defaults** for common use cases (sentiment analysis, > > > >> classification) > > > >> - **Flexibility** to customize prompts and parsing for specific > needs > > > >> - **Easy migration** for existing queries (batch.size=1 default) > > > >> > > > >> ## Next Steps > > > >> > > > >> If there's interest from the community, I'm happy to: > > > >> 1. Prepare a detailed design document with prompt templates and > > parsing > > > >> examples > > > >> 2. Create a JIRA ticket > > > >> 3. Develop a prototype demonstrating the batching and parsing logic > > > >> 4. Write a FLIP if required > > > >> > > > >> Looking forward to your feedback and guidance on how best to > > proceed!-- > > > >> Thanks And Regards > > > >> Rahul > > > >> > > > > > > > > > > > > -- > > > > Thanks And Regards > > > > Rahul > > > > > > > > > > > > > -- > > > Thanks And Regards > > > Rahul > > > > > >
