Hi Hao,
the custom_id field is not mandatory.
it's a field to match the answer to the question, but you can't use very
long strings in custom_id.
my suggestion will be to not pass any custom_id, but use the metadata field
instead which does not have any restrictions.

Sample jsonl request file

{
  "method": "POST",
  "url": "/v1/chat/completions",
  "body": {
    "model": "gpt-4.1-mini",
    "metadata": {
      "topic": "questions-topic",
      "partition": 5,
      "offset": 10223,
      "row_id": 789
    },
    "messages": [
      { "role": "user", "content": "what is 2+5" }
    ]
  }
}

Sample response for above request

{
  "response": {
    "metadata": {
      "topic": "questions-topic",
      "partition": 5,
      "offset": 10223,
      "row_id": 789
    },
    "choices": [
      { "message": { "role": "assistant", "content": "2+5 = 7" } }
    ]
  }
}


For the second question, the status of the batch is for the entire batch of
records. Once the batch shows completed users can download the response
file which will have many lines/records like the above json


regards
Rahul

On Thu, Nov 20, 2025 at 4:49 PM Hao Li via dev <[email protected]> wrote:

> Hi Rahul,
>
> FYI, the FLIP process involves a voting process as well [1] before
> implementation. I have two more question regarding sending batch id
> downstream:
>
> 1. For openai batch api specifically, does it needs a `custom_id` for each
> request? Where is this custom_id from?
> 2. If you batch several requests, they will get the same batch_id as the
> response. Does downstream need to poll them for each request even though
> they all have the same batch_id?
>
> [1]
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-Accepted
>
> Thanks,
> Hao
>
> On Thu, Nov 20, 2025 at 2:09 PM Rahul Bhattacharya <[email protected]>
> wrote:
>
> > i created a Jira for this
> > https://issues.apache.org/jira/browse/FLINK-38708
> >
> > On Fri, Nov 14, 2025 at 2:04 PM Hao Li <[email protected]> wrote:
> >
> > > Hi Shengkai,
> > >
> > > > process them together, and complete the future objects sequentially
> as
> > > they finish
> > >
> > > The batch api Rahul proposed is [1] which sends batch request from
> file,
> > > return an id and we need to poll the id for results. The important part
> > is
> > > it can take 24 hours to finish. So Rahua is proposing to just send the
> > > result id to downstream.
> > >
> > > [1] https://platform.openai.com/docs/guides/batch
> > >
> > > On Fri, Nov 14, 2025 at 8:35 AM Rahul Bhattacharya <
> [email protected]>
> > > wrote:
> > >
> > > > Hi Shengkai,
> > > > so i am understanding we will go with option 1 and send the batchid
> > > > downstream to do whatever the user needs to do with the batchids?
> > > >
> > > > i also in the opinion that option 1 is a better option for now than
> > > option
> > > > 2.
> > > > Based on a parameter setting we should batch n records , create a
> > > > jsonl file and post it to the LLM batch api.
> > > > The LLM will immediately return the batch id which we can just send
> it
> > > > downstream. this implementation will be stateless and really simple
> to
> > > > implement
> > > >
> > > >
> > > > regards
> > > >
> > > >
> > > >
> > > > On Fri, Nov 14, 2025 at 4:18 AM Shengkai Fang <[email protected]>
> > wrote:
> > > >
> > > > > Hi. Rahul
> > > > >
> > > > > First, I believe we don't need to modify the framework. Instead, we
> > can
> > > > > have the async predict function collect records in batches, process
> > > them
> > > > > together, and complete the future objects sequentially as they
> > finish.
> > > > This
> > > > > approach allows us to move forward quickly.
> > > > >
> > > > > Second, I have concerns about introducing state or external
> storage.
> > On
> > > > one
> > > > > hand, the current design is stateless, and transitioning to a
> > stateful
> > > > > architecture would require significant refactoring. On the other
> > hand,
> > > I
> > > > > don't see clear advantages to storing batch IDs in state, since we
> > > cannot
> > > > > guarantee that elements will arrive in the same order after
> restoring
> > > > from
> > > > > a checkpoint. For example, if the ML predictor receives elements
> e1,
> > > e2,
> > > > e3
> > > > > in the first run, it might receive e2, e3, e1 after recovery. With
> a
> > > > batch
> > > > > size of 2, we wouldn't be able to reuse the in-flight requests
> > > > effectively.
> > > > >
> > > > > Finally, I suggest we leverage the IOManager to handle JSON file
> > > > > management.
> > > > >
> > > > > Best,
> > > > > Shengkai
> > > > >
> > > > >
> > > > > Rahul Bhattacharya <[email protected]> 于2025年11月11日周二 09:03写道:
> > > > >
> > > > > > Hi Hao
> > > > > > For option 2 the guarantees like exactly once and other things
> are
> > > not
> > > > > > guaranteed as it’s not in the same flink process
> > > > > >
> > > > > > The flink process finishes after the submission and getting
> > batchids
> > > > > which
> > > > > > it sends downstream
> > > > > >
> > > > > > There has to be another process(doesn’t have to be flink)  which
> > > takes
> > > > > > these batchids and polls the OpenAI endpoint for status
> completed.
> > > > > >
> > > > > > Once it gets completed it downloads the results and sends
> > downstream
> > > > > >
> > > > > > This secondary process is on client discretion , for Kafka
> > probably a
> > > > > http
> > > > > > sink connector or Kafka consumer
> > > > > >
> > > > > > Thanks And Regards
> > > > > > Rahul
> > > > > >
> > > > > >
> > > > > > On Mon, Nov 10, 2025 at 6:45 PM Hao Li <[email protected]
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi Rahul,
> > > > > > >
> > > > > > > Thanks for the proposal. From some offline discussion, the
> > endpoint
> > > > you
> > > > > > > have in mind to support is OpenAI batch API [1]. The doc states
> > > that
> > > > > > "Each
> > > > > > > batch completes within 24 hours (and often more quickly)". With
> > > this
> > > > > > > context, I have some questions:
> > > > > > >
> > > > > > > 1. For design option 1, does the operator always wait for batch
> > > > > response
> > > > > > > until processing next batch? This can take 24 hours which isn't
> > > > > feasible
> > > > > > > for streaming job I think.
> > > > > > >
> > > > > > > 2. For design option 2, why it loses exact-once and have higher
> > > > latency
> > > > > > > compared to 1?
> > > > > > >
> > > > > > > 3. Also for the public interface section, are the parameters in
> > > > > > > `ml_predict` config or in options when `create model`?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Hao
> > > > > > >
> > > > > > >
> > > > > > > [1] https://platform.openai.com/docs/guides/batch/batch-api
> > > > > > >
> > > > > > > On Mon, Nov 10, 2025 at 10:19 AM Asimansu Bera <
> > > > > [email protected]>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > +1
> > > > > > > >
> > > > > > > > This proposal is needed for optimizing network calls and
> > > processing
> > > > > > asyc.
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > Asimansu
> > > > > > > >
> > > > > > > > On Mon, Nov 10, 2025 at 4:04 AM Shengkai Fang <
> > [email protected]
> > > >
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi, Rahul.
> > > > > > > > >
> > > > > > > > > +1 for this proposal. However, due to my current workload,
> > I'll
> > > > > need
> > > > > > > > until
> > > > > > > > > the end of this week to review it thoroughly.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Shengkai
> > > > > > > > >
> > > > > > > > > Rahul Bhattacharya <[email protected]> 于2025年11月9日周日
> > > 13:08写道:
> > > > > > > > >
> > > > > > > > > > Hi ,
> > > > > > > > > > i have created a draft FLIP for it
> > > > > > > > > >
> > > > > > >
> > > >
> https://docs.google.com/document/d/1U-eSuKwi5vIgAPt6ZBvb-RcbcRJiRy0e/
> > > > > > > > > >
> > > > > > > > > > Please let me know your thoughts
> > > > > > > > > >
> > > > > > > > > > regards
> > > > > > > > > > Rahul
> > > > > > > > > >
> > > > > > > > > > On Sat, Nov 8, 2025 at 5:54 PM Rahul Bhattacharya <
> > > > > > > [email protected]
> > > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi,
> > > > > > > > > > > I actually thought of reworking my previous response. I
> > > want
> > > > > the
> > > > > > > > table
> > > > > > > > > > api
> > > > > > > > > > > to create jsonl files and call openai/claude batch
> apis.
> > > > > > > > > > > The implementation I am doing is going to batch the
> > records
> > > > > into
> > > > > > a
> > > > > > > > file
> > > > > > > > > > > and call the api with the file and then continuously
> poll
> > > the
> > > > > > > repose
> > > > > > > > to
> > > > > > > > > > see
> > > > > > > > > > > the status of the batch and then use that to write the
> > > > response
> > > > > > > > > records.
> > > > > > > > > > > The ML_Predict in its current form is not usable as
> > people
> > > > are
> > > > > > not
> > > > > > > > > > looking
> > > > > > > > > > > for synchronous response which is twice as expensive as
> > the
> > > > > > > > > asynchronous
> > > > > > > > > > > response.
> > > > > > > > > > > let me know you thoughts and i can create a FLIP for it
> > > > > > > > > > > regards
> > > > > > > > > > >
> > > > > > > > > > > On Sat, Nov 8, 2025 at 3:14 PM Rahul Bhattacharya <
> > > > > > > > [email protected]
> > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > >> Hi Flink Community,
> > > > > > > > > > >>
> > > > > > > > > > >> I'm interested in contributing an enhancement to
> Apache
> > > > > Flink's
> > > > > > > > > > >> ML_PREDICT
> > > > > > > > > > >> functionality for LLM interactions. I'd like to gauge
> > > > > community
> > > > > > > > > interest
> > > > > > > > > > >> and get
> > > > > > > > > > >> early feedback before proceeding with detailed design
> > or a
> > > > > FLIP.
> > > > > > > > > > >>
> > > > > > > > > > >> ## Problem Statement
> > > > > > > > > > >>
> > > > > > > > > > >> Currently, when using Flink SQL's ML_PREDICT with LLM
> > > > > endpoints,
> > > > > > > > each
> > > > > > > > > > >> record
> > > > > > > > > > >> triggers an individual API call. For a stream
> processing
> > > > 1000
> > > > > > > > > > >> records/second,
> > > > > > > > > > >> this results in:
> > > > > > > > > > >>
> > > > > > > > > > >> - **1000 separate API calls per second**
> > > > > > > > > > >> - **High latency**: Each call has network overhead +
> API
> > > > > > > processing
> > > > > > > > > time
> > > > > > > > > > >> - **High cost**: Most LLM providers charge per token,
> > and
> > > > lack
> > > > > > of
> > > > > > > > > > >> batching means
> > > > > > > > > > >>   no cost optimization
> > > > > > > > > > >> - **Rate limiting issues**: Hitting provider rate
> limits
> > > > > quickly
> > > > > > > > > > >> - **Poor throughput**: API calls are serialized per
> > record
> > > > > > > > > > >>
> > > > > > > > > > >> ### Current Behavior (Inefficient)
> > > > > > > > > > >> ```sql
> > > > > > > > > > >> -- This makes 10 individual API calls
> > > > > > > > > > >> SELECT id, ML_PREDICT('llm_model', text) as result
> > > > > > > > > > >> FROM (VALUES
> > > > > > > > > > >>     (1, 'text1'), (2, 'text2'), ..., (10, 'text10')
> > > > > > > > > > >> ) AS t(id, text);
> > > > > > > > > > >> ```
> > > > > > > > > > >> **Result**: 10 separate HTTP requests, 10x latency,
> 10x
> > > > > overhead
> > > > > > > > > > >>
> > > > > > > > > > >> ## Proposed Solution: Application-Level Batching with
> > > Prompt
> > > > > > > > > Engineering
> > > > > > > > > > >>
> > > > > > > > > > >> Since most LLM APIs (OpenAI, Anthropic Claude, etc.)
> > don't
> > > > > > provide
> > > > > > > > > > native
> > > > > > > > > > >> batch
> > > > > > > > > > >> endpoints, we propose implementing batching at the
> > > > application
> > > > > > > level
> > > > > > > > > by:
> > > > > > > > > > >>
> > > > > > > > > > >> 1. **Accumulating N records** into a single batch
> > > > > > > > > > >> 2. **Injecting records into a structured prompt** that
> > > > > instructs
> > > > > > > the
> > > > > > > > > LLM
> > > > > > > > > > >> to
> > > > > > > > > > >>    process multiple items
> > > > > > > > > > >> 3. **Parsing structured responses** to extract results
> > for
> > > > > each
> > > > > > > > record
> > > > > > > > > > >> 4. **Emitting individual results** back to the Flink
> > > > pipeline
> > > > > > > > > > >>
> > > > > > > > > > >> ### How It Works
> > > > > > > > > > >>
> > > > > > > > > > >> **Step 1: Batch Accumulation**
> > > > > > > > > > >> Collect up to `batch.size` records or wait up to `
> > > > > > > batch.timeout.ms`
> > > > > > > > > > >>
> > > > > > > > > > >> **Step 2: Prompt Construction**
> > > > > > > > > > >>
> > > > > > > > > > >> System: You are a sentiment analyzer. Process each
> item
> > > and
> > > > > > > respond
> > > > > > > > > with
> > > > > > > > > > >> JSON.
> > > > > > > > > > >>
> > > > > > > > > > >> User: Analyze the sentiment of these texts. Return a
> > JSON
> > > > > array
> > > > > > > with
> > > > > > > > > one
> > > > > > > > > > >> object per input containing "index" and "sentiment"
> > > fields.
> > > > > > > > > > >>
> > > > > > > > > > >> Input 1: "This product is amazing!" Input 2: "Terrible
> > > > > > experience,
> > > > > > > > > very
> > > > > > > > > > >> disappointed" Input 3: "It's okay, nothing special"
> ...
> > > > Input
> > > > > > 10:
> > > > > > > > > "Best
> > > > > > > > > > >> purchase ever!"
> > > > > > > > > > >>
> > > > > > > > > > >> Respond with: [{"index": 1, "sentiment": "..."},
> > {"index":
> > > > 2,
> > > > > > > > > > >> "sentiment": "..."}, ...]
> > > > > > > > > > >>
> > > > > > > > > > >> **Step 3: Response Parsing**
> > > > > > > > > > >> ```json
> > > > > > > > > > >> [
> > > > > > > > > > >>   {"index": 1, "sentiment": "positive"},
> > > > > > > > > > >>   {"index": 2, "sentiment": "negative"},
> > > > > > > > > > >>   {"index": 3, "sentiment": "neutral"},
> > > > > > > > > > >>   ...
> > > > > > > > > > >>   {"index": 10, "sentiment": "positive"}
> > > > > > > > > > >> ]
> > > > > > > > > > >> ```
> > > > > > > > > > >>
> > > > > > > > > > >> **Step 4: Result Distribution**
> > > > > > > > > > >> Parse JSON and emit individual results back to
> > > corresponding
> > > > > > > records
> > > > > > > > > > >>
> > > > > > > > > > >> ### Model Configuration (Defaults)
> > > > > > > > > > >> ```sql
> > > > > > > > > > >> CREATE MODEL llm_sentiment WITH (
> > > > > > > > > > >>     'provider' = 'openai',
> > > > > > > > > > >>     'model' = 'gpt-4',
> > > > > > > > > > >>     'api_key' = '${API_KEY}',
> > > > > > > > > > >>     'batch.size' = '20',
> > > > > > > > > > >>     'batch.timeout.ms' = '1000',
> > > > > > > > > > >>     'system.prompt' = 'You are a sentiment analyzer.
> > > Always
> > > > > > > respond
> > > > > > > > > with
> > > > > > > > > > >> valid JSON.',
> > > > > > > > > > >>     'batch.prompt.template' = 'Analyze sentiment for
> > these
> > > > > > texts.
> > > > > > > > > Return
> > > > > > > > > > >> JSON array: [{"index": <n>, "sentiment":
> > > > > > > > > > "<positive|negative|neutral>"}]',
> > > > > > > > > > >>     'response.format' = 'json',
> > > > > > > > > > >>     'response.path' = '$[*]',  -- JSONPath to extract
> > > array
> > > > of
> > > > > > > > results
> > > > > > > > > > >>     'response.index.field' = 'index',  -- Field
> > containing
> > > > > > record
> > > > > > > > > index
> > > > > > > > > > >>     'response.value.field' = 'sentiment'  -- Field
> > > > containing
> > > > > > > result
> > > > > > > > > > >> );
> > > > > > > > > > >> ```
> > > > > > > > > > >>
> > > > > > > > > > >> ### Query Usage (Use Defaults)
> > > > > > > > > > >> ```sql
> > > > > > > > > > >> -- Uses batch_size=20 from model definition
> > > > > > > > > > >> SELECT id, text, ML_PREDICT('llm_sentiment', text) as
> > > > > sentiment
> > > > > > > > > > >> FROM customer_reviews;
> > > > > > > > > > >> ```
> > > > > > > > > > >>
> > > > > > > > > > >> ### Query Usage (Override for Custom Analysis)
> > > > > > > > > > >> ```sql
> > > > > > > > > > >> -- Override prompt and batch size for different use
> case
> > > > > > > > > > >> SELECT id, text, ML_PREDICT('llm_sentiment', text,
> > > > > > > > > > >>     MAP['batch.size', '50',
> > > > > > > > > > >>         'batch.prompt.template', 'Extract key
> entities.
> > > > Return
> > > > > > > JSON:
> > > > > > > > > > >> [{"index": <n>, "entities": [...]}]',
> > > > > > > > > > >>         'response.value.field', 'entities']) as
> entities
> > > > > > > > > > >> FROM documents;
> > > > > > > > > > >> ```
> > > > > > > > > > >>
> > > > > > > > > > >> ## Performance and Cost Impact
> > > > > > > > > > >>
> > > > > > > > > > >> ### Example: Processing 10,000 customer reviews
> > > > > > > > > > >>
> > > > > > > > > > >> **Current (unbatched)**:
> > > > > > > > > > >> - 10,000 API calls
> > > > > > > > > > >> - ~10,000 x 200ms latency = 2,000 seconds total
> > processing
> > > > > time
> > > > > > > > > > >> (serialized)
> > > > > > > > > > >> - ~10,000 x $0.002 = $20 in API costs
> > > > > > > > > > >> - High rate limit pressure
> > > > > > > > > > >>
> > > > > > > > > > >> **With batching (batch_size=20)**:
> > > > > > > > > > >> - 500 API calls (10,000 / 20)
> > > > > > > > > > >> - ~500 x 300ms latency = 150 seconds total processing
> > time
> > > > > > > > > > >> - ~500 x $0.006 = $3 in API costs (slightly higher per
> > > call
> > > > > due
> > > > > > to
> > > > > > > > > > larger
> > > > > > > > > > >> prompts,
> > > > > > > > > > >>   but still 85% cheaper overall)
> > > > > > > > > > >> - **20x fewer API calls**
> > > > > > > > > > >> - **13x faster processing**
> > > > > > > > > > >> - **85% cost reduction**
> > > > > > > > > > >>
> > > > > > > > > > >> ## Proposed Implementation
> > > > > > > > > > >>
> > > > > > > > > > >> ### Configuration Parameters
> > > > > > > > > > >>
> > > > > > > > > > >> **Model-level (defaults)**:
> > > > > > > > > > >> - `batch.size`: Maximum records per batch (default: 1
> > for
> > > > > > backward
> > > > > > > > > > >> compatibility)
> > > > > > > > > > >> - `batch.timeout.ms`: Max time to wait before
> flushing
> > > > > > incomplete
> > > > > > > > > batch
> > > > > > > > > > >> (default: 1000ms)
> > > > > > > > > > >> - `system.prompt`: System-level instruction for the
> LLM
> > > > > > > > > > >> - `batch.prompt.template`: Template explaining how to
> > > > process
> > > > > > > > batched
> > > > > > > > > > >> inputs
> > > > > > > > > > >> - `response.format`: Expected response format ('json',
> > > > 'xml',
> > > > > > > > > > 'delimited')
> > > > > > > > > > >> - `response.path`: JSONPath or XPath to extract
> results
> > > > array
> > > > > > > > > > >> - `response.index.field`: Field name containing the
> > record
> > > > > index
> > > > > > > > > > >> - `response.value.field`: Field name containing the
> > actual
> > > > > > result
> > > > > > > > > > >> - `max.retries`: Retry attempts for failed batches
> > > (default:
> > > > > 3)
> > > > > > > > > > >> - `request.timeout.ms`: Timeout for API calls
> (default:
> > > > > > 30000ms)
> > > > > > > > > > >>
> > > > > > > > > > >> **Query-level (overrides)**:
> > > > > > > > > > >> - Any of the above can be overridden via MAP parameter
> > in
> > > > > > > ML_PREDICT
> > > > > > > > > > >> - Per-query customization for different analysis tasks
> > > > > > > > > > >>
> > > > > > > > > > >> ### Key Features
> > > > > > > > > > >> 1. **Prompt injection**: Automatically construct batch
> > > > prompts
> > > > > > > with
> > > > > > > > > > >> indexed inputs
> > > > > > > > > > >> 2. **Structured response parsing**: Support JSON, XML,
> > or
> > > > > > > delimited
> > > > > > > > > > >> formats
> > > > > > > > > > >> 3. **Index tracking**: Maintain record-to-result
> mapping
> > > > > through
> > > > > > > the
> > > > > > > > > > batch
> > > > > > > > > > >> 4. **Error handling**: Handle parsing failures,
> missing
> > > > > indices,
> > > > > > > > > > >> malformed responses
> > > > > > > > > > >> 5. **Fallback to individual calls**: If batch fails,
> > > > > optionally
> > > > > > > > retry
> > > > > > > > > > >> records individually
> > > > > > > > > > >> 6. **Provider-agnostic**: Works with any LLM API
> > (OpenAI,
> > > > > > > Anthropic,
> > > > > > > > > > >> Azure, self-hosted)
> > > > > > > > > > >> 7. **Async processing**: Non-blocking batch requests
> > > > > > > > > > >> 8. **Back-pressure**: Proper flow control when API is
> > slow
> > > > > > > > > > >> 9. **Backward compatible**: batch.size=1 maintains
> > current
> > > > > > > behavior
> > > > > > > > > > >>
> > > > > > > > > > >> ### Technical Approach
> > > > > > > > > > >> - Extend existing ML_PREDICT infrastructure
> > > > > > > > > > >> - Add batching buffer in the ML_PREDICT operator
> > > > > > > > > > >> - Implement prompt template engine for batch
> > construction:
> > > > > > > > > > >>   - Inject record index + content into template
> > > > > > > > > > >>   - Support various templating formats (JSON, XML,
> plain
> > > > text)
> > > > > > > > > > >> - Implement response parser:
> > > > > > > > > > >>   - Extract structured data (JSONPath, XPath, regex)
> > > > > > > > > > >>   - Map results back to original records by index
> > > > > > > > > > >>   - Handle missing or malformed responses
> > > > > > > > > > >> - Maintain record ordering and error attribution
> > > > > > > > > > >> - Support parameter override mechanism in ML_PREDICT
> > > > function
> > > > > > > > > signature
> > > > > > > > > > >>
> > > > > > > > > > >> ### Response Parsing Strategy
> > > > > > > > > > >>
> > > > > > > > > > >> The implementation must handle:
> > > > > > > > > > >> 1. **Successful batch response**: Parse and distribute
> > > > results
> > > > > > > > > > >> 2. **Partial failure**: Some records missing from
> > > response →
> > > > > > emit
> > > > > > > > > errors
> > > > > > > > > > >> for those
> > > > > > > > > > >> 3. **Complete parse failure**: Optionally fallback to
> > > > > individual
> > > > > > > > calls
> > > > > > > > > > >> 4. **Index mismatch**: Response indices don't match
> > input
> > > →
> > > > > log
> > > > > > > > > warning
> > > > > > > > > > >> and best-effort match
> > > > > > > > > > >> 5. **Malformed JSON**: Retry with error handling
> > > > > > > > > > >>
> > > > > > > > > > >> Example error handling:
> > > > > > > > > > >> ```sql
> > > > > > > > > > >> -- Records that fail parsing get null results with
> error
> > > > > > metadata
> > > > > > > > > > >> SELECT
> > > > > > > > > > >>     id,
> > > > > > > > > > >>     text,
> > > > > > > > > > >>     result.value as sentiment,
> > > > > > > > > > >>     result.error as error_msg
> > > > > > > > > > >> FROM source_table,
> > > > > > > > > > >> LATERAL TABLE(ML_PREDICT('llm_sentiment', text));
> > > > > > > > > > >> ```
> > > > > > > > > > >>
> > > > > > > > > > >> ## Limitations and Considerations
> > > > > > > > > > >>
> > > > > > > > > > >> 1. **LLM instruction following**: Depends on model's
> > > ability
> > > > > to
> > > > > > > > follow
> > > > > > > > > > >> structured
> > > > > > > > > > >>    output instructions. GPT-4 and Claude are reliable;
> > > older
> > > > > > > models
> > > > > > > > > may
> > > > > > > > > > >> struggle.
> > > > > > > > > > >>
> > > > > > > > > > >> 2. **Prompt size limits**: Batching too many records
> may
> > > > > exceed
> > > > > > > > > context
> > > > > > > > > > >> windows
> > > > > > > > > > >>    - GPT-4: ~8K tokens input limit
> > > > > > > > > > >>    - Claude: ~200K tokens but practical batches
> smaller
> > > > > > > > > > >>    - Need configurable max batch size based on average
> > > > record
> > > > > > > length
> > > > > > > > > > >>
> > > > > > > > > > >> 3. **Token cost trade-off**: Larger batches mean:
> > > > > > > > > > >>    - Fewer API calls (good)
> > > > > > > > > > >>    - But larger prompts with instructions/formatting
> > > (slight
> > > > > > > > overhead)
> > > > > > > > > > >>    - Net savings still 80-90% in practice
> > > > > > > > > > >>
> > > > > > > > > > >> 4. **Parsing reliability**: Small risk of malformed
> > > > responses
> > > > > > > > > > >>    - Mitigated by: clear instructions, JSON mode
> > (GPT-4),
> > > > > retry
> > > > > > > > logic
> > > > > > > > > > >>    - Fallback to individual calls if batch parsing
> fails
> > > > > > > repeatedly
> > > > > > > > > > >>
> > > > > > > > > > >> 5. **Latency characteristics**:
> > > > > > > > > > >>    - Individual records see slightly higher latency
> > > (waiting
> > > > > for
> > > > > > > > > batch)
> > > > > > > > > > >>    - Overall throughput dramatically improved
> > > > > > > > > > >>    - Use `batch.timeout.ms` to balance latency vs
> > > > throughput
> > > > > > > > > > >>
> > > > > > > > > > >> ## Future Extensions
> > > > > > > > > > >>
> > > > > > > > > > >> This batching architecture would support:
> > > > > > > > > > >> 1. **Stateful chat sessions**: Batch multiple turns
> of a
> > > > > > > > conversation
> > > > > > > > > > >> with
> > > > > > > > > > >>    maintained history per session key
> > > > > > > > > > >> 2. **Embedding generation**: Some providers (OpenAI)
> do
> > > have
> > > > > > batch
> > > > > > > > > > >> embedding APIs
> > > > > > > > > > >> 3. **Multi-modal batching**: Batch image + text
> > processing
> > > > > with
> > > > > > > > > > >> structured outputs
> > > > > > > > > > >>
> > > > > > > > > > >> ## Questions for the Community
> > > > > > > > > > >>
> > > > > > > > > > >> 1. **Architecture**: Should this extend ML_PREDICT or
> > be a
> > > > new
> > > > > > > > > function?
> > > > > > > > > > >>    (I propose extending ML_PREDICT for backward
> > > > compatibility)
> > > > > > > > > > >>
> > > > > > > > > > >> 2. **FLIP Required?**: Does this enhancement warrant a
> > > FLIP?
> > > > > > > > > > >>
> > > > > > > > > > >> 3. **Existing Work**: Is anyone working on batching
> for
> > > > > > ML_PREDICT
> > > > > > > > or
> > > > > > > > > > >> similar
> > > > > > > > > > >>    functionality?
> > > > > > > > > > >>
> > > > > > > > > > >> 4. **Prompt Template Engine**: Should we:
> > > > > > > > > > >>    - Build a custom template engine?
> > > > > > > > > > >>    - Use existing library (e.g., StringTemplate,
> > > Mustache)?
> > > > > > > > > > >>    - Keep it simple with String.format initially?
> > > > > > > > > > >>
> > > > > > > > > > >> 5. **Response Parsing**: Preferred approach:
> > > > > > > > > > >>    - JSONPath library (flexible but adds dependency)
> > > > > > > > > > >>    - Simple JSON parsing with field names
> > > > > > > > > > >>    - Pluggable parser interface for extensibility?
> > > > > > > > > > >>
> > > > > > > > > > >> 6. **Error Handling**: If parsing fails for entire
> > batch:
> > > > > > > > > > >>    - Fail all records in batch?
> > > > > > > > > > >>    - Retry batch once more?
> > > > > > > > > > >>    - Fallback to individual calls (with circuit
> > breaker)?
> > > > > > > > > > >>    - Make strategy configurable?
> > > > > > > > > > >>
> > > > > > > > > > >> 7. **Batch Assembly**: Should batching happen:
> > > > > > > > > > >>    - Per parallel instance (each task maintains its
> own
> > > > > batch)?
> > > > > > > > > > >>    - Globally coordinated (shuffle to batch
> > coordinator)?
> > > > > > > > > > >>    - I propose per-instance for simplicity and lower
> > > latency
> > > > > > > > > > >>
> > > > > > > > > > >> 8. **Compatibility**: Default batch.size=1 to maintain
> > > > current
> > > > > > > > > behavior,
> > > > > > > > > > >> users
> > > > > > > > > > >>    opt-in to batching?
> > > > > > > > > > >>
> > > > > > > > > > >> ## Why This Matters
> > > > > > > > > > >>
> > > > > > > > > > >> LLM inference is becoming a critical part of real-time
> > > data
> > > > > > > > pipelines.
> > > > > > > > > > >> Without
> > > > > > > > > > >> batching:
> > > > > > > > > > >> - Users face prohibitive costs for high-throughput
> > > workloads
> > > > > > > > > > >> - Rate limits block production deployments
> > > > > > > > > > >> - Latency makes real-time processing impractical
> > > > > > > > > > >>
> > > > > > > > > > >> While LLM providers don't offer native batch APIs,
> > > > > > > application-level
> > > > > > > > > > >> batching
> > > > > > > > > > >> through prompt engineering is a proven pattern used in
> > > > > > production
> > > > > > > by
> > > > > > > > > > many
> > > > > > > > > > >> organizations. This proposal brings that capability
> > > natively
> > > > > > into
> > > > > > > > > Flink.
> > > > > > > > > > >>
> > > > > > > > > > >> The hybrid configuration approach provides:
> > > > > > > > > > >> - **Sensible defaults** for common use cases
> (sentiment
> > > > > > analysis,
> > > > > > > > > > >> classification)
> > > > > > > > > > >> - **Flexibility** to customize prompts and parsing for
> > > > > specific
> > > > > > > > needs
> > > > > > > > > > >> - **Easy migration** for existing queries
> (batch.size=1
> > > > > default)
> > > > > > > > > > >>
> > > > > > > > > > >> ## Next Steps
> > > > > > > > > > >>
> > > > > > > > > > >> If there's interest from the community, I'm happy to:
> > > > > > > > > > >> 1. Prepare a detailed design document with prompt
> > > templates
> > > > > and
> > > > > > > > > parsing
> > > > > > > > > > >> examples
> > > > > > > > > > >> 2. Create a JIRA ticket
> > > > > > > > > > >> 3. Develop a prototype demonstrating the batching and
> > > > parsing
> > > > > > > logic
> > > > > > > > > > >> 4. Write a FLIP if required
> > > > > > > > > > >>
> > > > > > > > > > >> Looking forward to your feedback and guidance on how
> > best
> > > to
> > > > > > > > > proceed!--
> > > > > > > > > > >> Thanks And Regards
> > > > > > > > > > >> Rahul
> > > > > > > > > > >>
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > Thanks And Regards
> > > > > > > > > > > Rahul
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > --
> > > > > > > > > > Thanks And Regards
> > > > > > > > > > Rahul
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > > >
> > > > --
> > > > Thanks And Regards
> > > > Rahul
> > > >
> > >
> >
> >
> > --
> > Thanks And Regards
> > Rahul
> >
>


-- 
Thanks And Regards
Rahul

Reply via email to