[https://issues.apache.org/jira/browse/CAMEL-23240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]

Guillaume Nodet updated CAMEL-23240:
------------------------------------
    Fix Version/s:     (was: 4.19.0)

> Add camel-bulk component for structured bulk processing
> -------------------------------------------------------
>
>                 Key: CAMEL-23240
>                 URL: https://issues.apache.org/jira/browse/CAMEL-23240
>             Project: Camel
>          Issue Type: New Feature
>            Reporter: Guillaume Nodet
>            Assignee: Guillaume Nodet
>            Priority: Major
>
> h2. Rationale
> Processing collections of items is one of the most common patterns in Camel.
> Today, users must manually combine the Splitter EIP, the Aggregator EIP, and
> custom error-handling logic to get chunking, failure tracking, and resume
> capability. The result is verbose, error-prone, and hard to get right,
> especially around edge cases such as partial failures, abort thresholds, and
> watermark-based resume.
> The {{camel-bulk}} component provides this as a single, configurable endpoint.
> h2. Features
> * *Chunking*: splits collections into chunks of a configurable size
> * *Error thresholds*: aborts the run when the failure ratio ({{errorThreshold}}) or the absolute failure count ({{maxFailedRecords}}) exceeds its limit
> * *Multi-step pipelines*: items flow through several processing steps, and an accept policy ({{ALL}}, {{NO_FAILURES}}, {{FAILURES_ONLY}}) decides which items reach each subsequent step
> * *Watermark tracking*: resumes from the last processed position for incremental processing (index-based or value-based)
> * *Parallel processing*: processes items within each chunk concurrently
> * *onComplete callback*: a notification endpoint called after every bulk execution with the full statistics
> * *Aggregation strategy*: optional result collection via an {{AggregationStrategy}}
> * *Result reporting*: a structured {{BulkResult}} with total, success, and failure counts, timing, and per-item failure details
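To make the chunking and result-reporting features concrete, here is a minimal plain-Java sketch of the underlying loop. It is not the camel-bulk implementation: the class {{ChunkingSketch}}, the {{Summary}} record (a stand-in for {{BulkResult}}), and the choice to treat any {{RuntimeException}} thrown by the processor as an item failure are all hypothetical, and items are processed sequentially rather than in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of chunked processing with a result summary; not the camel-bulk code. */
final class ChunkingSketch {

    /** Hypothetical stand-in for a bulk result: counts plus the global indexes of failed items. */
    record Summary(int total, int succeeded, int failed, List<Integer> failedIndexes) {}

    /** Splits items into consecutive chunks of at most chunkSize elements. */
    static <T> List<List<T>> chunk(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            // subList returns a view, so copy it to decouple the chunk from the source list
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }

    /** Processes items chunk by chunk, sequentially; a RuntimeException marks that item as failed. */
    static <T> Summary process(List<T> items, int chunkSize, Consumer<T> processor) {
        int succeeded = 0;
        List<Integer> failedIndexes = new ArrayList<>();
        int index = 0; // position of the item in the whole input, across chunks
        for (List<T> current : chunk(items, chunkSize)) {
            for (T item : current) {
                try {
                    processor.accept(item);
                    succeeded++;
                } catch (RuntimeException e) {
                    failedIndexes.add(index);
                }
                index++;
            }
        }
        return new Summary(items.size(), succeeded, failedIndexes.size(), List.copyOf(failedIndexes));
    }
}
```

Tracking a global index rather than a per-chunk one keeps failure details meaningful when a caller later needs to locate the failing item in the original collection.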
> The component uses the {{CamelBulk*}} header prefix to avoid collisions with the
> core {{Exchange.BATCH_*}} headers used by batch consumers. It is unrelated to
> {{camel-spring-batch}}, which integrates with Spring Batch jobs.
> It is a producer-only component with the URI format {{bulk:jobName[?options]}}.
> h2. Examples
> h3. Basic bulk processing
> {code:java}
> from("direct:start")
>     .to("bulk:myJob?chunkSize=100&processorRef=direct:processItem");
> from("direct:processItem")
>     .log("Processing item ${header.CamelBulkIndex}/${header.CamelBulkSize}: ${body}");
> {code}
> h3. Multi-step ETL with failure filtering
> {code:java}
> from("direct:start")
>     .to("bulk:etlJob?steps=direct:validate,direct:transform,direct:load&acceptPolicy=NO_FAILURES");
> {code}
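A minimal sketch of how an accept policy could gate items between steps, assuming that the first step always receives every item and that "failed" means the item threw in any earlier step. {{AcceptPolicySketch}}, {{eligible}}, and {{runSteps}} are hypothetical names; only the three policy values come from this issue.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical sketch of accept-policy filtering between pipeline steps; not the camel-bulk code. */
final class AcceptPolicySketch {

    /** The three policy values named in CAMEL-23240. */
    enum AcceptPolicy { ALL, NO_FAILURES, FAILURES_ONLY }

    /** Assumption: eligibility for a later step depends only on whether the item failed in any earlier step. */
    static boolean eligible(AcceptPolicy policy, boolean failedEarlier) {
        return switch (policy) {
            case ALL -> true;
            case NO_FAILURES -> !failedEarlier;
            case FAILURES_ONLY -> failedEarlier;
        };
    }

    /**
     * Runs the steps in order. The first step receives every item; later steps receive only
     * the items the policy accepts. Returns the indexes of items that failed at least once.
     */
    static <T> Set<Integer> runSteps(List<T> items, List<Consumer<T>> steps, AcceptPolicy policy) {
        Set<Integer> failed = new HashSet<>();
        for (int step = 0; step < steps.size(); step++) {
            for (int i = 0; i < items.size(); i++) {
                if (step > 0 && !eligible(policy, failed.contains(i))) {
                    continue; // filtered out by the accept policy
                }
                try {
                    steps.get(step).accept(items.get(i));
                } catch (RuntimeException e) {
                    failed.add(i);
                }
            }
        }
        return failed;
    }
}
```

Under this reading, with {{NO_FAILURES}} an item rejected by {{direct:validate}} never reaches {{direct:transform}} or {{direct:load}}, while {{FAILURES_ONLY}} would route only the rejected items onward, for example to a repair step.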
> h3. Error recovery pattern
> {code:java}
> from("direct:start")
>     .doTry()
>         .to("bulk:importJob?processorRef=direct:import&errorThreshold=0.1&maxFailedRecords=50")
>     .doCatch(BulkException.class)
>         .log("Bulk aborted: ${exception.message}")
>     .end();
> {code}
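The abort rule behind {{errorThreshold}} and {{maxFailedRecords}} can be sketched as a single predicate. The comparison semantics are assumptions: the ratio is computed over the items processed so far, both limits abort only when strictly exceeded (so the default {{errorThreshold}} of 1.0 never triggers), and a negative {{maxFailedRecords}} disables the absolute limit. The class name {{AbortPolicySketch}} is hypothetical.

```java
/** Hypothetical sketch of the abort decision; not the camel-bulk code. */
final class AbortPolicySketch {

    /**
     * Returns true when either limit is exceeded.
     * Assumptions: ratio = failed / processed so far, strict "greater than" comparisons,
     * and maxFailedRecords < 0 disables the absolute limit.
     */
    static boolean shouldAbort(int failed, int processed, double errorThreshold, int maxFailedRecords) {
        if (maxFailedRecords >= 0 && failed > maxFailedRecords) {
            return true; // absolute limit exceeded
        }
        if (processed == 0) {
            return false; // nothing processed yet, so there is no ratio to evaluate
        }
        double ratio = (double) failed / processed;
        return ratio > errorThreshold;
    }
}
```

With the values from the route above, 10 failures out of 100 items would not abort, 11 out of 100 would, and 51 failures would abort regardless of how many items were processed. Because a single early failure yields a ratio of 1.0, an implementation would probably evaluate the ratio at chunk boundaries or after a minimum sample; this sketch leaves that choice to the caller.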
> h3. Watermark-based resume
> {code:java}
> from("timer:poll?period=60000")
>     .bean("myDataSource", "fetchAllRecords")
>     .to("bulk:syncJob?processorRef=direct:sync&watermarkStore=#myWatermarkStore");
> {code}
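A minimal sketch of index-based watermark resume against a {{Map<String, String>}} store, the type the {{watermarkStore}} option expects. It assumes the stored value is the index of the last processed item and that the data source returns records in a stable, append-only order; value-based tracking via {{watermarkExpression}} would compare extracted values instead. {{WatermarkSketch}} and {{processFromWatermark}} are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical sketch of index-based watermark resume; not the camel-bulk code. */
final class WatermarkSketch {

    /**
     * Skips items up to and including the stored watermark, processes the rest, and records
     * progress in the store. Assumption: the stored value is the index of the last processed item.
     * Returns the number of items processed in this run.
     */
    static <T> int processFromWatermark(List<T> items, Map<String, String> store,
                                        String watermarkKey, Consumer<T> processor) {
        String last = store.get(watermarkKey);
        int start = (last == null) ? 0 : Integer.parseInt(last) + 1;
        int processed = 0;
        for (int i = start; i < items.size(); i++) {
            processor.accept(items.get(i));
            // Update after each item so that, if a later item throws, the next run resumes after this one
            store.put(watermarkKey, Integer.toString(i));
            processed++;
        }
        return processed;
    }
}
```

On the first timer tick the store is empty and every record is processed; later ticks skip everything up to the stored index, so only newly appended records reach {{direct:sync}}.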
> h3. YAML DSL
> {code:yaml}
> - route:
>     from:
>       uri: direct:start
>     steps:
>       - to:
>           uri: bulk:myJob
>           parameters:
>             chunkSize: 100
>             processorRef: direct:processItem
> {code}
> h2. Configuration Options
> || Option || Default || Description ||
> | {{chunkSize}} | 100 | Number of items per chunk |
> | {{processorRef}} | (none) | Endpoint URI or bean reference that processes each item |
> | {{steps}} | (none) | Comma-separated endpoint URIs for multi-step processing (mutually exclusive with {{processorRef}}) |
> | {{acceptPolicy}} | {{ALL}} | Which items are eligible for subsequent steps: {{ALL}}, {{NO_FAILURES}}, or {{FAILURES_ONLY}} |
> | {{errorThreshold}} | 1.0 | Failure ratio (0.0–1.0) above which the run is aborted |
> | {{maxFailedRecords}} | -1 | Absolute failure count above which the run is aborted (-1 = disabled) |
> | {{watermarkStore}} | (none) | Bean reference to a {{Map<String, String>}} used for watermark tracking |
> | {{watermarkKey}} | jobName | Key used in the watermark store |
> | {{watermarkExpression}} | (none) | Simple expression that extracts a watermark value from each item |
> | {{parallelProcessing}} | false | Process items within each chunk in parallel |
> | {{aggregationStrategy}} | (none) | Bean reference to an {{AggregationStrategy}} |
> | {{onCompleteRef}} | (none) | Endpoint called with the {{BulkResult}} after every bulk execution |
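As an illustration of how the defaults in this table combine with the URI format {{bulk:jobName[?options]}}, here is a small hypothetical parser. It is not how Camel binds endpoint options; it only shows the documented defaults, the {{watermarkKey}} fallback to the job name, and the {{steps}}/{{processorRef}} exclusivity. {{BulkUriSketch}} and {{BulkOptions}} are hypothetical names, values are not percent-decoded, the positive {{chunkSize}} check is an assumption, and {{acceptPolicy}} is kept as a plain string.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of applying the documented defaults to a bulk URI; not Camel's option binding. */
final class BulkUriSketch {

    /** Subset of the options table, holding the documented defaults when an option is absent. */
    record BulkOptions(String jobName, int chunkSize, String processorRef, List<String> steps,
                       String acceptPolicy, double errorThreshold, int maxFailedRecords,
                       String watermarkKey) {}

    static BulkOptions parse(String uri) {
        if (!uri.startsWith("bulk:")) {
            throw new IllegalArgumentException("Not a bulk URI: " + uri);
        }
        String rest = uri.substring("bulk:".length());
        int q = rest.indexOf('?');
        String jobName = (q < 0) ? rest : rest.substring(0, q);

        // Split the query into key/value pairs; values are not percent-decoded here
        Map<String, String> params = new HashMap<>();
        if (q >= 0) {
            for (String pair : rest.substring(q + 1).split("&")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    params.put(pair.substring(0, eq), pair.substring(eq + 1));
                }
            }
        }

        String processorRef = params.get("processorRef");
        String stepsValue = params.get("steps");
        if (processorRef != null && stepsValue != null) {
            throw new IllegalArgumentException("steps and processorRef are mutually exclusive");
        }
        int chunkSize = Integer.parseInt(params.getOrDefault("chunkSize", "100"));
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        double errorThreshold = Double.parseDouble(params.getOrDefault("errorThreshold", "1.0"));
        if (errorThreshold < 0.0 || errorThreshold > 1.0) {
            throw new IllegalArgumentException("errorThreshold must be between 0.0 and 1.0");
        }
        List<String> steps = (stepsValue == null) ? List.of() : List.of(stepsValue.split(","));
        return new BulkOptions(
                jobName,
                chunkSize,
                processorRef,
                steps,
                params.getOrDefault("acceptPolicy", "ALL"),
                errorThreshold,
                Integer.parseInt(params.getOrDefault("maxFailedRecords", "-1")),
                params.getOrDefault("watermarkKey", jobName)); // defaults to the job name
    }
}
```

For {{bulk:etlJob?steps=direct:validate,direct:transform,direct:load&acceptPolicy=NO_FAILURES}} this yields a chunk size of 100, three steps, and a watermark key of {{etlJob}}.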



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
