Hi Rahul,

Thanks for the FLIP. I think leveraging the LLM services' batch APIs is
quite valuable. After reading the design doc, though, I want to check how
users would use the batch API with the current design. For example, do we
still use SQL (e.g. CREATE MODEL with some new options in the `WITH` clause,
as described in the doc's `public interfaces` section)? If that's the case,
Option 2 seems to not output any result, and an extra pipeline would have to
be introduced to extract the final result, which makes it no longer a
single-SQL solution.
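
To make the question concrete, I am imagining usage roughly like the
following (option names here are hypothetical, only to illustrate what I
mean by a single-SQL solution):

CREATE MODEL llm_model WITH (
    'provider' = 'openai',
    'model' = 'gpt-4.1-mini',
    'batch.size' = '100'   -- hypothetical batch-related option
);

SELECT id, ML_PREDICT('llm_model', question) AS answer
FROM questions;

With Option 2, if I understand correctly, the query above would only emit
batch ids, and a separate pipeline would still be needed to poll the
provider and extract the final answers.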


Best,
Biao Geng


Rahul Bhattacharya <[email protected]> wrote on Fri, Nov 21, 2025 at 10:38:

> Sorry, the example JSON did not include the original question in the
> request; here is a better example. Notice that I have the original_question
> in metadata and I am not using any custom_id. The metadata field has no
> length restrictions.
>
> {
>   "method": "POST",
>   "url": "/v1/chat/completions",
>   "body": {
>     "model": "gpt-4.1-mini",
>     "metadata": {
>       "topic": "questions-topic",
>       "partition": 5,
>       "offset": 10223,
>       "original_question": "what is 2+5"
>     },
>     "messages": [
>       { "role": "user", "content": "what is 2+5" }
>     ]
>   }
> }
>
> Sample response for above request
>
> {
>   "response": {
>     "metadata": {
>       "topic": "questions-topic",
>       "partition": 5,
>       "offset": 10223,
>       "original_question": "what is 2+5"
>     },
>     "choices": [
>       { "message": { "role": "assistant", "content": "2+5 = 7" } }
>     ]
>   }
> }
>
>
> On Thu, Nov 20, 2025 at 8:32 PM Rahul Bhattacharya <[email protected]>
> wrote:
>
> > Hi Hao,
> > The custom_id field is not mandatory.
> > It is a field used to match the answer to the question, but you cannot
> > use very long strings in custom_id.
> > My suggestion would be to not pass any custom_id and instead use the
> > metadata field, which does not have any length restrictions.
> >
> > Sample jsonl request file
> >
> > {
> >   "method": "POST",
> >   "url": "/v1/chat/completions",
> >   "body": {
> >     "model": "gpt-4.1-mini",
> >     "metadata": {
> >       "topic": "questions-topic",
> >       "partition": 5,
> >       "offset": 10223,
> >       "row_id": 789
> >     },
> >     "messages": [
> >       { "role": "user", "content": "what is 2+5" }
> >     ]
> >   }
> > }
> >
> > Sample response for above request
> >
> > {
> >   "response": {
> >     "metadata": {
> >       "topic": "questions-topic",
> >       "partition": 5,
> >       "offset": 10223,
> >       "row_id": 789
> >     },
> >     "choices": [
> >       { "message": { "role": "assistant", "content": "2+5 = 7" } }
> >     ]
> >   }
> > }
> >
> >
> > For the second question, the status of the batch applies to the entire
> > batch of records. Once the batch shows completed, users can download the
> > response file, which will have many lines/records like the JSON above.
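> >
> > For reference, the object you poll looks roughly like this (field names
> > per the public OpenAI batch docs; values are illustrative):
> >
> > {
> >   "id": "batch_abc123",
> >   "object": "batch",
> >   "endpoint": "/v1/chat/completions",
> >   "status": "completed",
> >   "input_file_id": "file-in-111",
> >   "output_file_id": "file-out-222",
> >   "request_counts": { "total": 1000, "completed": 1000, "failed": 0 }
> > }
> >
> > Once the status is "completed", the output_file_id is downloaded, and each
> > line of that file corresponds to one request in the submitted JSONL.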
> >
> >
> > regards
> > Rahul
> >
> > On Thu, Nov 20, 2025 at 4:49 PM Hao Li via dev <[email protected]>
> > wrote:
> >
> >> Hi Rahul,
> >>
> >> FYI, the FLIP process also involves a voting step [1] before
> >> implementation. I have two more questions regarding sending the batch id
> >> downstream:
> >>
> >> 1. For the OpenAI batch API specifically, does it need a `custom_id` for
> >> each request? Where does this custom_id come from?
> >> 2. If you batch several requests, they will all get the same batch_id in
> >> the response. Does downstream need to poll for each request even though
> >> they all have the same batch_id?
> >>
> >> [1]
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-Accepted
> >>
> >> Thanks,
> >> Hao
> >>
> >> On Thu, Nov 20, 2025 at 2:09 PM Rahul Bhattacharya <[email protected]
> >
> >> wrote:
> >>
> >> > I created a Jira for this:
> >> > https://issues.apache.org/jira/browse/FLINK-38708
> >> >
> >> > On Fri, Nov 14, 2025 at 2:04 PM Hao Li <[email protected]>
> >> wrote:
> >> >
> >> > > Hi Shengkai,
> >> > >
> >> > > > process them together, and complete the future objects
> >> > > > sequentially as they finish
> >> > >
> >> > > The batch API Rahul proposed is [1], which sends a batch request from
> >> > > a file and returns an id that we then need to poll for results. The
> >> > > important part is that it can take up to 24 hours to finish. So Rahul
> >> > > is proposing to just send the result id downstream.
> >> > >
> >> > > [1] https://platform.openai.com/docs/guides/batch
> >> > >
> >> > > On Fri, Nov 14, 2025 at 8:35 AM Rahul Bhattacharya <
> >> [email protected]>
> >> > > wrote:
> >> > >
> >> > > > Hi Shengkai,
> >> > > > So my understanding is that we will go with option 1 and send the
> >> > > > batch id downstream, and the user does whatever they need to do with
> >> > > > the batch ids?
> >> > > >
> >> > > > I am also of the opinion that option 1 is a better option for now
> >> > > > than option 2.
> >> > > > Based on a parameter setting, we would batch n records, create a
> >> > > > JSONL file, and post it to the LLM batch API.
> >> > > > The LLM will immediately return the batch id, which we can just send
> >> > > > downstream. This implementation will be stateless and really simple
> >> > > > to implement.
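> >> > > >
> >> > > > For illustration, the submit step would look roughly like this
> >> > > > (request shape per the public OpenAI batch docs; ids are made up,
> >> > > > and the JSONL file is uploaded first to obtain an input_file_id):
> >> > > >
> >> > > > POST /v1/batches
> >> > > > {
> >> > > >   "input_file_id": "file-abc123",
> >> > > >   "endpoint": "/v1/chat/completions",
> >> > > >   "completion_window": "24h"
> >> > > > }
> >> > > >
> >> > > > The immediate response is all the operator would emit downstream:
> >> > > >
> >> > > > {
> >> > > >   "id": "batch_xyz789",
> >> > > >   "status": "validating"
> >> > > > }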
> >> > > >
> >> > > >
> >> > > > regards
> >> > > >
> >> > > >
> >> > > >
> >> > > > On Fri, Nov 14, 2025 at 4:18 AM Shengkai Fang <[email protected]>
> >> > wrote:
> >> > > >
> >> > > > > Hi Rahul,
> >> > > > >
> >> > > > > First, I believe we don't need to modify the framework. Instead,
> >> we
> >> > can
> >> > > > > have the async predict function collect records in batches,
> >> process
> >> > > them
> >> > > > > together, and complete the future objects sequentially as they
> >> > finish.
> >> > > > This
> >> > > > > approach allows us to move forward quickly.
> >> > > > >
> >> > > > > Second, I have concerns about introducing state or external
> >> storage.
> >> > On
> >> > > > one
> >> > > > > hand, the current design is stateless, and transitioning to a
> >> > stateful
> >> > > > > architecture would require significant refactoring. On the other
> >> > hand,
> >> > > I
> >> > > > > don't see clear advantages to storing batch IDs in state, since
> we
> >> > > cannot
> >> > > > > guarantee that elements will arrive in the same order after
> >> restoring
> >> > > > from
> >> > > > > a checkpoint. For example, if the ML predictor receives elements
> >> e1,
> >> > > e2,
> >> > > > e3
> >> > > > > in the first run, it might receive e2, e3, e1 after recovery.
> >> With a
> >> > > > batch
> >> > > > > size of 2, we wouldn't be able to reuse the in-flight requests
> >> > > > effectively.
> >> > > > >
> >> > > > > Finally, I suggest we leverage the IOManager to handle JSON file
> >> > > > > management.
> >> > > > >
> >> > > > > Best,
> >> > > > > Shengkai
> >> > > > >
> >> > > > >
> >> > > > > Rahul Bhattacharya <[email protected]> wrote on Tue, Nov 11, 2025 at 09:03:
> >> > > > >
> >> > > > > > Hi Hao,
> >> > > > > > For option 2, guarantees like exactly-once do not hold, since the
> >> > > > > > work is not done in the same Flink process.
> >> > > > > >
> >> > > > > > The Flink process finishes after the submission and after getting
> >> > > > > > the batch ids, which it sends downstream.
> >> > > > > >
> >> > > > > > There has to be another process (it doesn't have to be Flink) which
> >> > > > > > takes these batch ids and polls the OpenAI endpoint until the status
> >> > > > > > is completed.
> >> > > > > >
> >> > > > > > Once the batch is completed, that process downloads the results and
> >> > > > > > sends them downstream.
> >> > > > > >
> >> > > > > > This secondary process is at the client's discretion; for Kafka it
> >> > > > > > would probably be an HTTP sink connector or a Kafka consumer.
> >> > > > > >
> >> > > > > > Thanks And Regards
> >> > > > > > Rahul
> >> > > > > >
> >> > > > > >
> >> > > > > > On Mon, Nov 10, 2025 at 6:45 PM Hao Li
> <[email protected]
> >> >
> >> > > > wrote:
> >> > > > > >
> >> > > > > > > Hi Rahul,
> >> > > > > > >
> >> > > > > > > Thanks for the proposal. From some offline discussion, the
> >> > endpoint
> >> > > > you
> >> > > > > > > have in mind to support is OpenAI batch API [1]. The doc
> >> states
> >> > > that
> >> > > > > > "Each
> >> > > > > > > batch completes within 24 hours (and often more quickly)".
> >> With
> >> > > this
> >> > > > > > > context, I have some questions:
> >> > > > > > >
> >> > > > > > > 1. For design option 1, does the operator always wait for the
> >> > > > > > > batch response before processing the next batch? This can take 24
> >> > > > > > > hours, which isn't feasible for a streaming job, I think.
> >> > > > > > >
> >> > > > > > > 2. For design option 2, why does it lose exactly-once and have
> >> > > > > > > higher latency compared to option 1?
> >> > > > > > >
> >> > > > > > > 3. Also, for the public interface section, are the parameters in
> >> > > > > > > the `ml_predict` config, or in the options when running `create
> >> > > > > > > model`?
> >> > > > > > >
> >> > > > > > > Thanks,
> >> > > > > > > Hao
> >> > > > > > >
> >> > > > > > >
> >> > > > > > > [1] https://platform.openai.com/docs/guides/batch/batch-api
> >> > > > > > >
> >> > > > > > > On Mon, Nov 10, 2025 at 10:19 AM Asimansu Bera <
> >> > > > > [email protected]>
> >> > > > > > > wrote:
> >> > > > > > >
> >> > > > > > > > +1
> >> > > > > > > >
> >> > > > > > > > This proposal is needed for optimizing network calls and
> >> > > > > > > > processing asynchronously.
> >> > > > > > > >
> >> > > > > > > > Thanks
> >> > > > > > > > Asimansu
> >> > > > > > > >
> >> > > > > > > > On Mon, Nov 10, 2025 at 4:04 AM Shengkai Fang <
> >> > [email protected]
> >> > > >
> >> > > > > > wrote:
> >> > > > > > > >
> >> > > > > > > > > Hi, Rahul.
> >> > > > > > > > >
> >> > > > > > > > > +1 for this proposal. However, due to my current
> workload,
> >> > I'll
> >> > > > > need
> >> > > > > > > > until
> >> > > > > > > > > the end of this week to review it thoroughly.
> >> > > > > > > > >
> >> > > > > > > > > Best,
> >> > > > > > > > > Shengkai
> >> > > > > > > > >
> >> > > > > > > > > Rahul Bhattacharya <[email protected]> wrote on Sun, Nov 9, 2025 at 13:08:
> >> > > > > > > > >
> >> > > > > > > > > > Hi,
> >> > > > > > > > > > I have created a draft FLIP for it:
> >> > > > > > > > > >
> >> > > > > > >
> >> > > >
> >> https://docs.google.com/document/d/1U-eSuKwi5vIgAPt6ZBvb-RcbcRJiRy0e/
> >> > > > > > > > > >
> >> > > > > > > > > > Please let me know your thoughts
> >> > > > > > > > > >
> >> > > > > > > > > > regards
> >> > > > > > > > > > Rahul
> >> > > > > > > > > >
> >> > > > > > > > > > On Sat, Nov 8, 2025 at 5:54 PM Rahul Bhattacharya <
> >> > > > > > > [email protected]
> >> > > > > > > > >
> >> > > > > > > > > > wrote:
> >> > > > > > > > > >
> >> > > > > > > > > > > Hi,
> >> > > > > > > > > > > I actually thought of reworking my previous response. I want the
> >> > > > > > > > > > > Table API to create JSONL files and call the OpenAI/Claude batch
> >> > > > > > > > > > > APIs.
> >> > > > > > > > > > > The implementation I am doing is going to batch the records into
> >> > > > > > > > > > > a file, call the API with that file, continuously poll the
> >> > > > > > > > > > > response to see the status of the batch, and then use that to
> >> > > > > > > > > > > write the response records.
> >> > > > > > > > > > > ML_PREDICT in its current form is not usable, as people are not
> >> > > > > > > > > > > looking for a synchronous response, which is twice as expensive
> >> > > > > > > > > > > as the asynchronous one.
> >> > > > > > > > > > > Let me know your thoughts and I can create a FLIP for it.
> >> > > > > > > > > > > Regards
> >> > > > > > > > > > >
> >> > > > > > > > > > > On Sat, Nov 8, 2025 at 3:14 PM Rahul Bhattacharya <
> >> > > > > > > > [email protected]
> >> > > > > > > > > >
> >> > > > > > > > > > > wrote:
> >> > > > > > > > > > >
> >> > > > > > > > > > >> Hi Flink Community,
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> I'm interested in contributing an enhancement to
> >> Apache
> >> > > > > Flink's
> >> > > > > > > > > > >> ML_PREDICT
> >> > > > > > > > > > >> functionality for LLM interactions. I'd like to
> gauge
> >> > > > > community
> >> > > > > > > > > interest
> >> > > > > > > > > > >> and get
> >> > > > > > > > > > >> early feedback before proceeding with detailed
> design
> >> > or a
> >> > > > > FLIP.
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ## Problem Statement
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> Currently, when using Flink SQL's ML_PREDICT with
> LLM
> >> > > > > endpoints,
> >> > > > > > > > each
> >> > > > > > > > > > >> record
> >> > > > > > > > > > >> triggers an individual API call. For a stream
> >> processing
> >> > > > 1000
> >> > > > > > > > > > >> records/second,
> >> > > > > > > > > > >> this results in:
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> - **1000 separate API calls per second**
> >> > > > > > > > > > >> - **High latency**: Each call has network overhead
> +
> >> API
> >> > > > > > > processing
> >> > > > > > > > > time
> >> > > > > > > > > > >> - **High cost**: Most LLM providers charge per
> token,
> >> > and
> >> > > > lack
> >> > > > > > of
> >> > > > > > > > > > >> batching means
> >> > > > > > > > > > >>   no cost optimization
> >> > > > > > > > > > >> - **Rate limiting issues**: Hitting provider rate
> >> limits
> >> > > > > quickly
> >> > > > > > > > > > >> - **Poor throughput**: API calls are serialized per
> >> > record
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ### Current Behavior (Inefficient)
> >> > > > > > > > > > >> ```sql
> >> > > > > > > > > > >> -- This makes 10 individual API calls
> >> > > > > > > > > > >> SELECT id, ML_PREDICT('llm_model', text) as result
> >> > > > > > > > > > >> FROM (VALUES
> >> > > > > > > > > > >>     (1, 'text1'), (2, 'text2'), ..., (10, 'text10')
> >> > > > > > > > > > >> ) AS t(id, text);
> >> > > > > > > > > > >> ```
> >> > > > > > > > > > >> **Result**: 10 separate HTTP requests, 10x latency,
> >> 10x
> >> > > > > overhead
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ## Proposed Solution: Application-Level Batching
> with
> >> > > Prompt
> >> > > > > > > > > Engineering
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> Since most LLM APIs (OpenAI, Anthropic Claude,
> etc.)
> >> > don't
> >> > > > > > provide
> >> > > > > > > > > > native
> >> > > > > > > > > > >> batch
> >> > > > > > > > > > >> endpoints, we propose implementing batching at the
> >> > > > application
> >> > > > > > > level
> >> > > > > > > > > by:
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> 1. **Accumulating N records** into a single batch
> >> > > > > > > > > > >> 2. **Injecting records into a structured prompt**
> >> that
> >> > > > > instructs
> >> > > > > > > the
> >> > > > > > > > > LLM
> >> > > > > > > > > > >> to
> >> > > > > > > > > > >>    process multiple items
> >> > > > > > > > > > >> 3. **Parsing structured responses** to extract
> >> results
> >> > for
> >> > > > > each
> >> > > > > > > > record
> >> > > > > > > > > > >> 4. **Emitting individual results** back to the
> Flink
> >> > > > pipeline
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ### How It Works
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> **Step 1: Batch Accumulation**
> >> > > > > > > > > > >> Collect up to `batch.size` records or wait up to `
> >> > > > > > > batch.timeout.ms`
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> **Step 2: Prompt Construction**
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> System: You are a sentiment analyzer. Process each
> >> item
> >> > > and
> >> > > > > > > respond
> >> > > > > > > > > with
> >> > > > > > > > > > >> JSON.
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> User: Analyze the sentiment of these texts. Return
> a
> >> > JSON
> >> > > > > array
> >> > > > > > > with
> >> > > > > > > > > one
> >> > > > > > > > > > >> object per input containing "index" and "sentiment"
> >> > > fields.
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> Input 1: "This product is amazing!" Input 2:
> >> "Terrible
> >> > > > > > experience,
> >> > > > > > > > > very
> >> > > > > > > > > > >> disappointed" Input 3: "It's okay, nothing special"
> >> ...
> >> > > > Input
> >> > > > > > 10:
> >> > > > > > > > > "Best
> >> > > > > > > > > > >> purchase ever!"
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> Respond with: [{"index": 1, "sentiment": "..."},
> >> > {"index":
> >> > > > 2,
> >> > > > > > > > > > >> "sentiment": "..."}, ...]
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> **Step 3: Response Parsing**
> >> > > > > > > > > > >> ```json
> >> > > > > > > > > > >> [
> >> > > > > > > > > > >>   {"index": 1, "sentiment": "positive"},
> >> > > > > > > > > > >>   {"index": 2, "sentiment": "negative"},
> >> > > > > > > > > > >>   {"index": 3, "sentiment": "neutral"},
> >> > > > > > > > > > >>   ...
> >> > > > > > > > > > >>   {"index": 10, "sentiment": "positive"}
> >> > > > > > > > > > >> ]
> >> > > > > > > > > > >> ```
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> **Step 4: Result Distribution**
> >> > > > > > > > > > >> Parse the JSON and emit individual results back to the
> >> > > > > > > > > > >> corresponding records.
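> >> > > > > > > > > > >>
> >> > > > > > > > > > >> For example (illustrative values), the batch above would be emitted
> >> > > > > > > > > > >> back to the pipeline as one row per original record, joined by index:
> >> > > > > > > > > > >>
> >> > > > > > > > > > >>   (1, 'This product is amazing!', 'positive')
> >> > > > > > > > > > >>   (2, 'Terrible experience, very disappointed', 'negative')
> >> > > > > > > > > > >>   (3, 'It''s okay, nothing special', 'neutral')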
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ### Model Configuration (Defaults)
> >> > > > > > > > > > >> ```sql
> >> > > > > > > > > > >> CREATE MODEL llm_sentiment WITH (
> >> > > > > > > > > > >>     'provider' = 'openai',
> >> > > > > > > > > > >>     'model' = 'gpt-4',
> >> > > > > > > > > > >>     'api_key' = '${API_KEY}',
> >> > > > > > > > > > >>     'batch.size' = '20',
> >> > > > > > > > > > >>     'batch.timeout.ms' = '1000',
> >> > > > > > > > > > >>     'system.prompt' = 'You are a sentiment
> analyzer.
> >> > > Always
> >> > > > > > > respond
> >> > > > > > > > > with
> >> > > > > > > > > > >> valid JSON.',
> >> > > > > > > > > > >>     'batch.prompt.template' = 'Analyze sentiment
> for
> >> > these
> >> > > > > > texts.
> >> > > > > > > > > Return
> >> > > > > > > > > > >> JSON array: [{"index": <n>, "sentiment":
> >> > > > > > > > > > "<positive|negative|neutral>"}]',
> >> > > > > > > > > > >>     'response.format' = 'json',
> >> > > > > > > > > > >>     'response.path' = '$[*]',  -- JSONPath to
> extract
> >> > > array
> >> > > > of
> >> > > > > > > > results
> >> > > > > > > > > > >>     'response.index.field' = 'index',  -- Field
> >> > containing
> >> > > > > > record
> >> > > > > > > > > index
> >> > > > > > > > > > >>     'response.value.field' = 'sentiment'  -- Field
> >> > > > containing
> >> > > > > > > result
> >> > > > > > > > > > >> );
> >> > > > > > > > > > >> ```
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ### Query Usage (Use Defaults)
> >> > > > > > > > > > >> ```sql
> >> > > > > > > > > > >> -- Uses batch_size=20 from model definition
> >> > > > > > > > > > >> SELECT id, text, ML_PREDICT('llm_sentiment', text)
> as
> >> > > > > sentiment
> >> > > > > > > > > > >> FROM customer_reviews;
> >> > > > > > > > > > >> ```
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ### Query Usage (Override for Custom Analysis)
> >> > > > > > > > > > >> ```sql
> >> > > > > > > > > > >> -- Override prompt and batch size for different use
> >> case
> >> > > > > > > > > > >> SELECT id, text, ML_PREDICT('llm_sentiment', text,
> >> > > > > > > > > > >>     MAP['batch.size', '50',
> >> > > > > > > > > > >>         'batch.prompt.template', 'Extract key
> >> entities.
> >> > > > Return
> >> > > > > > > JSON:
> >> > > > > > > > > > >> [{"index": <n>, "entities": [...]}]',
> >> > > > > > > > > > >>         'response.value.field', 'entities']) as
> >> entities
> >> > > > > > > > > > >> FROM documents;
> >> > > > > > > > > > >> ```
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ## Performance and Cost Impact
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ### Example: Processing 10,000 customer reviews
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> **Current (unbatched)**:
> >> > > > > > > > > > >> - 10,000 API calls
> >> > > > > > > > > > >> - ~10,000 x 200ms latency = 2,000 seconds total
> >> > processing
> >> > > > > time
> >> > > > > > > > > > >> (serialized)
> >> > > > > > > > > > >> - ~10,000 x $0.002 = $20 in API costs
> >> > > > > > > > > > >> - High rate limit pressure
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> **With batching (batch_size=20)**:
> >> > > > > > > > > > >> - 500 API calls (10,000 / 20)
> >> > > > > > > > > > >> - ~500 x 300ms latency = 150 seconds total
> processing
> >> > time
> >> > > > > > > > > > >> - ~500 x $0.006 = $3 in API costs (slightly higher
> >> per
> >> > > call
> >> > > > > due
> >> > > > > > to
> >> > > > > > > > > > larger
> >> > > > > > > > > > >> prompts,
> >> > > > > > > > > > >>   but still 85% cheaper overall)
> >> > > > > > > > > > >> - **20x fewer API calls**
> >> > > > > > > > > > >> - **13x faster processing**
> >> > > > > > > > > > >> - **85% cost reduction**
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ## Proposed Implementation
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ### Configuration Parameters
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> **Model-level (defaults)**:
> >> > > > > > > > > > >> - `batch.size`: Maximum records per batch
> (default: 1
> >> > for
> >> > > > > > backward
> >> > > > > > > > > > >> compatibility)
> >> > > > > > > > > > >> - `batch.timeout.ms`: Max time to wait before
> >> flushing
> >> > > > > > incomplete
> >> > > > > > > > > batch
> >> > > > > > > > > > >> (default: 1000ms)
> >> > > > > > > > > > >> - `system.prompt`: System-level instruction for the
> >> LLM
> >> > > > > > > > > > >> - `batch.prompt.template`: Template explaining how
> to
> >> > > > process
> >> > > > > > > > batched
> >> > > > > > > > > > >> inputs
> >> > > > > > > > > > >> - `response.format`: Expected response format
> >> ('json',
> >> > > > 'xml',
> >> > > > > > > > > > 'delimited')
> >> > > > > > > > > > >> - `response.path`: JSONPath or XPath to extract
> >> results
> >> > > > array
> >> > > > > > > > > > >> - `response.index.field`: Field name containing the
> >> > record
> >> > > > > index
> >> > > > > > > > > > >> - `response.value.field`: Field name containing the
> >> > actual
> >> > > > > > result
> >> > > > > > > > > > >> - `max.retries`: Retry attempts for failed batches
> >> > > (default:
> >> > > > > 3)
> >> > > > > > > > > > >> - `request.timeout.ms`: Timeout for API calls
> >> (default:
> >> > > > > > 30000ms)
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> **Query-level (overrides)**:
> >> > > > > > > > > > >> - Any of the above can be overridden via MAP
> >> parameter
> >> > in
> >> > > > > > > ML_PREDICT
> >> > > > > > > > > > >> - Per-query customization for different analysis
> >> tasks
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ### Key Features
> >> > > > > > > > > > >> 1. **Prompt injection**: Automatically construct
> >> batch
> >> > > > prompts
> >> > > > > > > with
> >> > > > > > > > > > >> indexed inputs
> >> > > > > > > > > > >> 2. **Structured response parsing**: Support JSON,
> >> XML,
> >> > or
> >> > > > > > > delimited
> >> > > > > > > > > > >> formats
> >> > > > > > > > > > >> 3. **Index tracking**: Maintain record-to-result
> >> mapping
> >> > > > > through
> >> > > > > > > the
> >> > > > > > > > > > batch
> >> > > > > > > > > > >> 4. **Error handling**: Handle parsing failures,
> >> missing
> >> > > > > indices,
> >> > > > > > > > > > >> malformed responses
> >> > > > > > > > > > >> 5. **Fallback to individual calls**: If batch
> fails,
> >> > > > > optionally
> >> > > > > > > > retry
> >> > > > > > > > > > >> records individually
> >> > > > > > > > > > >> 6. **Provider-agnostic**: Works with any LLM API
> >> > (OpenAI,
> >> > > > > > > Anthropic,
> >> > > > > > > > > > >> Azure, self-hosted)
> >> > > > > > > > > > >> 7. **Async processing**: Non-blocking batch
> requests
> >> > > > > > > > > > >> 8. **Back-pressure**: Proper flow control when API
> is
> >> > slow
> >> > > > > > > > > > >> 9. **Backward compatible**: batch.size=1 maintains
> >> > current
> >> > > > > > > behavior
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ### Technical Approach
> >> > > > > > > > > > >> - Extend existing ML_PREDICT infrastructure
> >> > > > > > > > > > >> - Add batching buffer in the ML_PREDICT operator
> >> > > > > > > > > > >> - Implement prompt template engine for batch
> >> > construction:
> >> > > > > > > > > > >>   - Inject record index + content into template
> >> > > > > > > > > > >>   - Support various templating formats (JSON, XML,
> >> plain
> >> > > > text)
> >> > > > > > > > > > >> - Implement response parser:
> >> > > > > > > > > > >>   - Extract structured data (JSONPath, XPath,
> regex)
> >> > > > > > > > > > >>   - Map results back to original records by index
> >> > > > > > > > > > >>   - Handle missing or malformed responses
> >> > > > > > > > > > >> - Maintain record ordering and error attribution
> >> > > > > > > > > > >> - Support parameter override mechanism in
> ML_PREDICT
> >> > > > function
> >> > > > > > > > > signature
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ### Response Parsing Strategy
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> The implementation must handle:
> >> > > > > > > > > > >> 1. **Successful batch response**: Parse and
> >> distribute
> >> > > > results
> >> > > > > > > > > > >> 2. **Partial failure**: Some records missing from
> >> > > response →
> >> > > > > > emit
> >> > > > > > > > > errors
> >> > > > > > > > > > >> for those
> >> > > > > > > > > > >> 3. **Complete parse failure**: Optionally fallback
> to
> >> > > > > individual
> >> > > > > > > > calls
> >> > > > > > > > > > >> 4. **Index mismatch**: Response indices don't match
> >> > input
> >> > > →
> >> > > > > log
> >> > > > > > > > > warning
> >> > > > > > > > > > >> and best-effort match
> >> > > > > > > > > > >> 5. **Malformed JSON**: Retry with error handling
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> Example error handling:
> >> > > > > > > > > > >> ```sql
> >> > > > > > > > > > >> -- Records that fail parsing get null results with
> >> error
> >> > > > > > metadata
> >> > > > > > > > > > >> SELECT
> >> > > > > > > > > > >>     id,
> >> > > > > > > > > > >>     text,
> >> > > > > > > > > > >>     result.value as sentiment,
> >> > > > > > > > > > >>     result.error as error_msg
> >> > > > > > > > > > >> FROM source_table,
> >> > > > > > > > > > >> LATERAL TABLE(ML_PREDICT('llm_sentiment', text));
> >> > > > > > > > > > >> ```
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ## Limitations and Considerations
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> 1. **LLM instruction following**: Depends on
> model's
> >> > > ability
> >> > > > > to
> >> > > > > > > > follow
> >> > > > > > > > > > >> structured
> >> > > > > > > > > > >>    output instructions. GPT-4 and Claude are
> >> reliable;
> >> > > older
> >> > > > > > > models
> >> > > > > > > > > may
> >> > > > > > > > > > >> struggle.
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> 2. **Prompt size limits**: Batching too many
> records
> >> may
> >> > > > > exceed
> >> > > > > > > > > context
> >> > > > > > > > > > >> windows
> >> > > > > > > > > > >>    - GPT-4: ~8K tokens input limit
> >> > > > > > > > > > >>    - Claude: ~200K tokens but practical batches
> >> smaller
> >> > > > > > > > > > >>    - Need configurable max batch size based on
> >> average
> >> > > > record
> >> > > > > > > length
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> 3. **Token cost trade-off**: Larger batches mean:
> >> > > > > > > > > > >>    - Fewer API calls (good)
> >> > > > > > > > > > >>    - But larger prompts with
> instructions/formatting
> >> > > (slight
> >> > > > > > > > overhead)
> >> > > > > > > > > > >>    - Net savings still 80-90% in practice
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> 4. **Parsing reliability**: Small risk of malformed
> >> > > > responses
> >> > > > > > > > > > >>    - Mitigated by: clear instructions, JSON mode
> >> > (GPT-4),
> >> > > > > retry
> >> > > > > > > > logic
> >> > > > > > > > > > >>    - Fallback to individual calls if batch parsing
> >> fails
> >> > > > > > > repeatedly
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> 5. **Latency characteristics**:
> >> > > > > > > > > > >>    - Individual records see slightly higher latency
> >> > > (waiting
> >> > > > > for
> >> > > > > > > > > batch)
> >> > > > > > > > > > >>    - Overall throughput dramatically improved
> >> > > > > > > > > > >>    - Use `batch.timeout.ms` to balance latency vs
> >> > > > throughput
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ## Future Extensions
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> This batching architecture would support:
> >> > > > > > > > > > >> 1. **Stateful chat sessions**: Batch multiple turns
> >> of a
> >> > > > > > > > conversation
> >> > > > > > > > > > >> with
> >> > > > > > > > > > >>    maintained history per session key
> >> > > > > > > > > > >> 2. **Embedding generation**: Some providers
> (OpenAI)
> >> do
> >> > > have
> >> > > > > > batch
> >> > > > > > > > > > >> embedding APIs
> >> > > > > > > > > > >> 3. **Multi-modal batching**: Batch image + text
> >> > processing
> >> > > > > with
> >> > > > > > > > > > >> structured outputs
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ## Questions for the Community
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> 1. **Architecture**: Should this extend ML_PREDICT
> or
> >> > be a
> >> > > > new
> >> > > > > > > > > function?
> >> > > > > > > > > > >>    (I propose extending ML_PREDICT for backward
> >> > > > compatibility)
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> 2. **FLIP Required?**: Does this enhancement
> warrant
> >> a
> >> > > FLIP?
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> 3. **Existing Work**: Is anyone working on batching
> >> for
> >> > > > > > ML_PREDICT
> >> > > > > > > > or
> >> > > > > > > > > > >> similar
> >> > > > > > > > > > >>    functionality?
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> 4. **Prompt Template Engine**: Should we:
> >> > > > > > > > > > >>    - Build a custom template engine?
> >> > > > > > > > > > >>    - Use existing library (e.g., StringTemplate,
> >> > > Mustache)?
> >> > > > > > > > > > >>    - Keep it simple with String.format initially?
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> 5. **Response Parsing**: Preferred approach:
> >> > > > > > > > > > >>    - JSONPath library (flexible but adds
> dependency)
> >> > > > > > > > > > >>    - Simple JSON parsing with field names
> >> > > > > > > > > > >>    - Pluggable parser interface for extensibility?
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> 6. **Error Handling**: If parsing fails for entire
> >> > batch:
> >> > > > > > > > > > >>    - Fail all records in batch?
> >> > > > > > > > > > >>    - Retry batch once more?
> >> > > > > > > > > > >>    - Fallback to individual calls (with circuit
> >> > breaker)?
> >> > > > > > > > > > >>    - Make strategy configurable?
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> 7. **Batch Assembly**: Should batching happen:
> >> > > > > > > > > > >>    - Per parallel instance (each task maintains its
> >> own
> >> > > > > batch)?
> >> > > > > > > > > > >>    - Globally coordinated (shuffle to batch
> >> > coordinator)?
> >> > > > > > > > > > >>    - I propose per-instance for simplicity and
> lower
> >> > > latency
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> 8. **Compatibility**: Default batch.size=1 to
> >> maintain
> >> > > > current
> >> > > > > > > > > behavior,
> >> > > > > > > > > > >> users
> >> > > > > > > > > > >>    opt-in to batching?
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ## Why This Matters
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> LLM inference is becoming a critical part of
> >> real-time
> >> > > data
> >> > > > > > > > pipelines.
> >> > > > > > > > > > >> Without
> >> > > > > > > > > > >> batching:
> >> > > > > > > > > > >> - Users face prohibitive costs for high-throughput
> >> > > workloads
> >> > > > > > > > > > >> - Rate limits block production deployments
> >> > > > > > > > > > >> - Latency makes real-time processing impractical
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> While LLM providers don't offer native batch APIs,
> >> > > > > > > application-level
> >> > > > > > > > > > >> batching
> >> > > > > > > > > > >> through prompt engineering is a proven pattern used
> >> in
> >> > > > > > production
> >> > > > > > > by
> >> > > > > > > > > > many
> >> > > > > > > > > > >> organizations. This proposal brings that capability
> >> > > natively
> >> > > > > > into
> >> > > > > > > > > Flink.
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> The hybrid configuration approach provides:
> >> > > > > > > > > > >> - **Sensible defaults** for common use cases
> >> (sentiment
> >> > > > > > analysis,
> >> > > > > > > > > > >> classification)
> >> > > > > > > > > > >> - **Flexibility** to customize prompts and parsing
> >> for
> >> > > > > specific
> >> > > > > > > > needs
> >> > > > > > > > > > >> - **Easy migration** for existing queries
> >> (batch.size=1
> >> > > > > default)
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> ## Next Steps
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> If there's interest from the community, I'm happy
> to:
> >> > > > > > > > > > >> 1. Prepare a detailed design document with prompt
> >> > > templates
> >> > > > > and
> >> > > > > > > > > parsing
> >> > > > > > > > > > >> examples
> >> > > > > > > > > > >> 2. Create a JIRA ticket
> >> > > > > > > > > > >> 3. Develop a prototype demonstrating the batching
> and
> >> > > > parsing
> >> > > > > > > logic
> >> > > > > > > > > > >> 4. Write a FLIP if required
> >> > > > > > > > > > >>
> >> > > > > > > > > > >> Looking forward to your feedback and guidance on how
> >> > > > > > > > > > >> best to proceed!
> >> > > > > > > > > > >> --
> >> > > > > > > > > > >> Thanks And Regards
> >> > > > > > > > > > >> Rahul
> >> > > > > > > > > > >>
> >> > > > > > > > > > >
> >> > > > > > > > > > >
> >> > > > > > > > > > > --
> >> > > > > > > > > > > Thanks And Regards
> >> > > > > > > > > > > Rahul
> >> > > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > >
> >> > > > > > > > > > --
> >> > > > > > > > > > Thanks And Regards
> >> > > > > > > > > > Rahul
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > > >
> >> > > > --
> >> > > > Thanks And Regards
> >> > > > Rahul
> >> > > >
> >> > >
> >> >
> >> >
> >> > --
> >> > Thanks And Regards
> >> > Rahul
> >> >
> >>
> >
> >
> > --
> > Thanks And Regards
> > Rahul
> >
>
>
> --
> Thanks And Regards
> Rahul
>
