Hi Biao,
option 1 was to just call the batch api and get back the batch id and send
it downstream as it is. some other process (using flink or any other
mechanism ) , is continuously polling the openai batch apis to get the
status of the batches and downloads the batch to retrieve the records once
complete. The records will have the original question in metadata to avoid
joins with source topic

option 2 was to have another flink thread do the second part of polling the
batch apis to get to the records.


Me and most people in the thread are in the opinion option 1 is better.
Option 2 - there is no guarantee when openai will finish the batch and we
need to continuously run the polller in the flink ecosystem, and it becomes
exponentially more complicated to implement.



If everyone agrees I can drop option 2 from the flip.

regards




On Sun, Nov 23, 2025 at 11:24 AM Biao Geng <[email protected]> wrote:

> Hi Rahul,
>
> Thanks for the FLIP. I think leveraging the LLM services' batch API is
> pretty meaningful. But after reading the design doc, I want to check how
> users use the batch API with current design. For example, do we still use
> SQL(e.g CREATE MODEL with some new options the `WITH` clause in the doc's
> `public interfaces` section)? If that's the case, the Option2 seems to not
> output any result and an extra pipeline would be introduced to extract the
> final result, which makes it not a single SQL solution.
>
>
> Best,
> Biao Geng
>
>
> Rahul Bhattacharya <[email protected]> 于2025年11月21日周五 10:38写道:
>
> > Sorry the example json did not include the original question in the
> > request, this is a better example . notice i have the original_question
> in
> > metadata and i am not using any custom_id.  metadata has no field length
> > restrictions
> >
> > {
> >   "method": "POST",
> >   "url": "/v1/chat/completions",
> >   "body": {
> >     "model": "gpt-4.1-mini",
> >     "metadata": {
> >       "topic": "questions-topic",
> >       "partition": 5,
> >       "offset": 10223,
> >       "original_question": "what is 2+5"
> >     },
> >     "messages": [
> >       { "role": "user", "content": "what is 2+5" }
> >     ]
> >   }
> > }
> >
> > Sample response for above request
> >
> > {
> >   "response": {
> >     "metadata": {
> >       "topic": "questions-topic",
> >       "partition": 5,
> >       "offset": 10223,
> >       "original_question": "what is 2+5"
> >     },
> >     "choices": [
> >       { "message": { "role": "assistant", "content": "2+5 = 7" } }
> >     ]
> >   }
> > }
> >
> >
> > On Thu, Nov 20, 2025 at 8:32 PM Rahul Bhattacharya <[email protected]>
> > wrote:
> >
> > > Hi Hao,
> > > the custom_id field is not mandatory.
> > > it's a field to match the answer to the question, but you can't use
> very
> > > long strings in custom_id.
> > > my suggestion will be to not pass any custom_id, but use the metadata
> > > field instead which does not have any restrictions.
> > >
> > > Sample jsonl request file
> > >
> > > {
> > >   "method": "POST",
> > >   "url": "/v1/chat/completions",
> > >   "body": {
> > >     "model": "gpt-4.1-mini",
> > >     "metadata": {
> > >       "topic": "questions-topic",
> > >       "partition": 5,
> > >       "offset": 10223,
> > >       "row_id": 789
> > >     },
> > >     "messages": [
> > >       { "role": "user", "content": "what is 2+5" }
> > >     ]
> > >   }
> > > }
> > >
> > > Sample response for above request
> > >
> > > {
> > >   "response": {
> > >     "metadata": {
> > >       "topic": "questions-topic",
> > >       "partition": 5,
> > >       "offset": 10223,
> > >       "row_id": 789
> > >     },
> > >     "choices": [
> > >       { "message": { "role": "assistant", "content": "2+5 = 7" } }
> > >     ]
> > >   }
> > > }
> > >
> > >
> > > For the second question, the status of the batch is for the entire
> batch
> > > of records. Once the batch shows completed users can download the
> > response
> > > file which will have many lines/records like the above json
> > >
> > >
> > > regards
> > > Rahul
> > >
> > > On Thu, Nov 20, 2025 at 4:49 PM Hao Li via dev <[email protected]>
> > > wrote:
> > >
> > >> Hi Rahul,
> > >>
> > >> FYI, the FLIP process involves a voting process as well [1] before
> > >> implementation. I have two more question regarding sending batch id
> > >> downstream:
> > >>
> > >> 1. For openai batch api specifically, does it needs a `custom_id` for
> > each
> > >> request? Where is this custom_id from?
> > >> 2. If you batch several requests, they will get the same batch_id as
> the
> > >> response. Does downstream need to poll them for each request even
> though
> > >> they all have the same batch_id?
> > >>
> > >> [1]
> > >>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-Accepted
> > >>
> > >> Thanks,
> > >> Hao
> > >>
> > >> On Thu, Nov 20, 2025 at 2:09 PM Rahul Bhattacharya <
> [email protected]
> > >
> > >> wrote:
> > >>
> > >> > i created a Jira for this
> > >> > https://issues.apache.org/jira/browse/FLINK-38708
> > >> >
> > >> > On Fri, Nov 14, 2025 at 2:04 PM Hao Li <[email protected]>
> > >> wrote:
> > >> >
> > >> > > Hi Shengkai,
> > >> > >
> > >> > > > process them together, and complete the future objects
> > sequentially
> > >> as
> > >> > > they finish
> > >> > >
> > >> > > The batch api Rahul proposed is [1] which sends batch request from
> > >> file,
> > >> > > return an id and we need to poll the id for results. The important
> > >> part
> > >> > is
> > >> > > it can take 24 hours to finish. So Rahua is proposing to just send
> > the
> > >> > > result id to downstream.
> > >> > >
> > >> > > [1] https://platform.openai.com/docs/guides/batch
> > >> > >
> > >> > > On Fri, Nov 14, 2025 at 8:35 AM Rahul Bhattacharya <
> > >> [email protected]>
> > >> > > wrote:
> > >> > >
> > >> > > > Hi Shengkai,
> > >> > > > so i am understanding we will go with option 1 and send the
> > batchid
> > >> > > > downstream to do whatever the user needs to do with the
> batchids?
> > >> > > >
> > >> > > > i also in the opinion that option 1 is a better option for now
> > than
> > >> > > option
> > >> > > > 2.
> > >> > > > Based on a parameter setting we should batch n records , create
> a
> > >> > > > jsonl file and post it to the LLM batch api.
> > >> > > > The LLM will immediately return the batch id which we can just
> > send
> > >> it
> > >> > > > downstream. this implementation will be stateless and really
> > simple
> > >> to
> > >> > > > implement
> > >> > > >
> > >> > > >
> > >> > > > regards
> > >> > > >
> > >> > > >
> > >> > > >
> > >> > > > On Fri, Nov 14, 2025 at 4:18 AM Shengkai Fang <
> [email protected]>
> > >> > wrote:
> > >> > > >
> > >> > > > > Hi. Rahul
> > >> > > > >
> > >> > > > > First, I believe we don't need to modify the framework.
> Instead,
> > >> we
> > >> > can
> > >> > > > > have the async predict function collect records in batches,
> > >> process
> > >> > > them
> > >> > > > > together, and complete the future objects sequentially as they
> > >> > finish.
> > >> > > > This
> > >> > > > > approach allows us to move forward quickly.
> > >> > > > >
> > >> > > > > Second, I have concerns about introducing state or external
> > >> storage.
> > >> > On
> > >> > > > one
> > >> > > > > hand, the current design is stateless, and transitioning to a
> > >> > stateful
> > >> > > > > architecture would require significant refactoring. On the
> other
> > >> > hand,
> > >> > > I
> > >> > > > > don't see clear advantages to storing batch IDs in state,
> since
> > we
> > >> > > cannot
> > >> > > > > guarantee that elements will arrive in the same order after
> > >> restoring
> > >> > > > from
> > >> > > > > a checkpoint. For example, if the ML predictor receives
> elements
> > >> e1,
> > >> > > e2,
> > >> > > > e3
> > >> > > > > in the first run, it might receive e2, e3, e1 after recovery.
> > >> With a
> > >> > > > batch
> > >> > > > > size of 2, we wouldn't be able to reuse the in-flight requests
> > >> > > > effectively.
> > >> > > > >
> > >> > > > > Finally, I suggest we leverage the IOManager to handle JSON
> file
> > >> > > > > management.
> > >> > > > >
> > >> > > > > Best,
> > >> > > > > Shengkai
> > >> > > > >
> > >> > > > >
> > >> > > > > Rahul Bhattacharya <[email protected]> 于2025年11月11日周二
> > 09:03写道:
> > >> > > > >
> > >> > > > > > Hi Hao
> > >> > > > > > For option 2 the guarantees like exactly once and other
> things
> > >> are
> > >> > > not
> > >> > > > > > guaranteed as it’s not in the same flink process
> > >> > > > > >
> > >> > > > > > The flink process finishes after the submission and getting
> > >> > batchids
> > >> > > > > which
> > >> > > > > > it sends downstream
> > >> > > > > >
> > >> > > > > > There has to be another process(doesn’t have to be flink)
> > which
> > >> > > takes
> > >> > > > > > these batchids and polls the OpenAI endpoint for status
> > >> completed.
> > >> > > > > >
> > >> > > > > > Once it gets completed it downloads the results and sends
> > >> > downstream
> > >> > > > > >
> > >> > > > > > This secondary process is on client discretion , for Kafka
> > >> > probably a
> > >> > > > > http
> > >> > > > > > sink connector or Kafka consumer
> > >> > > > > >
> > >> > > > > > Thanks And Regards
> > >> > > > > > Rahul
> > >> > > > > >
> > >> > > > > >
> > >> > > > > > On Mon, Nov 10, 2025 at 6:45 PM Hao Li
> > <[email protected]
> > >> >
> > >> > > > wrote:
> > >> > > > > >
> > >> > > > > > > Hi Rahul,
> > >> > > > > > >
> > >> > > > > > > Thanks for the proposal. From some offline discussion, the
> > >> > endpoint
> > >> > > > you
> > >> > > > > > > have in mind to support is OpenAI batch API [1]. The doc
> > >> states
> > >> > > that
> > >> > > > > > "Each
> > >> > > > > > > batch completes within 24 hours (and often more quickly)".
> > >> With
> > >> > > this
> > >> > > > > > > context, I have some questions:
> > >> > > > > > >
> > >> > > > > > > 1. For design option 1, does the operator always wait for
> > >> batch
> > >> > > > > response
> > >> > > > > > > until processing next batch? This can take 24 hours which
> > >> isn't
> > >> > > > > feasible
> > >> > > > > > > for streaming job I think.
> > >> > > > > > >
> > >> > > > > > > 2. For design option 2, why it loses exact-once and have
> > >> higher
> > >> > > > latency
> > >> > > > > > > compared to 1?
> > >> > > > > > >
> > >> > > > > > > 3. Also for the public interface section, are the
> parameters
> > >> in
> > >> > > > > > > `ml_predict` config or in options when `create model`?
> > >> > > > > > >
> > >> > > > > > > Thanks,
> > >> > > > > > > Hao
> > >> > > > > > >
> > >> > > > > > >
> > >> > > > > > > [1]
> https://platform.openai.com/docs/guides/batch/batch-api
> > >> > > > > > >
> > >> > > > > > > On Mon, Nov 10, 2025 at 10:19 AM Asimansu Bera <
> > >> > > > > [email protected]>
> > >> > > > > > > wrote:
> > >> > > > > > >
> > >> > > > > > > > +1
> > >> > > > > > > >
> > >> > > > > > > > This proposal is needed for optimizing network calls and
> > >> > > processing
> > >> > > > > > asyc.
> > >> > > > > > > >
> > >> > > > > > > > Thanks
> > >> > > > > > > > Asimansu
> > >> > > > > > > >
> > >> > > > > > > > On Mon, Nov 10, 2025 at 4:04 AM Shengkai Fang <
> > >> > [email protected]
> > >> > > >
> > >> > > > > > wrote:
> > >> > > > > > > >
> > >> > > > > > > > > Hi, Rahul.
> > >> > > > > > > > >
> > >> > > > > > > > > +1 for this proposal. However, due to my current
> > workload,
> > >> > I'll
> > >> > > > > need
> > >> > > > > > > > until
> > >> > > > > > > > > the end of this week to review it thoroughly.
> > >> > > > > > > > >
> > >> > > > > > > > > Best,
> > >> > > > > > > > > Shengkai
> > >> > > > > > > > >
> > >> > > > > > > > > Rahul Bhattacharya <[email protected]>
> 于2025年11月9日周日
> > >> > > 13:08写道:
> > >> > > > > > > > >
> > >> > > > > > > > > > Hi ,
> > >> > > > > > > > > > i have created a draft FLIP for it
> > >> > > > > > > > > >
> > >> > > > > > >
> > >> > > >
> > >> https://docs.google.com/document/d/1U-eSuKwi5vIgAPt6ZBvb-RcbcRJiRy0e/
> > >> > > > > > > > > >
> > >> > > > > > > > > > Please let me know your thoughts
> > >> > > > > > > > > >
> > >> > > > > > > > > > regards
> > >> > > > > > > > > > Rahul
> > >> > > > > > > > > >
> > >> > > > > > > > > > On Sat, Nov 8, 2025 at 5:54 PM Rahul Bhattacharya <
> > >> > > > > > > [email protected]
> > >> > > > > > > > >
> > >> > > > > > > > > > wrote:
> > >> > > > > > > > > >
> > >> > > > > > > > > > > Hi,
> > >> > > > > > > > > > > I actually thought of reworking my previous
> > response.
> > >> I
> > >> > > want
> > >> > > > > the
> > >> > > > > > > > table
> > >> > > > > > > > > > api
> > >> > > > > > > > > > > to create jsonl files and call openai/claude batch
> > >> apis.
> > >> > > > > > > > > > > The implementation I am doing is going to batch
> the
> > >> > records
> > >> > > > > into
> > >> > > > > > a
> > >> > > > > > > > file
> > >> > > > > > > > > > > and call the api with the file and then
> continuously
> > >> poll
> > >> > > the
> > >> > > > > > > repose
> > >> > > > > > > > to
> > >> > > > > > > > > > see
> > >> > > > > > > > > > > the status of the batch and then use that to write
> > the
> > >> > > > response
> > >> > > > > > > > > records.
> > >> > > > > > > > > > > The ML_Predict in its current form is not usable
> as
> > >> > people
> > >> > > > are
> > >> > > > > > not
> > >> > > > > > > > > > looking
> > >> > > > > > > > > > > for synchronous response which is twice as
> expensive
> > >> as
> > >> > the
> > >> > > > > > > > > asynchronous
> > >> > > > > > > > > > > response.
> > >> > > > > > > > > > > let me know you thoughts and i can create a FLIP
> for
> > >> it
> > >> > > > > > > > > > > regards
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > On Sat, Nov 8, 2025 at 3:14 PM Rahul Bhattacharya
> <
> > >> > > > > > > > [email protected]
> > >> > > > > > > > > >
> > >> > > > > > > > > > > wrote:
> > >> > > > > > > > > > >
> > >> > > > > > > > > > >> Hi Flink Community,
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> I'm interested in contributing an enhancement to
> > >> Apache
> > >> > > > > Flink's
> > >> > > > > > > > > > >> ML_PREDICT
> > >> > > > > > > > > > >> functionality for LLM interactions. I'd like to
> > gauge
> > >> > > > > community
> > >> > > > > > > > > interest
> > >> > > > > > > > > > >> and get
> > >> > > > > > > > > > >> early feedback before proceeding with detailed
> > design
> > >> > or a
> > >> > > > > FLIP.
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ## Problem Statement
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> Currently, when using Flink SQL's ML_PREDICT with
> > LLM
> > >> > > > > endpoints,
> > >> > > > > > > > each
> > >> > > > > > > > > > >> record
> > >> > > > > > > > > > >> triggers an individual API call. For a stream
> > >> processing
> > >> > > > 1000
> > >> > > > > > > > > > >> records/second,
> > >> > > > > > > > > > >> this results in:
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> - **1000 separate API calls per second**
> > >> > > > > > > > > > >> - **High latency**: Each call has network
> overhead
> > +
> > >> API
> > >> > > > > > > processing
> > >> > > > > > > > > time
> > >> > > > > > > > > > >> - **High cost**: Most LLM providers charge per
> > token,
> > >> > and
> > >> > > > lack
> > >> > > > > > of
> > >> > > > > > > > > > >> batching means
> > >> > > > > > > > > > >>   no cost optimization
> > >> > > > > > > > > > >> - **Rate limiting issues**: Hitting provider rate
> > >> limits
> > >> > > > > quickly
> > >> > > > > > > > > > >> - **Poor throughput**: API calls are serialized
> per
> > >> > record
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ### Current Behavior (Inefficient)
> > >> > > > > > > > > > >> ```sql
> > >> > > > > > > > > > >> -- This makes 10 individual API calls
> > >> > > > > > > > > > >> SELECT id, ML_PREDICT('llm_model', text) as
> result
> > >> > > > > > > > > > >> FROM (VALUES
> > >> > > > > > > > > > >>     (1, 'text1'), (2, 'text2'), ..., (10,
> 'text10')
> > >> > > > > > > > > > >> ) AS t(id, text);
> > >> > > > > > > > > > >> ```
> > >> > > > > > > > > > >> **Result**: 10 separate HTTP requests, 10x
> latency,
> > >> 10x
> > >> > > > > overhead
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ## Proposed Solution: Application-Level Batching
> > with
> > >> > > Prompt
> > >> > > > > > > > > Engineering
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> Since most LLM APIs (OpenAI, Anthropic Claude,
> > etc.)
> > >> > don't
> > >> > > > > > provide
> > >> > > > > > > > > > native
> > >> > > > > > > > > > >> batch
> > >> > > > > > > > > > >> endpoints, we propose implementing batching at
> the
> > >> > > > application
> > >> > > > > > > level
> > >> > > > > > > > > by:
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> 1. **Accumulating N records** into a single batch
> > >> > > > > > > > > > >> 2. **Injecting records into a structured prompt**
> > >> that
> > >> > > > > instructs
> > >> > > > > > > the
> > >> > > > > > > > > LLM
> > >> > > > > > > > > > >> to
> > >> > > > > > > > > > >>    process multiple items
> > >> > > > > > > > > > >> 3. **Parsing structured responses** to extract
> > >> results
> > >> > for
> > >> > > > > each
> > >> > > > > > > > record
> > >> > > > > > > > > > >> 4. **Emitting individual results** back to the
> > Flink
> > >> > > > pipeline
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ### How It Works
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> **Step 1: Batch Accumulation**
> > >> > > > > > > > > > >> Collect up to `batch.size` records or wait up to
> `
> > >> > > > > > > batch.timeout.ms`
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> **Step 2: Prompt Construction**
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> System: You are a sentiment analyzer. Process
> each
> > >> item
> > >> > > and
> > >> > > > > > > respond
> > >> > > > > > > > > with
> > >> > > > > > > > > > >> JSON.
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> User: Analyze the sentiment of these texts.
> Return
> > a
> > >> > JSON
> > >> > > > > array
> > >> > > > > > > with
> > >> > > > > > > > > one
> > >> > > > > > > > > > >> object per input containing "index" and
> "sentiment"
> > >> > > fields.
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> Input 1: "This product is amazing!" Input 2:
> > >> "Terrible
> > >> > > > > > experience,
> > >> > > > > > > > > very
> > >> > > > > > > > > > >> disappointed" Input 3: "It's okay, nothing
> special"
> > >> ...
> > >> > > > Input
> > >> > > > > > 10:
> > >> > > > > > > > > "Best
> > >> > > > > > > > > > >> purchase ever!"
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> Respond with: [{"index": 1, "sentiment": "..."},
> > >> > {"index":
> > >> > > > 2,
> > >> > > > > > > > > > >> "sentiment": "..."}, ...]
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> **Step 3: Response Parsing**
> > >> > > > > > > > > > >> ```json
> > >> > > > > > > > > > >> [
> > >> > > > > > > > > > >>   {"index": 1, "sentiment": "positive"},
> > >> > > > > > > > > > >>   {"index": 2, "sentiment": "negative"},
> > >> > > > > > > > > > >>   {"index": 3, "sentiment": "neutral"},
> > >> > > > > > > > > > >>   ...
> > >> > > > > > > > > > >>   {"index": 10, "sentiment": "positive"}
> > >> > > > > > > > > > >> ]
> > >> > > > > > > > > > >> ```
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> **Step 4: Result Distribution**
> > >> > > > > > > > > > >> Parse JSON and emit individual results back to
> > >> > > corresponding
> > >> > > > > > > records
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ### Model Configuration (Defaults)
> > >> > > > > > > > > > >> ```sql
> > >> > > > > > > > > > >> CREATE MODEL llm_sentiment WITH (
> > >> > > > > > > > > > >>     'provider' = 'openai',
> > >> > > > > > > > > > >>     'model' = 'gpt-4',
> > >> > > > > > > > > > >>     'api_key' = '${API_KEY}',
> > >> > > > > > > > > > >>     'batch.size' = '20',
> > >> > > > > > > > > > >>     'batch.timeout.ms' = '1000',
> > >> > > > > > > > > > >>     'system.prompt' = 'You are a sentiment
> > analyzer.
> > >> > > Always
> > >> > > > > > > respond
> > >> > > > > > > > > with
> > >> > > > > > > > > > >> valid JSON.',
> > >> > > > > > > > > > >>     'batch.prompt.template' = 'Analyze sentiment
> > for
> > >> > these
> > >> > > > > > texts.
> > >> > > > > > > > > Return
> > >> > > > > > > > > > >> JSON array: [{"index": <n>, "sentiment":
> > >> > > > > > > > > > "<positive|negative|neutral>"}]',
> > >> > > > > > > > > > >>     'response.format' = 'json',
> > >> > > > > > > > > > >>     'response.path' = '$[*]',  -- JSONPath to
> > extract
> > >> > > array
> > >> > > > of
> > >> > > > > > > > results
> > >> > > > > > > > > > >>     'response.index.field' = 'index',  -- Field
> > >> > containing
> > >> > > > > > record
> > >> > > > > > > > > index
> > >> > > > > > > > > > >>     'response.value.field' = 'sentiment'  --
> Field
> > >> > > > containing
> > >> > > > > > > result
> > >> > > > > > > > > > >> );
> > >> > > > > > > > > > >> ```
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ### Query Usage (Use Defaults)
> > >> > > > > > > > > > >> ```sql
> > >> > > > > > > > > > >> -- Uses batch_size=20 from model definition
> > >> > > > > > > > > > >> SELECT id, text, ML_PREDICT('llm_sentiment',
> text)
> > as
> > >> > > > > sentiment
> > >> > > > > > > > > > >> FROM customer_reviews;
> > >> > > > > > > > > > >> ```
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ### Query Usage (Override for Custom Analysis)
> > >> > > > > > > > > > >> ```sql
> > >> > > > > > > > > > >> -- Override prompt and batch size for different
> use
> > >> case
> > >> > > > > > > > > > >> SELECT id, text, ML_PREDICT('llm_sentiment',
> text,
> > >> > > > > > > > > > >>     MAP['batch.size', '50',
> > >> > > > > > > > > > >>         'batch.prompt.template', 'Extract key
> > >> entities.
> > >> > > > Return
> > >> > > > > > > JSON:
> > >> > > > > > > > > > >> [{"index": <n>, "entities": [...]}]',
> > >> > > > > > > > > > >>         'response.value.field', 'entities']) as
> > >> entities
> > >> > > > > > > > > > >> FROM documents;
> > >> > > > > > > > > > >> ```
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ## Performance and Cost Impact
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ### Example: Processing 10,000 customer reviews
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> **Current (unbatched)**:
> > >> > > > > > > > > > >> - 10,000 API calls
> > >> > > > > > > > > > >> - ~10,000 x 200ms latency = 2,000 seconds total
> > >> > processing
> > >> > > > > time
> > >> > > > > > > > > > >> (serialized)
> > >> > > > > > > > > > >> - ~10,000 x $0.002 = $20 in API costs
> > >> > > > > > > > > > >> - High rate limit pressure
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> **With batching (batch_size=20)**:
> > >> > > > > > > > > > >> - 500 API calls (10,000 / 20)
> > >> > > > > > > > > > >> - ~500 x 300ms latency = 150 seconds total
> > processing
> > >> > time
> > >> > > > > > > > > > >> - ~500 x $0.006 = $3 in API costs (slightly
> higher
> > >> per
> > >> > > call
> > >> > > > > due
> > >> > > > > > to
> > >> > > > > > > > > > larger
> > >> > > > > > > > > > >> prompts,
> > >> > > > > > > > > > >>   but still 85% cheaper overall)
> > >> > > > > > > > > > >> - **20x fewer API calls**
> > >> > > > > > > > > > >> - **13x faster processing**
> > >> > > > > > > > > > >> - **85% cost reduction**
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ## Proposed Implementation
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ### Configuration Parameters
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> **Model-level (defaults)**:
> > >> > > > > > > > > > >> - `batch.size`: Maximum records per batch
> > (default: 1
> > >> > for
> > >> > > > > > backward
> > >> > > > > > > > > > >> compatibility)
> > >> > > > > > > > > > >> - `batch.timeout.ms`: Max time to wait before
> > >> flushing
> > >> > > > > > incomplete
> > >> > > > > > > > > batch
> > >> > > > > > > > > > >> (default: 1000ms)
> > >> > > > > > > > > > >> - `system.prompt`: System-level instruction for
> the
> > >> LLM
> > >> > > > > > > > > > >> - `batch.prompt.template`: Template explaining
> how
> > to
> > >> > > > process
> > >> > > > > > > > batched
> > >> > > > > > > > > > >> inputs
> > >> > > > > > > > > > >> - `response.format`: Expected response format
> > >> ('json',
> > >> > > > 'xml',
> > >> > > > > > > > > > 'delimited')
> > >> > > > > > > > > > >> - `response.path`: JSONPath or XPath to extract
> > >> results
> > >> > > > array
> > >> > > > > > > > > > >> - `response.index.field`: Field name containing
> the
> > >> > record
> > >> > > > > index
> > >> > > > > > > > > > >> - `response.value.field`: Field name containing
> the
> > >> > actual
> > >> > > > > > result
> > >> > > > > > > > > > >> - `max.retries`: Retry attempts for failed
> batches
> > >> > > (default:
> > >> > > > > 3)
> > >> > > > > > > > > > >> - `request.timeout.ms`: Timeout for API calls
> > >> (default:
> > >> > > > > > 30000ms)
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> **Query-level (overrides)**:
> > >> > > > > > > > > > >> - Any of the above can be overridden via MAP
> > >> parameter
> > >> > in
> > >> > > > > > > ML_PREDICT
> > >> > > > > > > > > > >> - Per-query customization for different analysis
> > >> tasks
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ### Key Features
> > >> > > > > > > > > > >> 1. **Prompt injection**: Automatically construct
> > >> batch
> > >> > > > prompts
> > >> > > > > > > with
> > >> > > > > > > > > > >> indexed inputs
> > >> > > > > > > > > > >> 2. **Structured response parsing**: Support JSON,
> > >> XML,
> > >> > or
> > >> > > > > > > delimited
> > >> > > > > > > > > > >> formats
> > >> > > > > > > > > > >> 3. **Index tracking**: Maintain record-to-result
> > >> mapping
> > >> > > > > through
> > >> > > > > > > the
> > >> > > > > > > > > > batch
> > >> > > > > > > > > > >> 4. **Error handling**: Handle parsing failures,
> > >> missing
> > >> > > > > indices,
> > >> > > > > > > > > > >> malformed responses
> > >> > > > > > > > > > >> 5. **Fallback to individual calls**: If batch
> > fails,
> > >> > > > > optionally
> > >> > > > > > > > retry
> > >> > > > > > > > > > >> records individually
> > >> > > > > > > > > > >> 6. **Provider-agnostic**: Works with any LLM API
> > >> > (OpenAI,
> > >> > > > > > > Anthropic,
> > >> > > > > > > > > > >> Azure, self-hosted)
> > >> > > > > > > > > > >> 7. **Async processing**: Non-blocking batch
> > requests
> > >> > > > > > > > > > >> 8. **Back-pressure**: Proper flow control when
> API
> > is
> > >> > slow
> > >> > > > > > > > > > >> 9. **Backward compatible**: batch.size=1
> maintains
> > >> > current
> > >> > > > > > > behavior
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ### Technical Approach
> > >> > > > > > > > > > >> - Extend existing ML_PREDICT infrastructure
> > >> > > > > > > > > > >> - Add batching buffer in the ML_PREDICT operator
> > >> > > > > > > > > > >> - Implement prompt template engine for batch
> > >> > construction:
> > >> > > > > > > > > > >>   - Inject record index + content into template
> > >> > > > > > > > > > >>   - Support various templating formats (JSON,
> XML,
> > >> plain
> > >> > > > text)
> > >> > > > > > > > > > >> - Implement response parser:
> > >> > > > > > > > > > >>   - Extract structured data (JSONPath, XPath,
> > regex)
> > >> > > > > > > > > > >>   - Map results back to original records by index
> > >> > > > > > > > > > >>   - Handle missing or malformed responses
> > >> > > > > > > > > > >> - Maintain record ordering and error attribution
> > >> > > > > > > > > > >> - Support parameter override mechanism in
> > ML_PREDICT
> > >> > > > function
> > >> > > > > > > > > signature
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ### Response Parsing Strategy
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> The implementation must handle:
> > >> > > > > > > > > > >> 1. **Successful batch response**: Parse and
> > >> distribute
> > >> > > > results
> > >> > > > > > > > > > >> 2. **Partial failure**: Some records missing from
> > >> > > response →
> > >> > > > > > emit
> > >> > > > > > > > > errors
> > >> > > > > > > > > > >> for those
> > >> > > > > > > > > > >> 3. **Complete parse failure**: Optionally
> fallback
> > to
> > >> > > > > individual
> > >> > > > > > > > calls
> > >> > > > > > > > > > >> 4. **Index mismatch**: Response indices don't
> match
> > >> > input
> > >> > > →
> > >> > > > > log
> > >> > > > > > > > > warning
> > >> > > > > > > > > > >> and best-effort match
> > >> > > > > > > > > > >> 5. **Malformed JSON**: Retry with error handling
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> Example error handling:
> > >> > > > > > > > > > >> ```sql
> > >> > > > > > > > > > >> -- Records that fail parsing get null results
> with
> > >> error
> > >> > > > > > metadata
> > >> > > > > > > > > > >> SELECT
> > >> > > > > > > > > > >>     id,
> > >> > > > > > > > > > >>     text,
> > >> > > > > > > > > > >>     result.value as sentiment,
> > >> > > > > > > > > > >>     result.error as error_msg
> > >> > > > > > > > > > >> FROM source_table,
> > >> > > > > > > > > > >> LATERAL TABLE(ML_PREDICT('llm_sentiment', text));
> > >> > > > > > > > > > >> ```
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ## Limitations and Considerations
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> 1. **LLM instruction following**: Depends on
> > model's
> > >> > > ability
> > >> > > > > to
> > >> > > > > > > > follow
> > >> > > > > > > > > > >> structured
> > >> > > > > > > > > > >>    output instructions. GPT-4 and Claude are
> > >> reliable;
> > >> > > older
> > >> > > > > > > models
> > >> > > > > > > > > may
> > >> > > > > > > > > > >> struggle.
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> 2. **Prompt size limits**: Batching too many
> > records
> > >> may
> > >> > > > > exceed
> > >> > > > > > > > > context
> > >> > > > > > > > > > >> windows
> > >> > > > > > > > > > >>    - GPT-4: ~8K tokens input limit
> > >> > > > > > > > > > >>    - Claude: ~200K tokens but practical batches
> > >> smaller
> > >> > > > > > > > > > >>    - Need configurable max batch size based on
> > >> average
> > >> > > > record
> > >> > > > > > > length
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> 3. **Token cost trade-off**: Larger batches mean:
> > >> > > > > > > > > > >>    - Fewer API calls (good)
> > >> > > > > > > > > > >>    - But larger prompts with
> > instructions/formatting
> > >> > > (slight
> > >> > > > > > > > overhead)
> > >> > > > > > > > > > >>    - Net savings still 80-90% in practice
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> 4. **Parsing reliability**: Small risk of
> malformed
> > >> > > > responses
> > >> > > > > > > > > > >>    - Mitigated by: clear instructions, JSON mode
> > >> > (GPT-4),
> > >> > > > > retry
> > >> > > > > > > > logic
> > >> > > > > > > > > > >>    - Fallback to individual calls if batch
> parsing
> > >> fails
> > >> > > > > > > repeatedly
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> 5. **Latency characteristics**:
> > >> > > > > > > > > > >>    - Individual records see slightly higher
> latency
> > >> > > (waiting
> > >> > > > > for
> > >> > > > > > > > > batch)
> > >> > > > > > > > > > >>    - Overall throughput dramatically improved
> > >> > > > > > > > > > >>    - Use `batch.timeout.ms` to balance latency
> vs
> > >> > > > throughput
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ## Future Extensions
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> This batching architecture would support:
> > >> > > > > > > > > > >> 1. **Stateful chat sessions**: Batch multiple
> turns
> > >> of a
> > >> > > > > > > > conversation
> > >> > > > > > > > > > >> with
> > >> > > > > > > > > > >>    maintained history per session key
> > >> > > > > > > > > > >> 2. **Embedding generation**: Some providers
> > (OpenAI)
> > >> do
> > >> > > have
> > >> > > > > > batch
> > >> > > > > > > > > > >> embedding APIs
> > >> > > > > > > > > > >> 3. **Multi-modal batching**: Batch image + text
> > >> > processing
> > >> > > > > with
> > >> > > > > > > > > > >> structured outputs
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ## Questions for the Community
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> 1. **Architecture**: Should this extend
> ML_PREDICT
> > or
> > >> > be a
> > >> > > > new
> > >> > > > > > > > > function?
> > >> > > > > > > > > > >>    (I propose extending ML_PREDICT for backward
> > >> > > > compatibility)
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> 2. **FLIP Required?**: Does this enhancement
> > warrant
> > >> a
> > >> > > FLIP?
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> 3. **Existing Work**: Is anyone working on
> batching
> > >> for
> > >> > > > > > ML_PREDICT
> > >> > > > > > > > or
> > >> > > > > > > > > > >> similar
> > >> > > > > > > > > > >>    functionality?
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> 4. **Prompt Template Engine**: Should we:
> > >> > > > > > > > > > >>    - Build a custom template engine?
> > >> > > > > > > > > > >>    - Use existing library (e.g., StringTemplate,
> > >> > > Mustache)?
> > >> > > > > > > > > > >>    - Keep it simple with String.format initially?
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> 5. **Response Parsing**: Preferred approach:
> > >> > > > > > > > > > >>    - JSONPath library (flexible but adds
> > dependency)
> > >> > > > > > > > > > >>    - Simple JSON parsing with field names
> > >> > > > > > > > > > >>    - Pluggable parser interface for
> extensibility?
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> 6. **Error Handling**: If parsing fails for
> entire
> > >> > batch:
> > >> > > > > > > > > > >>    - Fail all records in batch?
> > >> > > > > > > > > > >>    - Retry batch once more?
> > >> > > > > > > > > > >>    - Fallback to individual calls (with circuit
> > >> > breaker)?
> > >> > > > > > > > > > >>    - Make strategy configurable?
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> 7. **Batch Assembly**: Should batching happen:
> > >> > > > > > > > > > >>    - Per parallel instance (each task maintains
> its
> > >> own
> > >> > > > > batch)?
> > >> > > > > > > > > > >>    - Globally coordinated (shuffle to batch
> > >> > coordinator)?
> > >> > > > > > > > > > >>    - I propose per-instance for simplicity and
> > lower
> > >> > > latency
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> 8. **Compatibility**: Default batch.size=1 to
> > >> maintain
> > >> > > > current
> > >> > > > > > > > > behavior,
> > >> > > > > > > > > > >> users
> > >> > > > > > > > > > >>    opt-in to batching?
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ## Why This Matters
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> LLM inference is becoming a critical part of
> > >> real-time
> > >> > > data
> > >> > > > > > > > pipelines.
> > >> > > > > > > > > > >> Without
> > >> > > > > > > > > > >> batching:
> > >> > > > > > > > > > >> - Users face prohibitive costs for
> high-throughput
> > >> > > workloads
> > >> > > > > > > > > > >> - Rate limits block production deployments
> > >> > > > > > > > > > >> - Latency makes real-time processing impractical
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> While LLM providers don't offer native batch
> APIs,
> > >> > > > > > > application-level
> > >> > > > > > > > > > >> batching
> > >> > > > > > > > > > >> through prompt engineering is a proven pattern
> used
> > >> in
> > >> > > > > > production
> > >> > > > > > > by
> > >> > > > > > > > > > many
> > >> > > > > > > > > > >> organizations. This proposal brings that
> capability
> > >> > > natively
> > >> > > > > > into
> > >> > > > > > > > > Flink.
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> The hybrid configuration approach provides:
> > >> > > > > > > > > > >> - **Sensible defaults** for common use cases
> > >> (sentiment
> > >> > > > > > analysis,
> > >> > > > > > > > > > >> classification)
> > >> > > > > > > > > > >> - **Flexibility** to customize prompts and
> parsing
> > >> for
> > >> > > > > specific
> > >> > > > > > > > needs
> > >> > > > > > > > > > >> - **Easy migration** for existing queries
> > >> (batch.size=1
> > >> > > > > default)
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> ## Next Steps
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> If there's interest from the community, I'm happy
> > to:
> > >> > > > > > > > > > >> 1. Prepare a detailed design document with prompt
> > >> > > templates
> > >> > > > > and
> > >> > > > > > > > > parsing
> > >> > > > > > > > > > >> examples
> > >> > > > > > > > > > >> 2. Create a JIRA ticket
> > >> > > > > > > > > > >> 3. Develop a prototype demonstrating the batching
> > and
> > >> > > > parsing
> > >> > > > > > > logic
> > >> > > > > > > > > > >> 4. Write a FLIP if required
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >> Looking forward to your feedback and guidance on
> > how
> > >> > best
> > >> > > to
> > >> > > > > > > > > proceed!--
> > >> > > > > > > > > > >> Thanks And Regards
> > >> > > > > > > > > > >> Rahul
> > >> > > > > > > > > > >>
> > >> > > > > > > > > > >
> > >> > > > > > > > > > >
> > >> > > > > > > > > > > --
> > >> > > > > > > > > > > Thanks And Regards
> > >> > > > > > > > > > > Rahul
> > >> > > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > > >
> > >> > > > > > > > > > --
> > >> > > > > > > > > > Thanks And Regards
> > >> > > > > > > > > > Rahul
> > >> > > > > > > > > >
> > >> > > > > > > > >
> > >> > > > > > > >
> > >> > > > > > >
> > >> > > > > >
> > >> > > > >
> > >> > > >
> > >> > > >
> > >> > > > --
> > >> > > > Thanks And Regards
> > >> > > > Rahul
> > >> > > >
> > >> > >
> > >> >
> > >> >
> > >> > --
> > >> > Thanks And Regards
> > >> > Rahul
> > >> >
> > >>
> > >
> > >
> > > --
> > > Thanks And Regards
> > > Rahul
> > >
> >
> >
> > --
> > Thanks And Regards
> > Rahul
> >
>


-- 
Thanks And Regards
Rahul

Reply via email to