Hi Shengkai, > process them together, and complete the future objects sequentially as they finish
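As a rough illustration of the pattern quoted above (collect records inside the async predict function, fire one request per batch, and complete each record's future in input order), a minimal Java sketch follows. The class and method names are assumptions for illustration, not the actual Flink async predict API, and the timeout-based flush is omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
 * Illustrative sketch only: buffer incoming records, send one request per
 * batch, and complete each record's future from the shared batch response.
 */
public class BatchingPredictor {

    private final int batchSize;
    private final Function<List<String>, List<String>> batchCall; // one request for N inputs
    private final List<String> pendingInputs = new ArrayList<>();
    private final List<CompletableFuture<String>> pendingFutures = new ArrayList<>();

    public BatchingPredictor(int batchSize, Function<List<String>, List<String>> batchCall) {
        this.batchSize = batchSize;
        this.batchCall = batchCall;
    }

    /** Called once per record; the framework keeps the returned future. */
    public synchronized CompletableFuture<String> asyncPredict(String input) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingInputs.add(input);
        pendingFutures.add(future);
        if (pendingInputs.size() >= batchSize) {
            flush();
        }
        return future;
    }

    private void flush() {
        List<String> inputs = new ArrayList<>(pendingInputs);
        List<CompletableFuture<String>> futures = new ArrayList<>(pendingFutures);
        pendingInputs.clear();
        pendingFutures.clear();
        CompletableFuture
            .supplyAsync(() -> batchCall.apply(inputs))
            .whenComplete((results, error) -> {
                // Results are assumed to come back aligned with the input order.
                for (int i = 0; i < futures.size(); i++) {
                    if (error != null) {
                        futures.get(i).completeExceptionally(error);
                    } else {
                        futures.get(i).complete(results.get(i));
                    }
                }
            });
    }
}
```

A production version would also flush on `batch.timeout.ms` and bound the number of in-flight batches for back-pressure.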
The batch API Rahul proposed is [1]: it takes a batch of requests from a file, returns an id, and we need to poll that id for the results. The important part is that it can take up to 24 hours to finish. So Rahul is proposing to just send the result id downstream.

[1] https://platform.openai.com/docs/guides/batch
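For reference, a minimal sketch of that flow, assuming the JSONL input file has already been uploaded via the /v1/files endpoint and that the request and response shapes match the Batch API guide in [1]. Everything here is illustrative; error handling, JSON escaping and response parsing are omitted.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/**
 * Sketch of the flow described above: turn rows into JSONL request lines,
 * submit one batch job, and keep the returned batch id. Option 1 simply
 * forwards that id downstream instead of waiting up to 24 hours.
 */
public class OpenAiBatchSubmitSketch {

    private static final HttpClient HTTP = HttpClient.newHttpClient();

    /** One JSONL line per record, as expected by the Batch API input file. */
    static String toJsonlLine(String customId, String model, String userText) {
        // NOTE: real code should JSON-escape userText / build this with a JSON library.
        return String.format(
            "{\"custom_id\":\"%s\",\"method\":\"POST\",\"url\":\"/v1/chat/completions\","
                + "\"body\":{\"model\":\"%s\",\"messages\":[{\"role\":\"user\",\"content\":\"%s\"}]}}",
            customId, model, userText);
    }

    /** Create the batch job; the raw response contains the batch id. */
    static String submitBatch(String apiKey, String inputFileId) throws Exception {
        String body = String.format(
            "{\"input_file_id\":\"%s\",\"endpoint\":\"/v1/chat/completions\",\"completion_window\":\"24h\"}",
            inputFileId);
        HttpRequest request = HttpRequest.newBuilder(URI.create("https://api.openai.com/v1/batches"))
            .header("Authorization", "Bearer " + apiKey)
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
        return HTTP.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    /** Poll the batch status later, or leave this to a separate process as discussed below. */
    static String getBatch(String apiKey, String batchId) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create("https://api.openai.com/v1/batches/" + batchId))
            .header("Authorization", "Bearer " + apiKey)
            .GET()
            .build();
        return HTTP.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }
}
```

Under option 1 the job would stop after submitBatch and just emit the returned id; the polling call would live in the separate downstream process described later in the thread.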
On Fri, Nov 14, 2025 at 8:35 AM Rahul Bhattacharya <[email protected]> wrote:

> Hi Shengkai,
> So my understanding is that we will go with option 1 and send the batch id downstream, for the user to do whatever they need to do with the batch ids?
>
> I am also of the opinion that option 1 is a better option for now than option 2.
> Based on a parameter setting we should batch n records, create a JSONL file and post it to the LLM batch API.
> The LLM will immediately return the batch id, which we can just send downstream. This implementation will be stateless and really simple to implement.
>
> regards
>
> On Fri, Nov 14, 2025 at 4:18 AM Shengkai Fang <[email protected]> wrote:
>
> > Hi, Rahul.
> >
> > First, I believe we don't need to modify the framework. Instead, we can have the async predict function collect records in batches, process them together, and complete the future objects sequentially as they finish. This approach allows us to move forward quickly.
> >
> > Second, I have concerns about introducing state or external storage. On one hand, the current design is stateless, and transitioning to a stateful architecture would require significant refactoring. On the other hand, I don't see clear advantages to storing batch IDs in state, since we cannot guarantee that elements will arrive in the same order after restoring from a checkpoint. For example, if the ML predictor receives elements e1, e2, e3 in the first run, it might receive e2, e3, e1 after recovery. With a batch size of 2, we wouldn't be able to reuse the in-flight requests effectively.
> >
> > Finally, I suggest we leverage the IOManager to handle JSON file management.
> >
> > Best,
> > Shengkai
> >
> > Rahul Bhattacharya <[email protected]> wrote on Tue, Nov 11, 2025 at 09:03:
> >
> > > Hi Hao,
> > > For option 2, guarantees like exactly-once do not hold, as the work is not done in the same Flink process.
> > >
> > > The Flink process finishes after submitting the batch and getting the batch ids, which it sends downstream.
> > >
> > > There has to be another process (it doesn't have to be Flink) which takes these batch ids and polls the OpenAI endpoint until the status is completed.
> > >
> > > Once the batch is completed, it downloads the results and sends them downstream.
> > >
> > > This secondary process is at the client's discretion; for Kafka it would probably be an HTTP sink connector or a Kafka consumer.
> > >
> > > Thanks And Regards
> > > Rahul
> > >
> > > On Mon, Nov 10, 2025 at 6:45 PM Hao Li <[email protected]> wrote:
> > >
> > > > Hi Rahul,
> > > >
> > > > Thanks for the proposal. From some offline discussion, the endpoint you have in mind to support is the OpenAI batch API [1]. The doc states that "Each batch completes within 24 hours (and often more quickly)". With this context, I have some questions:
> > > >
> > > > 1. For design option 1, does the operator always wait for the batch response before processing the next batch? This can take 24 hours, which I think isn't feasible for a streaming job.
> > > >
> > > > 2. For design option 2, why does it lose exactly-once semantics and have higher latency compared to option 1?
> > > >
> > > > 3. Also, for the public interface section, are the parameters in the `ml_predict` config or in the options of `create model`?
> > > >
> > > > Thanks,
> > > > Hao
> > > >
> > > > [1] https://platform.openai.com/docs/guides/batch/batch-api
> > > >
> > > > On Mon, Nov 10, 2025 at 10:19 AM Asimansu Bera <[email protected]> wrote:
> > > >
> > > > > +1
> > > > >
> > > > > This proposal is needed for optimizing network calls and processing asynchronously.
> > > > >
> > > > > Thanks
> > > > > Asimansu
> > > > >
> > > > > On Mon, Nov 10, 2025 at 4:04 AM Shengkai Fang <[email protected]> wrote:
> > > > >
> > > > > > Hi, Rahul.
> > > > > >
> > > > > > +1 for this proposal. However, due to my current workload, I'll need until the end of this week to review it thoroughly.
> > > > > >
> > > > > > Best,
> > > > > > Shengkai
> > > > > >
> > > > > > Rahul Bhattacharya <[email protected]> wrote on Sun, Nov 9, 2025 at 13:08:
> > > > > >
> > > > > > > Hi,
> > > > > > > I have created a draft FLIP for it:
> > > > > > > https://docs.google.com/document/d/1U-eSuKwi5vIgAPt6ZBvb-RcbcRJiRy0e/
> > > > > > >
> > > > > > > Please let me know your thoughts.
> > > > > > >
> > > > > > > regards
> > > > > > > Rahul
> > > > > > >
> > > > > > > On Sat, Nov 8, 2025 at 5:54 PM Rahul Bhattacharya <[email protected]> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > > I actually thought of reworking my previous response. I want the Table API to create JSONL files and call the OpenAI/Claude batch APIs.
> > > > > > > > The implementation I am doing is going to batch the records into a file, call the API with the file, continuously poll the response to see the status of the batch, and then use that to write the response records.
> > > > > > > > ML_PREDICT in its current form is not usable, as people are not looking for synchronous responses, which are twice as expensive as asynchronous (batch) responses.
> > > > > > > > Let me know your thoughts and I can create a FLIP for it.
> > > > > > > > regards
> > > > > > > >
> > > > > > > > On Sat, Nov 8, 2025 at 3:14 PM Rahul Bhattacharya <[email protected]> wrote:
> > > > > > > >
> > > > > > > >> Hi Flink Community,
> > > > > > > >>
> > > > > > > >> I'm interested in contributing an enhancement to Apache Flink's ML_PREDICT functionality for LLM interactions. I'd like to gauge community interest and get early feedback before proceeding with detailed design or a FLIP.
> > > > > > > >>
> > > > > > > >> ## Problem Statement
> > > > > > > >>
> > > > > > > >> Currently, when using Flink SQL's ML_PREDICT with LLM endpoints, each record triggers an individual API call.
For a stream processing > 1000 > > > > > > > >> records/second, > > > > > > > >> this results in: > > > > > > > >> > > > > > > > >> - **1000 separate API calls per second** > > > > > > > >> - **High latency**: Each call has network overhead + API > > > > processing > > > > > > time > > > > > > > >> - **High cost**: Most LLM providers charge per token, and > lack > > > of > > > > > > > >> batching means > > > > > > > >> no cost optimization > > > > > > > >> - **Rate limiting issues**: Hitting provider rate limits > > quickly > > > > > > > >> - **Poor throughput**: API calls are serialized per record > > > > > > > >> > > > > > > > >> ### Current Behavior (Inefficient) > > > > > > > >> ```sql > > > > > > > >> -- This makes 10 individual API calls > > > > > > > >> SELECT id, ML_PREDICT('llm_model', text) as result > > > > > > > >> FROM (VALUES > > > > > > > >> (1, 'text1'), (2, 'text2'), ..., (10, 'text10') > > > > > > > >> ) AS t(id, text); > > > > > > > >> ``` > > > > > > > >> **Result**: 10 separate HTTP requests, 10x latency, 10x > > overhead > > > > > > > >> > > > > > > > >> ## Proposed Solution: Application-Level Batching with Prompt > > > > > > Engineering > > > > > > > >> > > > > > > > >> Since most LLM APIs (OpenAI, Anthropic Claude, etc.) don't > > > provide > > > > > > > native > > > > > > > >> batch > > > > > > > >> endpoints, we propose implementing batching at the > application > > > > level > > > > > > by: > > > > > > > >> > > > > > > > >> 1. **Accumulating N records** into a single batch > > > > > > > >> 2. **Injecting records into a structured prompt** that > > instructs > > > > the > > > > > > LLM > > > > > > > >> to > > > > > > > >> process multiple items > > > > > > > >> 3. **Parsing structured responses** to extract results for > > each > > > > > record > > > > > > > >> 4. **Emitting individual results** back to the Flink > pipeline > > > > > > > >> > > > > > > > >> ### How It Works > > > > > > > >> > > > > > > > >> **Step 1: Batch Accumulation** > > > > > > > >> Collect up to `batch.size` records or wait up to ` > > > > batch.timeout.ms` > > > > > > > >> > > > > > > > >> **Step 2: Prompt Construction** > > > > > > > >> > > > > > > > >> System: You are a sentiment analyzer. Process each item and > > > > respond > > > > > > with > > > > > > > >> JSON. > > > > > > > >> > > > > > > > >> User: Analyze the sentiment of these texts. Return a JSON > > array > > > > with > > > > > > one > > > > > > > >> object per input containing "index" and "sentiment" fields. > > > > > > > >> > > > > > > > >> Input 1: "This product is amazing!" Input 2: "Terrible > > > experience, > > > > > > very > > > > > > > >> disappointed" Input 3: "It's okay, nothing special" ... > Input > > > 10: > > > > > > "Best > > > > > > > >> purchase ever!" > > > > > > > >> > > > > > > > >> Respond with: [{"index": 1, "sentiment": "..."}, {"index": > 2, > > > > > > > >> "sentiment": "..."}, ...] > > > > > > > >> > > > > > > > >> **Step 3: Response Parsing** > > > > > > > >> ```json > > > > > > > >> [ > > > > > > > >> {"index": 1, "sentiment": "positive"}, > > > > > > > >> {"index": 2, "sentiment": "negative"}, > > > > > > > >> {"index": 3, "sentiment": "neutral"}, > > > > > > > >> ... 
> > > > > > > >> {"index": 10, "sentiment": "positive"} > > > > > > > >> ] > > > > > > > >> ``` > > > > > > > >> > > > > > > > >> **Step 4: Result Distribution** > > > > > > > >> Parse JSON and emit individual results back to corresponding > > > > records > > > > > > > >> > > > > > > > >> ### Model Configuration (Defaults) > > > > > > > >> ```sql > > > > > > > >> CREATE MODEL llm_sentiment WITH ( > > > > > > > >> 'provider' = 'openai', > > > > > > > >> 'model' = 'gpt-4', > > > > > > > >> 'api_key' = '${API_KEY}', > > > > > > > >> 'batch.size' = '20', > > > > > > > >> 'batch.timeout.ms' = '1000', > > > > > > > >> 'system.prompt' = 'You are a sentiment analyzer. Always > > > > respond > > > > > > with > > > > > > > >> valid JSON.', > > > > > > > >> 'batch.prompt.template' = 'Analyze sentiment for these > > > texts. > > > > > > Return > > > > > > > >> JSON array: [{"index": <n>, "sentiment": > > > > > > > "<positive|negative|neutral>"}]', > > > > > > > >> 'response.format' = 'json', > > > > > > > >> 'response.path' = '$[*]', -- JSONPath to extract array > of > > > > > results > > > > > > > >> 'response.index.field' = 'index', -- Field containing > > > record > > > > > > index > > > > > > > >> 'response.value.field' = 'sentiment' -- Field > containing > > > > result > > > > > > > >> ); > > > > > > > >> ``` > > > > > > > >> > > > > > > > >> ### Query Usage (Use Defaults) > > > > > > > >> ```sql > > > > > > > >> -- Uses batch_size=20 from model definition > > > > > > > >> SELECT id, text, ML_PREDICT('llm_sentiment', text) as > > sentiment > > > > > > > >> FROM customer_reviews; > > > > > > > >> ``` > > > > > > > >> > > > > > > > >> ### Query Usage (Override for Custom Analysis) > > > > > > > >> ```sql > > > > > > > >> -- Override prompt and batch size for different use case > > > > > > > >> SELECT id, text, ML_PREDICT('llm_sentiment', text, > > > > > > > >> MAP['batch.size', '50', > > > > > > > >> 'batch.prompt.template', 'Extract key entities. 
> Return > > > > JSON: > > > > > > > >> [{"index": <n>, "entities": [...]}]', > > > > > > > >> 'response.value.field', 'entities']) as entities > > > > > > > >> FROM documents; > > > > > > > >> ``` > > > > > > > >> > > > > > > > >> ## Performance and Cost Impact > > > > > > > >> > > > > > > > >> ### Example: Processing 10,000 customer reviews > > > > > > > >> > > > > > > > >> **Current (unbatched)**: > > > > > > > >> - 10,000 API calls > > > > > > > >> - ~10,000 x 200ms latency = 2,000 seconds total processing > > time > > > > > > > >> (serialized) > > > > > > > >> - ~10,000 x $0.002 = $20 in API costs > > > > > > > >> - High rate limit pressure > > > > > > > >> > > > > > > > >> **With batching (batch_size=20)**: > > > > > > > >> - 500 API calls (10,000 / 20) > > > > > > > >> - ~500 x 300ms latency = 150 seconds total processing time > > > > > > > >> - ~500 x $0.006 = $3 in API costs (slightly higher per call > > due > > > to > > > > > > > larger > > > > > > > >> prompts, > > > > > > > >> but still 85% cheaper overall) > > > > > > > >> - **20x fewer API calls** > > > > > > > >> - **13x faster processing** > > > > > > > >> - **85% cost reduction** > > > > > > > >> > > > > > > > >> ## Proposed Implementation > > > > > > > >> > > > > > > > >> ### Configuration Parameters > > > > > > > >> > > > > > > > >> **Model-level (defaults)**: > > > > > > > >> - `batch.size`: Maximum records per batch (default: 1 for > > > backward > > > > > > > >> compatibility) > > > > > > > >> - `batch.timeout.ms`: Max time to wait before flushing > > > incomplete > > > > > > batch > > > > > > > >> (default: 1000ms) > > > > > > > >> - `system.prompt`: System-level instruction for the LLM > > > > > > > >> - `batch.prompt.template`: Template explaining how to > process > > > > > batched > > > > > > > >> inputs > > > > > > > >> - `response.format`: Expected response format ('json', > 'xml', > > > > > > > 'delimited') > > > > > > > >> - `response.path`: JSONPath or XPath to extract results > array > > > > > > > >> - `response.index.field`: Field name containing the record > > index > > > > > > > >> - `response.value.field`: Field name containing the actual > > > result > > > > > > > >> - `max.retries`: Retry attempts for failed batches (default: > > 3) > > > > > > > >> - `request.timeout.ms`: Timeout for API calls (default: > > > 30000ms) > > > > > > > >> > > > > > > > >> **Query-level (overrides)**: > > > > > > > >> - Any of the above can be overridden via MAP parameter in > > > > ML_PREDICT > > > > > > > >> - Per-query customization for different analysis tasks > > > > > > > >> > > > > > > > >> ### Key Features > > > > > > > >> 1. **Prompt injection**: Automatically construct batch > prompts > > > > with > > > > > > > >> indexed inputs > > > > > > > >> 2. **Structured response parsing**: Support JSON, XML, or > > > > delimited > > > > > > > >> formats > > > > > > > >> 3. **Index tracking**: Maintain record-to-result mapping > > through > > > > the > > > > > > > batch > > > > > > > >> 4. **Error handling**: Handle parsing failures, missing > > indices, > > > > > > > >> malformed responses > > > > > > > >> 5. **Fallback to individual calls**: If batch fails, > > optionally > > > > > retry > > > > > > > >> records individually > > > > > > > >> 6. **Provider-agnostic**: Works with any LLM API (OpenAI, > > > > Anthropic, > > > > > > > >> Azure, self-hosted) > > > > > > > >> 7. **Async processing**: Non-blocking batch requests > > > > > > > >> 8. **Back-pressure**: Proper flow control when API is slow > > > > > > > >> 9. 
**Backward compatible**: batch.size=1 maintains current > > > > behavior > > > > > > > >> > > > > > > > >> ### Technical Approach > > > > > > > >> - Extend existing ML_PREDICT infrastructure > > > > > > > >> - Add batching buffer in the ML_PREDICT operator > > > > > > > >> - Implement prompt template engine for batch construction: > > > > > > > >> - Inject record index + content into template > > > > > > > >> - Support various templating formats (JSON, XML, plain > text) > > > > > > > >> - Implement response parser: > > > > > > > >> - Extract structured data (JSONPath, XPath, regex) > > > > > > > >> - Map results back to original records by index > > > > > > > >> - Handle missing or malformed responses > > > > > > > >> - Maintain record ordering and error attribution > > > > > > > >> - Support parameter override mechanism in ML_PREDICT > function > > > > > > signature > > > > > > > >> > > > > > > > >> ### Response Parsing Strategy > > > > > > > >> > > > > > > > >> The implementation must handle: > > > > > > > >> 1. **Successful batch response**: Parse and distribute > results > > > > > > > >> 2. **Partial failure**: Some records missing from response → > > > emit > > > > > > errors > > > > > > > >> for those > > > > > > > >> 3. **Complete parse failure**: Optionally fallback to > > individual > > > > > calls > > > > > > > >> 4. **Index mismatch**: Response indices don't match input → > > log > > > > > > warning > > > > > > > >> and best-effort match > > > > > > > >> 5. **Malformed JSON**: Retry with error handling > > > > > > > >> > > > > > > > >> Example error handling: > > > > > > > >> ```sql > > > > > > > >> -- Records that fail parsing get null results with error > > > metadata > > > > > > > >> SELECT > > > > > > > >> id, > > > > > > > >> text, > > > > > > > >> result.value as sentiment, > > > > > > > >> result.error as error_msg > > > > > > > >> FROM source_table, > > > > > > > >> LATERAL TABLE(ML_PREDICT('llm_sentiment', text)); > > > > > > > >> ``` > > > > > > > >> > > > > > > > >> ## Limitations and Considerations > > > > > > > >> > > > > > > > >> 1. **LLM instruction following**: Depends on model's ability > > to > > > > > follow > > > > > > > >> structured > > > > > > > >> output instructions. GPT-4 and Claude are reliable; older > > > > models > > > > > > may > > > > > > > >> struggle. > > > > > > > >> > > > > > > > >> 2. **Prompt size limits**: Batching too many records may > > exceed > > > > > > context > > > > > > > >> windows > > > > > > > >> - GPT-4: ~8K tokens input limit > > > > > > > >> - Claude: ~200K tokens but practical batches smaller > > > > > > > >> - Need configurable max batch size based on average > record > > > > length > > > > > > > >> > > > > > > > >> 3. **Token cost trade-off**: Larger batches mean: > > > > > > > >> - Fewer API calls (good) > > > > > > > >> - But larger prompts with instructions/formatting (slight > > > > > overhead) > > > > > > > >> - Net savings still 80-90% in practice > > > > > > > >> > > > > > > > >> 4. **Parsing reliability**: Small risk of malformed > responses > > > > > > > >> - Mitigated by: clear instructions, JSON mode (GPT-4), > > retry > > > > > logic > > > > > > > >> - Fallback to individual calls if batch parsing fails > > > > repeatedly > > > > > > > >> > > > > > > > >> 5. 
**Latency characteristics**: > > > > > > > >> - Individual records see slightly higher latency (waiting > > for > > > > > > batch) > > > > > > > >> - Overall throughput dramatically improved > > > > > > > >> - Use `batch.timeout.ms` to balance latency vs > throughput > > > > > > > >> > > > > > > > >> ## Future Extensions > > > > > > > >> > > > > > > > >> This batching architecture would support: > > > > > > > >> 1. **Stateful chat sessions**: Batch multiple turns of a > > > > > conversation > > > > > > > >> with > > > > > > > >> maintained history per session key > > > > > > > >> 2. **Embedding generation**: Some providers (OpenAI) do have > > > batch > > > > > > > >> embedding APIs > > > > > > > >> 3. **Multi-modal batching**: Batch image + text processing > > with > > > > > > > >> structured outputs > > > > > > > >> > > > > > > > >> ## Questions for the Community > > > > > > > >> > > > > > > > >> 1. **Architecture**: Should this extend ML_PREDICT or be a > new > > > > > > function? > > > > > > > >> (I propose extending ML_PREDICT for backward > compatibility) > > > > > > > >> > > > > > > > >> 2. **FLIP Required?**: Does this enhancement warrant a FLIP? > > > > > > > >> > > > > > > > >> 3. **Existing Work**: Is anyone working on batching for > > > ML_PREDICT > > > > > or > > > > > > > >> similar > > > > > > > >> functionality? > > > > > > > >> > > > > > > > >> 4. **Prompt Template Engine**: Should we: > > > > > > > >> - Build a custom template engine? > > > > > > > >> - Use existing library (e.g., StringTemplate, Mustache)? > > > > > > > >> - Keep it simple with String.format initially? > > > > > > > >> > > > > > > > >> 5. **Response Parsing**: Preferred approach: > > > > > > > >> - JSONPath library (flexible but adds dependency) > > > > > > > >> - Simple JSON parsing with field names > > > > > > > >> - Pluggable parser interface for extensibility? > > > > > > > >> > > > > > > > >> 6. **Error Handling**: If parsing fails for entire batch: > > > > > > > >> - Fail all records in batch? > > > > > > > >> - Retry batch once more? > > > > > > > >> - Fallback to individual calls (with circuit breaker)? > > > > > > > >> - Make strategy configurable? > > > > > > > >> > > > > > > > >> 7. **Batch Assembly**: Should batching happen: > > > > > > > >> - Per parallel instance (each task maintains its own > > batch)? > > > > > > > >> - Globally coordinated (shuffle to batch coordinator)? > > > > > > > >> - I propose per-instance for simplicity and lower latency > > > > > > > >> > > > > > > > >> 8. **Compatibility**: Default batch.size=1 to maintain > current > > > > > > behavior, > > > > > > > >> users > > > > > > > >> opt-in to batching? > > > > > > > >> > > > > > > > >> ## Why This Matters > > > > > > > >> > > > > > > > >> LLM inference is becoming a critical part of real-time data > > > > > pipelines. > > > > > > > >> Without > > > > > > > >> batching: > > > > > > > >> - Users face prohibitive costs for high-throughput workloads > > > > > > > >> - Rate limits block production deployments > > > > > > > >> - Latency makes real-time processing impractical > > > > > > > >> > > > > > > > >> While LLM providers don't offer native batch APIs, > > > > application-level > > > > > > > >> batching > > > > > > > >> through prompt engineering is a proven pattern used in > > > production > > > > by > > > > > > > many > > > > > > > >> organizations. This proposal brings that capability natively > > > into > > > > > > Flink. 
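To make questions 4 and 5 above more concrete, here is a minimal sketch of the simplest options mentioned there: building the batch prompt with plain String formatting (Step 2) and mapping results back to records by the "index" field with straightforward JSON parsing, using Jackson as one possible library (Step 3). Field names follow the examples in this proposal and are assumptions, not a settled design.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of the "keep it simple" options: plain string templating for the
 * batch prompt and index-based mapping of the JSON response back to records.
 */
public class BatchPromptSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Step 2: number each record and append it to the instruction template. */
    static String buildPrompt(String instruction, List<String> inputs) {
        StringBuilder prompt = new StringBuilder(instruction).append("\n\n");
        for (int i = 0; i < inputs.size(); i++) {
            prompt.append(String.format("Input %d: \"%s\"%n", i + 1, inputs.get(i)));
        }
        prompt.append("\nRespond with a JSON array of {\"index\": <n>, \"sentiment\": \"...\"} objects.");
        return prompt.toString();
    }

    /** Step 3: parse the JSON array and map each result back to its input by index. */
    static Map<Integer, String> parseResponse(String json, String valueField) throws Exception {
        Map<Integer, String> resultsByIndex = new HashMap<>();
        JsonNode array = MAPPER.readTree(json);
        if (!array.isArray()) {
            throw new IllegalArgumentException("Expected a JSON array, got: " + array.getNodeType());
        }
        for (JsonNode item : array) {
            // Entries without a usable index or value are skipped; the caller then
            // treats the corresponding records as missing (partial failure case).
            int index = item.path("index").asInt(-1);
            JsonNode value = item.path(valueField);
            if (index > 0 && !value.isMissingNode()) {
                resultsByIndex.put(index, value.asText());
            }
        }
        return resultsByIndex;
    }
}
```

A JSONPath-based parser could replace parseResponse without changing the surrounding flow.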
> > > > > > > >> > > > > > > > >> The hybrid configuration approach provides: > > > > > > > >> - **Sensible defaults** for common use cases (sentiment > > > analysis, > > > > > > > >> classification) > > > > > > > >> - **Flexibility** to customize prompts and parsing for > > specific > > > > > needs > > > > > > > >> - **Easy migration** for existing queries (batch.size=1 > > default) > > > > > > > >> > > > > > > > >> ## Next Steps > > > > > > > >> > > > > > > > >> If there's interest from the community, I'm happy to: > > > > > > > >> 1. Prepare a detailed design document with prompt templates > > and > > > > > > parsing > > > > > > > >> examples > > > > > > > >> 2. Create a JIRA ticket > > > > > > > >> 3. Develop a prototype demonstrating the batching and > parsing > > > > logic > > > > > > > >> 4. Write a FLIP if required > > > > > > > >> > > > > > > > >> Looking forward to your feedback and guidance on how best to > > > > > > proceed!-- > > > > > > > >> Thanks And Regards > > > > > > > >> Rahul > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > Thanks And Regards > > > > > > > > Rahul > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > Thanks And Regards > > > > > > > Rahul > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > Thanks And Regards > Rahul >
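As a companion to the parsing sketch above, and in the spirit of the "prototype demonstrating the batching and parsing logic" listed under Next Steps, here is a small sketch of the result-distribution and partial-failure behaviour described in Step 4 and the Response Parsing Strategy section: every input produces exactly one output row, and records missing from the response get a null value plus an error message. The Result type and field names are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Sketch of Step 4 (result distribution) with the partial-failure handling
 * from the Response Parsing Strategy section.
 */
public class ResultDistributionSketch {

    /** Minimal stand-in for the (value, error) pair shown in the SQL error-handling example. */
    public record Result(String value, String error) {}

    static List<Result> distribute(List<String> inputs, Map<Integer, String> resultsByIndex) {
        List<Result> out = new ArrayList<>(inputs.size());
        for (int i = 0; i < inputs.size(); i++) {
            String value = resultsByIndex.get(i + 1); // prompt indices are 1-based
            if (value != null) {
                out.add(new Result(value, null));
            } else {
                // Partial failure: the model skipped this index; surface an error
                // instead of silently dropping the record.
                out.add(new Result(null, "missing index " + (i + 1) + " in batch response"));
            }
        }
        return out;
    }
}
```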
