[ 
https://issues.apache.org/jira/browse/CAMEL-23240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guillaume Nodet updated CAMEL-23240:
------------------------------------
    Description: 
h2. Rationale

Processing collections of items is one of the most common patterns in Camel. 
Today, users must manually combine a Splitter EIP, an Aggregator EIP, and 
custom error handling to get chunking, failure tracking, and resume 
capability. The result is verbose, error-prone, and hard to get right, 
especially around edge cases such as partial failures, abort thresholds, and 
watermark-based resume.

The {{camel-bulk}} component provides this as a single, configurable endpoint.

h2. Features

* *Chunking*: splits collections into configurable-size chunks
* *Error thresholds*: aborts when the failure ratio ({{errorThreshold}}) or the absolute failure count ({{maxFailedRecords}}) exceeds its limit
* *Multi-step pipelines*: items flow through multiple processing steps with accept policies ({{ALL}}, {{NO_FAILURES}}, {{FAILURES_ONLY}})
* *Watermark tracking*: resume-from-last-position for incremental processing (index-based and value-based)
* *Parallel processing*: concurrent item processing within each chunk
* *onComplete callback*: notification endpoint called after every bulk execution with full statistics
* *Aggregation strategy*: optional result collection via {{AggregationStrategy}}
* *Result reporting*: structured {{BulkResult}} with total/success/failure counts, timing, and per-item failure details

The component uses the {{CamelBulk*}} header prefix to avoid collisions with 
the core {{Exchange.BATCH_*}} headers used by batch consumers. It is unrelated 
to {{camel-spring-batch}}, which integrates with Spring Batch jobs.

It is a producer-only component with the URI format {{bulk:jobName[?options]}}.

h2. Examples

h3. Basic bulk processing
{code:java}
from("direct:start")
    .to("bulk:myJob?chunkSize=100&processorRef=direct:processItem");

from("direct:processItem")
    .log("Processing item ${header.CamelBulkIndex}/${header.CamelBulkSize}: ${body}");
{code}
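
To make the effect of {{chunkSize}} concrete, here is a minimal plain-Java sketch of fixed-size chunking. It is an illustration only and not the component's implementation: the class {{ChunkingSketch}} and its {{chunk}} method are hypothetical names introduced for this example.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of camel-bulk.
final class ChunkingSketch {
    private ChunkingSketch() {
    }

    // Splits items into consecutive chunks of at most chunkSize elements.
    static <T> List<List<T>> chunk(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }
}
{code}

With five items and a chunk size of 2, this sketch yields three chunks, the last holding a single item.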

h3. Multi-step ETL with failure filtering
{code:java}
from("direct:start")
    .to("bulk:etlJob?steps=direct:validate,direct:transform,direct:load&acceptPolicy=NO_FAILURES");
{code}
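
One plausible reading of the accept policies is sketched below in plain Java. The enum {{AcceptPolicySketch}} and its {{accepts}} method are hypothetical, and the assumption that eligibility depends only on whether an item failed in an earlier step is ours, not something the description spells out.

{code:java}
// Hypothetical illustration of the three accept policies; not the component's code.
enum AcceptPolicySketch {
    ALL, NO_FAILURES, FAILURES_ONLY;

    // Returns true if an item may enter the next step.
    // Assumption: the only input is whether the item failed in an earlier step.
    boolean accepts(boolean failedInEarlierStep) {
        switch (this) {
            case NO_FAILURES:
                return !failedInEarlierStep;
            case FAILURES_ONLY:
                return failedInEarlierStep;
            default:
                return true; // ALL: every item continues
        }
    }
}
{code}

Under this reading, the ETL route above would pass only successfully validated items to {{direct:transform}}, and only successfully transformed items to {{direct:load}}.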

h3. Error recovery pattern
{code:java}
from("direct:start")
    .doTry()
        .to("bulk:importJob?processorRef=direct:import&errorThreshold=0.1&maxFailedRecords=50")
    .doCatch(BulkException.class)
        .log("Bulk aborted: ${exception.message}")
    .end();
{code}
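
The interaction of {{errorThreshold}} and {{maxFailedRecords}} can be sketched as a single abort check. This is a plain-Java illustration under stated assumptions: {{AbortCheckSketch}} and {{shouldAbort}} are hypothetical names, the ratio is assumed to be computed over items processed so far, and "exceeds" is read as strictly greater than the limit.

{code:java}
// Hypothetical abort check for illustration; not the component's code.
final class AbortCheckSketch {
    private AbortCheckSketch() {
    }

    // Assumptions: ratio = failed / processed so far; limits are exceeded only
    // when strictly greater; a negative maxFailedRecords disables the count check.
    static boolean shouldAbort(int processed, int failed,
                               double errorThreshold, int maxFailedRecords) {
        if (maxFailedRecords >= 0 && failed > maxFailedRecords) {
            return true;
        }
        if (processed == 0) {
            return false; // nothing processed yet, so no ratio to evaluate
        }
        return (double) failed / processed > errorThreshold;
    }
}
{code}

With the defaults ({{errorThreshold=1.0}}, {{maxFailedRecords=-1}}) this check never triggers, since a failure ratio cannot exceed 1.0.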

h3. Watermark-based resume
{code:java}
from("timer:poll?period=60000")
    .bean("myDataSource", "fetchAllRecords")
    .to("bulk:syncJob?processorRef=direct:sync&watermarkStore=#myWatermarkStore");
{code}
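
As a rough illustration of index-based resume on top of a {{Map<String, String>}} store, consider the plain-Java sketch below. The class {{WatermarkSketch}} and its methods are hypothetical, and storing the next index to process as a decimal string is an assumption; the description does not specify the stored format.

{code:java}
import java.util.List;
import java.util.Map;

// Hypothetical index-based watermark helper; not the component's code.
final class WatermarkSketch {
    private WatermarkSketch() {
    }

    // Returns the items that have not been processed yet.
    // Assumption: the store holds the next index to process as a decimal string.
    static <T> List<T> remaining(List<T> items, Map<String, String> store, String key) {
        int start = Integer.parseInt(store.getOrDefault(key, "0"));
        // Clamp so a stale watermark never points outside the list.
        start = Math.max(0, Math.min(start, items.size()));
        return items.subList(start, items.size());
    }

    // Records the index at which the next run should resume.
    static void advance(Map<String, String> store, String key, int nextIndex) {
        store.put(key, Integer.toString(nextIndex));
    }
}
{code}

A run that processed the first two of three items would call {{advance(store, "syncJob", 2)}}, and the next call to {{remaining}} would return only the third item.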

h3. YAML DSL
{code:yaml}
- route:
    from:
      uri: direct:start
    steps:
      - to:
          uri: bulk:myJob
          parameters:
            chunkSize: 100
            processorRef: direct:processItem
{code}

h2. Configuration Options

|| Option || Default || Description ||
| {{chunkSize}} | 100 | Number of items per chunk |
| {{processorRef}} | none | Endpoint URI or bean reference that processes each item |
| {{steps}} | none | Comma-separated endpoint URIs for multi-step processing (mutually exclusive with {{processorRef}}) |
| {{acceptPolicy}} | {{ALL}} | Which items are eligible for subsequent steps: {{ALL}}, {{NO_FAILURES}}, {{FAILURES_ONLY}} |
| {{errorThreshold}} | 1.0 | Fraction of failures (0.0–1.0) before aborting |
| {{maxFailedRecords}} | -1 | Absolute failure count before aborting (-1 = disabled) |
| {{watermarkStore}} | none | Bean reference to a {{Map<String, String>}} for watermark tracking |
| {{watermarkKey}} | jobName | Key used in the watermark store |
| {{watermarkExpression}} | none | Simple expression to extract a watermark value from each item |
| {{parallelProcessing}} | false | Process items within each chunk in parallel |
| {{aggregationStrategy}} | none | Bean reference to an {{AggregationStrategy}} |
| {{onCompleteRef}} | none | Endpoint called after every bulk execution with {{BulkResult}} |

  was:
Add a new camel-bulk component that provides structured bulk processing for 
collections of items.

Features:
- Configurable chunk sizes for processing large collections in manageable chunks
- Error thresholds: abort processing when failure ratio exceeds a configurable 
limit
- Max failed records: abort after an absolute number of failures
- Multi-step pipelines with accept policies (ALL, NO_FAILURES, FAILURES_ONLY)
- Watermark tracking for resume-from-last-position capability (index-based and 
value-based)
- Parallel processing support within chunks
- Aggregation strategies for combining results
- onComplete callback for post-processing notifications
- Producer-only component with URI format bulk:jobName[?options]
- Uses CamelBulk* header prefix to avoid collision with core Exchange.BATCH_* 
headers


> Add camel-bulk component for structured bulk processing
> -------------------------------------------------------
>
>                 Key: CAMEL-23240
>                 URL: https://issues.apache.org/jira/browse/CAMEL-23240
>             Project: Camel
>          Issue Type: New Feature
>            Reporter: Guillaume Nodet
>            Assignee: Guillaume Nodet
>            Priority: Major
>             Fix For: 4.19.0
>


