Hi Flink Community,

I'm interested in contributing an enhancement to Apache Flink's ML_PREDICT
functionality for LLM interactions. I'd like to gauge community interest and get
early feedback before proceeding with a detailed design or a FLIP.

## Problem Statement

Currently, when using Flink SQL's ML_PREDICT with LLM endpoints, each record
triggers an individual API call. For a stream processing 1,000 records per
second, this results in:

- **1000 separate API calls per second**
- **High latency**: Each call has network overhead + API processing time
- **High cost**: Most LLM providers charge per token, and the lack of batching
  leaves no room for cost optimization
- **Rate limiting issues**: Hitting provider rate limits quickly
- **Poor throughput**: API calls are serialized per record

### Current Behavior (Inefficient)
```sql
-- This makes 10 individual API calls
SELECT id, ML_PREDICT('llm_model', text) as result
FROM (VALUES
    (1, 'text1'), (2, 'text2'), ..., (10, 'text10')
) AS t(id, text);
```
**Result**: 10 separate HTTP requests, 10x latency, 10x overhead

## Proposed Solution: Application-Level Batching with Prompt Engineering

Since most LLM APIs (OpenAI, Anthropic Claude, etc.) don't provide synchronous
batch endpoints suitable for stream processing, we propose implementing batching
at the application level by:

1. **Accumulating N records** into a single batch
2. **Injecting records into a structured prompt** that instructs the LLM to
   process multiple items
3. **Parsing structured responses** to extract results for each record
4. **Emitting individual results** back to the Flink pipeline

### How It Works

**Step 1: Batch Accumulation**
Collect up to `batch.size` records, or flush earlier once `batch.timeout.ms`
has elapsed since the first buffered record.
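
A minimal, framework-agnostic sketch of the accumulation logic (Java; the
`BatchAccumulator` class and its method names are made up for illustration,
they are not existing Flink code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Sketch only: buffers records and signals a flush when either the size or the
// timeout condition from 'batch.size' / 'batch.timeout.ms' is met.
final class BatchAccumulator<T> {
    private final int batchSize;
    private final long batchTimeoutMs;
    private final List<T> buffer = new ArrayList<>();
    private long firstElementTs = -1L;

    BatchAccumulator(int batchSize, long batchTimeoutMs) {
        this.batchSize = batchSize;
        this.batchTimeoutMs = batchTimeoutMs;
    }

    /** Adds a record; returns a full batch when a flush condition is met, otherwise empty. */
    Optional<List<T>> add(T record, long nowMs) {
        if (buffer.isEmpty()) {
            firstElementTs = nowMs;
        }
        buffer.add(record);
        if (buffer.size() >= batchSize || nowMs - firstElementTs >= batchTimeoutMs) {
            List<T> batch = new ArrayList<>(buffer);
            buffer.clear();
            return Optional.of(batch);
        }
        return Optional.empty();
    }
}
```

In the actual operator, a processing-time timer would also flush a partially
filled buffer when no new records arrive; see the operator sketch under
Technical Approach below.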

**Step 2: Prompt Construction**

System: You are a sentiment analyzer. Process each item and respond with JSON.

User: Analyze the sentiment of these texts. Return a JSON array with one
object per input containing "index" and "sentiment" fields.

Input 1: "This product is amazing!"
Input 2: "Terrible experience, very disappointed"
Input 3: "It's okay, nothing special"
...
Input 10: "Best purchase ever!"

Respond with: [{"index": 1, "sentiment": "..."}, {"index": 2, "sentiment": "..."}, ...]

**Step 3: Response Parsing**
```json
[
  {"index": 1, "sentiment": "positive"},
  {"index": 2, "sentiment": "negative"},
  {"index": 3, "sentiment": "neutral"},
  ...
  {"index": 10, "sentiment": "positive"}
]
```

**Step 4: Result Distribution**
Parse JSON and emit individual results back to corresponding records
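
A sketch of the parse step, using Jackson purely for illustration; `indexField`
and `valueField` correspond to the proposed `response.index.field` and
`response.value.field` options, and `BatchResponseParser` is a made-up name:

```java
import java.util.HashMap;
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Sketch only: turns the LLM's JSON array into an index -> value map so each
// result can be routed back to the record it belongs to.
final class BatchResponseParser {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    static Map<Integer, String> parse(String json, String indexField, String valueField)
            throws JsonProcessingException {
        JsonNode root = MAPPER.readTree(json);
        if (!root.isArray()) {
            throw new IllegalArgumentException("Expected a JSON array but got: " + root.getNodeType());
        }
        Map<Integer, String> results = new HashMap<>();
        for (JsonNode item : root) {
            JsonNode index = item.get(indexField);
            JsonNode value = item.get(valueField);
            if (index != null && value != null) {
                results.put(index.asInt(), value.asText());
            }
        }
        return results;
    }
}
```

Indices missing from the resulting map are then surfaced as per-record errors,
as described in the Response Parsing Strategy section below.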

### Model Configuration (Defaults)
```sql
CREATE MODEL llm_sentiment WITH (
    'provider' = 'openai',
    'model' = 'gpt-4',
    'api_key' = '${API_KEY}',
    'batch.size' = '20',
    'batch.timeout.ms' = '1000',
    'system.prompt' = 'You are a sentiment analyzer. Always respond with valid JSON.',
    'batch.prompt.template' = 'Analyze sentiment for these texts. Return JSON array: [{"index": <n>, "sentiment": "<positive|negative|neutral>"}]',
    'response.format' = 'json',
    'response.path' = '$[*]',              -- JSONPath to extract array of results
    'response.index.field' = 'index',      -- Field containing record index
    'response.value.field' = 'sentiment'   -- Field containing result
);
```

### Query Usage (Use Defaults)
```sql
-- Uses batch.size = 20 from the model definition
SELECT id, text, ML_PREDICT('llm_sentiment', text) as sentiment
FROM customer_reviews;
```

### Query Usage (Override for Custom Analysis)
```sql
-- Override prompt and batch size for different use case
SELECT id, text, ML_PREDICT('llm_sentiment', text,
    MAP['batch.size', '50',
        'batch.prompt.template', 'Extract key entities. Return JSON: [{"index": <n>, "entities": [...]}]',
        'response.value.field', 'entities']) as entities
FROM documents;
```

## Performance and Cost Impact

### Example: Processing 10,000 customer reviews

**Current (unbatched)**:
- 10,000 API calls
- ~10,000 x 200ms latency = 2,000 seconds total processing time (serialized)
- ~10,000 x $0.002 = $20 in API costs
- High rate limit pressure

**With batching (batch_size=20)**:
- 500 API calls (10,000 / 20)
- ~500 x 300ms latency = 150 seconds total processing time
- ~500 x $0.006 = $3 in API costs (slightly higher cost per call due to larger
  prompts, but still 85% cheaper overall)
- **20x fewer API calls**
- **13x faster processing**
- **85% cost reduction**

## Proposed Implementation

### Configuration Parameters

**Model-level (defaults)**:
- `batch.size`: Maximum records per batch (default: 1 for backward compatibility)
- `batch.timeout.ms`: Max time to wait before flushing an incomplete batch
  (default: 1000 ms)
- `system.prompt`: System-level instruction for the LLM
- `batch.prompt.template`: Template explaining how to process batched inputs
- `response.format`: Expected response format ('json', 'xml', 'delimited')
- `response.path`: JSONPath or XPath to extract results array
- `response.index.field`: Field name containing the record index
- `response.value.field`: Field name containing the actual result
- `max.retries`: Retry attempts for failed batches (default: 3)
- `request.timeout.ms`: Timeout for API calls (default: 30000ms)

**Query-level (overrides)**:
- Any of the above can be overridden via MAP parameter in ML_PREDICT
- Per-query customization for different analysis tasks
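
If this lands in Flink, the options could be declared with Flink's
`ConfigOptions` builder. The sketch below only pins down names, types, and
defaults from the list above; the class name and the exact set of options are
assumptions, not an agreed design:

```java
import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Sketch of how the proposed options might be declared (names/defaults from the list above).
public final class MlPredictBatchOptions {

    public static final ConfigOption<Integer> BATCH_SIZE =
            ConfigOptions.key("batch.size")
                    .intType()
                    .defaultValue(1) // 1 = current per-record behavior (backward compatible)
                    .withDescription("Maximum number of records per LLM batch request.");

    public static final ConfigOption<Duration> BATCH_TIMEOUT =
            ConfigOptions.key("batch.timeout.ms")
                    .durationType()
                    .defaultValue(Duration.ofMillis(1000))
                    .withDescription("Maximum time to wait before flushing an incomplete batch.");

    public static final ConfigOption<String> BATCH_PROMPT_TEMPLATE =
            ConfigOptions.key("batch.prompt.template")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Template explaining how the LLM should process batched inputs.");

    public static final ConfigOption<Integer> MAX_RETRIES =
            ConfigOptions.key("max.retries")
                    .intType()
                    .defaultValue(3)
                    .withDescription("Retry attempts for failed batch requests.");
}
```

Declaring them as ConfigOptions would also make validation of the MAP-based
per-query overrides straightforward.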

### Key Features
1. **Prompt injection**: Automatically construct batch prompts with indexed
inputs
2. **Structured response parsing**: Support JSON, XML, or delimited formats
3. **Index tracking**: Maintain record-to-result mapping through the batch
4. **Error handling**: Handle parsing failures, missing indices, malformed
responses
5. **Fallback to individual calls**: If batch fails, optionally retry
records individually
6. **Provider-agnostic**: Works with any LLM API (OpenAI, Anthropic, Azure,
self-hosted)
7. **Async processing**: Non-blocking batch requests
8. **Back-pressure**: Proper flow control when API is slow
9. **Backward compatible**: batch.size=1 maintains current behavior

### Technical Approach
- Extend existing ML_PREDICT infrastructure
- Add batching buffer in the ML_PREDICT operator
- Implement prompt template engine for batch construction:
  - Inject record index + content into template
  - Support various templating formats (JSON, XML, plain text)
- Implement response parser:
  - Extract structured data (JSONPath, XPath, regex)
  - Map results back to original records by index
  - Handle missing or malformed responses
- Maintain record ordering and error attribution
- Support parameter override mechanism in ML_PREDICT function signature
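
To make the buffering idea more concrete, here is a rough sketch of the operator
side using a `KeyedProcessFunction` with list state and a processing-time timer.
The real change would live inside the ML_PREDICT runtime operator, and
`LlmBatchClient` / `LlmBatchingFunction` are hypothetical placeholders for
"build prompt, call the provider, parse the response":

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Rough sketch only: buffers records and flushes when either 'batch.size' is
// reached or the 'batch.timeout.ms' timer fires.
public class LlmBatchingFunction extends KeyedProcessFunction<String, String, String> {

    /** Hypothetical client: one provider request per batch, one result per input. */
    public interface LlmBatchClient extends java.io.Serializable {
        List<String> predictBatch(List<String> inputs) throws Exception;
    }

    private final int batchSize;
    private final long batchTimeoutMs;
    private final LlmBatchClient client;

    private transient ListState<String> buffer;
    private transient ValueState<Long> pendingTimer;

    public LlmBatchingFunction(int batchSize, long batchTimeoutMs, LlmBatchClient client) {
        this.batchSize = batchSize;
        this.batchTimeoutMs = batchTimeoutMs;
        this.client = client;
    }

    @Override
    public void open(Configuration parameters) {
        buffer = getRuntimeContext().getListState(
                new ListStateDescriptor<>("batch-buffer", String.class));
        pendingTimer = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pending-timer", Long.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        buffer.add(value);
        if (pendingTimer.value() == null) {
            long timer = ctx.timerService().currentProcessingTime() + batchTimeoutMs;
            ctx.timerService().registerProcessingTimeTimer(timer);
            pendingTimer.update(timer);
        }
        List<String> batch = new ArrayList<>();
        buffer.get().forEach(batch::add);
        if (batch.size() >= batchSize) {
            flush(batch, ctx, out);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        List<String> batch = new ArrayList<>();
        buffer.get().forEach(batch::add);
        if (!batch.isEmpty()) {
            flush(batch, ctx, out);
        }
    }

    private void flush(List<String> batch, Context ctx, Collector<String> out) throws Exception {
        // One provider call per batch; parsed results are emitted per record.
        for (String result : client.predictBatch(batch)) {
            out.collect(result);
        }
        buffer.clear();
        Long timer = pendingTimer.value();
        if (timer != null) {
            ctx.timerService().deleteProcessingTimeTimer(timer);
            pendingTimer.clear();
        }
    }
}
```

The sketch blocks on the provider call for brevity; a real implementation would
issue the request asynchronously (in the spirit of Flink's async I/O) to meet
the non-blocking and back-pressure requirements listed above.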

### Response Parsing Strategy

The implementation must handle:
1. **Successful batch response**: Parse and distribute results
2. **Partial failure**: Some records missing from response → emit errors
for those
3. **Complete parse failure**: Optionally fallback to individual calls
4. **Index mismatch**: Response indices don't match input → log warning and
best-effort match
5. **Malformed JSON**: Retry the request up to `max.retries`, then treat it as
   a complete parse failure

Example error handling:
```sql
-- Records that fail parsing get null results with error metadata
SELECT
    id,
    text,
    result.value as sentiment,
    result.error as error_msg
FROM source_table,
LATERAL TABLE(ML_PREDICT('llm_sentiment', text)) AS result;
```
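
To make points 2 and 4 above concrete, a sketch of the error-attribution step.
`BatchResultDistributor` and `BatchResult` are made-up names for illustration;
the idea is simply that every input produces exactly one output row, with either
a value or an error:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative distribution step: map parsed indices back to their records and
// attach an error message for anything missing from the response.
final class BatchResultDistributor {

    /** Hypothetical per-record result: value on success, error message otherwise. */
    record BatchResult(int index, String value, String error) {}

    static List<BatchResult> distribute(List<String> inputs, Map<Integer, String> parsed) {
        List<BatchResult> out = new ArrayList<>(inputs.size());
        for (int i = 1; i <= inputs.size(); i++) { // 1-based, matching the prompt indices
            String value = parsed.get(i);
            if (value != null) {
                out.add(new BatchResult(i, value, null));
            } else {
                out.add(new BatchResult(i, null, "missing index " + i + " in batch response"));
            }
        }
        return out;
    }
}
```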

## Limitations and Considerations

1. **LLM instruction following**: Depends on the model's ability to follow
   structured output instructions. GPT-4 and Claude are reliable; older models
   may struggle.

2. **Prompt size limits**: Batching too many records may exceed context windows
   - GPT-4: ~8K-token input limit
   - Claude: ~200K tokens, but practical batches are smaller
   - Needs a configurable max batch size based on average record length; a rough
     sizing sketch follows this list

3. **Token cost trade-off**: Larger batches mean:
   - Fewer API calls (good)
   - But larger prompts with instructions/formatting (slight overhead)
   - Net savings still 80-90% in practice

4. **Parsing reliability**: Small risk of malformed responses
   - Mitigated by: clear instructions, JSON mode (GPT-4), retry logic
   - Fallback to individual calls if batch parsing fails repeatedly

5. **Latency characteristics**:
   - Individual records see slightly higher latency (waiting for batch)
   - Overall throughput dramatically improved
   - Use `batch.timeout.ms` to balance latency vs throughput
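
On the prompt-size point (2) above, a back-of-the-envelope sizing sketch; the
class name and all token counts in the example are placeholders, not
measurements:

```java
// Back-of-the-envelope only: how many records fit in the model's context window
// once the instruction template and the expected response are accounted for.
final class BatchSizer {

    static int effectiveBatchSize(int contextWindowTokens,
                                  int templateOverheadTokens,
                                  int avgTokensPerRecord,
                                  int avgTokensPerResponseItem,
                                  int configuredBatchSize) {
        int available = contextWindowTokens - templateOverheadTokens;
        int perRecord = avgTokensPerRecord + avgTokensPerResponseItem;
        int fitting = Math.max(1, available / perRecord);
        return Math.min(configuredBatchSize, fitting);
    }

    // Example: 8,000-token window, 300-token template, ~60 tokens per review and
    // ~20 tokens per response item -> (8000 - 300) / (60 + 20) = 96 records fit,
    // so a configured batch.size of 20 stays comfortably within the window.
}
```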

## Future Extensions

This batching architecture would support:
1. **Stateful chat sessions**: Batch multiple turns of a conversation with
   maintained history per session key
2. **Embedding generation**: Some providers (OpenAI) do have batch
embedding APIs
3. **Multi-modal batching**: Batch image + text processing with structured
outputs

## Questions for the Community

1. **Architecture**: Should this extend ML_PREDICT or be a new function?
   (I propose extending ML_PREDICT for backward compatibility)

2. **FLIP Required?**: Does this enhancement warrant a FLIP?

3. **Existing Work**: Is anyone working on batching for ML_PREDICT or similar
   functionality?

4. **Prompt Template Engine**: Should we:
   - Build a custom template engine?
   - Use existing library (e.g., StringTemplate, Mustache)?
   - Keep it simple with String.format initially?

5. **Response Parsing**: Preferred approach:
   - JSONPath library (flexible but adds dependency)
   - Simple JSON parsing with field names
   - Pluggable parser interface for extensibility?

6. **Error Handling**: If parsing fails for entire batch:
   - Fail all records in batch?
   - Retry batch once more?
   - Fallback to individual calls (with circuit breaker)?
   - Make strategy configurable?

7. **Batch Assembly**: Should batching happen:
   - Per parallel instance (each task maintains its own batch)?
   - Globally coordinated (shuffle to batch coordinator)?
   - I propose per-instance for simplicity and lower latency

8. **Compatibility**: Default to batch.size=1 to preserve current behavior, with
   batching as an explicit opt-in?

## Why This Matters

LLM inference is becoming a critical part of real-time data pipelines. Without
batching:
- Users face prohibitive costs for high-throughput workloads
- Rate limits block production deployments
- Latency makes real-time processing impractical

While most LLM providers don't offer synchronous batch APIs suited to stream
processing, application-level batching through prompt engineering is a proven
pattern used in production by many organizations. This proposal brings that
capability natively into Flink.

The hybrid configuration approach provides:
- **Sensible defaults** for common use cases (sentiment analysis,
classification)
- **Flexibility** to customize prompts and parsing for specific needs
- **Easy migration** for existing queries (batch.size=1 default)

## Next Steps

If there's interest from the community, I'm happy to:
1. Prepare a detailed design document with prompt templates and parsing
examples
2. Create a JIRA ticket
3. Develop a prototype demonstrating the batching and parsing logic
4. Write a FLIP if required

Looking forward to your feedback and guidance on how best to proceed!

Thanks and regards,
Rahul
