[
https://issues.apache.org/jira/browse/CAMEL-23240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guillaume Nodet updated CAMEL-23240:
------------------------------------
Description:
h2. Rationale
Processing collections of items is one of the most common patterns in Camel.
Today, users must manually assemble a Splitter EIP, Aggregator EIP, and custom
error handling logic to handle chunking, failure tracking, and resume
capability. This is verbose, error-prone, and hard to get right — especially
around edge cases like partial failures, abort thresholds, and watermark-based
resume.
The {{camel-bulk}} component provides this as a single, configurable endpoint.
h2. Features
* *Chunking* — splits collections into configurable-size chunks
* *Error thresholds* — aborts when failure ratio ({{errorThreshold}}) or
absolute count ({{maxFailedRecords}}) exceeds limits
* *Multi-step pipelines* — items flow through multiple processing steps with
accept policies ({{ALL}}, {{NO_FAILURES}}, {{FAILURES_ONLY}})
* *Watermark tracking* — resume-from-last-position for incremental processing
(index-based and value-based)
* *Parallel processing* — concurrent item processing within each chunk
* *onComplete callback* — notification endpoint called after every bulk
execution with full statistics
* *Aggregation strategy* — optional result collection via
{{AggregationStrategy}}
* *Result reporting* — structured {{BulkResult}} with total/success/failure
counts, timing, and per-item failure details
The component uses the {{CamelBulk*}} header prefix to avoid collisions with the
core {{Exchange.BATCH_*}} headers used by batch consumers. It is unrelated to
{{camel-spring-batch}}, which integrates with Spring Batch jobs.
It is a producer-only component with the URI format {{bulk:jobName[?options]}}.
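To make the chunking feature concrete, here is a minimal plain-Java sketch of splitting a collection into fixed-size chunks. It is an illustration only and not the component's implementation; the class {{ChunkSketch}} and the method {{chunk}} are hypothetical names, and only the {{chunkSize}} parameter comes from the description above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of camel-bulk): splits a list into fixed-size chunks. */
final class ChunkSketch {
    private ChunkSketch() {
    }

    static <T> List<List<T>> chunk(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            // The last chunk may be shorter than chunkSize.
            int end = Math.min(start + chunkSize, items.size());
            // Copy the sub-list so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }
}
```

With five items and a chunk size of 2, this sketch yields three chunks, the last holding a single item.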
h2. Examples
h3. Basic bulk processing
{code:java}
from("direct:start")
    .to("bulk:myJob?chunkSize=100&processorRef=direct:processItem");

from("direct:processItem")
    .log("Processing item ${header.CamelBulkIndex}/${header.CamelBulkSize}: ${body}");
{code}
h3. Multi-step ETL with failure filtering
{code:java}
from("direct:start")
    .to("bulk:etlJob?steps=direct:validate,direct:transform,direct:load&acceptPolicy=NO_FAILURES");
{code}
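The accept policy decides which items move on to the next step. The following plain-Java sketch shows one plausible reading of the three policy values. The policy names come from the description; the {{Item}} class, its {{failed}} flag, and the helper methods are assumptions made for illustration and may differ from the actual component.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of accept-policy filtering between steps. */
final class AcceptPolicySketch {
    enum AcceptPolicy { ALL, NO_FAILURES, FAILURES_ONLY }

    /** Assumed per-item state: the payload plus whether an earlier step failed. */
    static final class Item {
        final String body;
        final boolean failed;

        Item(String body, boolean failed) {
            this.body = body;
            this.failed = failed;
        }
    }

    /** Returns true if the item is eligible for the next step under the policy. */
    static boolean accepts(AcceptPolicy policy, Item item) {
        switch (policy) {
            case NO_FAILURES:
                return !item.failed;
            case FAILURES_ONLY:
                return item.failed;
            default: // ALL
                return true;
        }
    }

    /** Filters the items that would be handed to the next step. */
    static List<Item> eligible(AcceptPolicy policy, List<Item> items) {
        List<Item> out = new ArrayList<>();
        for (Item item : items) {
            if (accepts(policy, item)) {
                out.add(item);
            }
        }
        return out;
    }
}
```

Under this reading, {{NO_FAILURES}} in the ETL route above would keep items that failed validation out of the transform and load steps.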
h3. Error recovery pattern
{code:java}
from("direct:start")
    .doTry()
        .to("bulk:importJob?processorRef=direct:import&errorThreshold=0.1&maxFailedRecords=50")
    .doCatch(BulkException.class)
        .log("Bulk aborted: ${exception.message}")
    .end();
{code}
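The interaction of {{errorThreshold}} and {{maxFailedRecords}} can be sketched as a single abort check. This is a hedged illustration, not the component's code: it assumes that "exceeds" means strictly greater than, that the ratio is computed against the items processed so far, and that either limit alone triggers the abort. The class and method names are hypothetical.

```java
/** Hypothetical abort check combining the ratio and absolute failure limits. */
final class AbortSketch {
    private AbortSketch() {
    }

    static boolean shouldAbort(int processed, int failed,
                               double errorThreshold, int maxFailedRecords) {
        // A negative maxFailedRecords (default -1) disables the absolute limit.
        if (maxFailedRecords >= 0 && failed > maxFailedRecords) {
            return true;
        }
        if (processed == 0) {
            return false; // No ratio can be computed yet.
        }
        // Assumption: ratio is failures over items processed so far.
        double ratio = (double) failed / processed;
        return ratio > errorThreshold;
    }
}
```

With the defaults ({{errorThreshold=1.0}}, {{maxFailedRecords=-1}}) this check never fires, which matches the idea that aborting is opt-in.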
h3. Watermark-based resume
{code:java}
from("timer:poll?period=60000")
    .bean("myDataSource", "fetchAllRecords")
    .to("bulk:syncJob?processorRef=direct:sync&watermarkStore=#myWatermarkStore");
{code}
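As a rough sketch of index-based watermark tracking, the following plain-Java helper keeps a position in a {{Map<String, String>}} and skips items that were already processed. It assumes the stored value is the index of the next unprocessed item, keyed by the job name; the description does not specify the stored format, so treat this as one possible design. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical index-based watermark helper backed by a Map<String, String>. */
final class WatermarkSketch {
    private WatermarkSketch() {
    }

    /** Creates a thread-safe store of the kind that could back watermarkStore. */
    static Map<String, String> newStore() {
        return new ConcurrentHashMap<>();
    }

    /** Returns the items at or after the stored position (all items if none is stored). */
    static <T> List<T> remaining(Map<String, String> store, String key, List<T> items) {
        int next = Integer.parseInt(store.getOrDefault(key, "0"));
        // Clamp in case the source list shrank since the last run.
        next = Math.max(0, Math.min(next, items.size()));
        return items.subList(next, items.size());
    }

    /** Moves the stored position forward by the number of items just processed. */
    static void advance(Map<String, String> store, String key, int processedCount) {
        store.merge(key, String.valueOf(processedCount),
            (old, inc) -> String.valueOf(Integer.parseInt(old) + Integer.parseInt(inc)));
    }
}
```

In a Camel application, a map like this could be bound in the registry under the name {{myWatermarkStore}} so that the {{#myWatermarkStore}} reference in the route above resolves to it.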
h3. YAML DSL
{code:yaml}
- route:
    from:
      uri: direct:start
      steps:
        - to:
            uri: bulk:myJob
            parameters:
              chunkSize: 100
              processorRef: direct:processItem
{code}
h2. Configuration Options
|| Option || Default || Description ||
| {{chunkSize}} | 100 | Number of items per chunk |
| {{processorRef}} | — | Endpoint URI or bean reference that processes each item |
| {{steps}} | — | Comma-separated endpoint URIs for multi-step processing (mutually exclusive with {{processorRef}}) |
| {{acceptPolicy}} | {{ALL}} | Which items are eligible for subsequent steps: {{ALL}}, {{NO_FAILURES}}, {{FAILURES_ONLY}} |
| {{errorThreshold}} | 1.0 | Fraction of failures (0.0–1.0) before aborting |
| {{maxFailedRecords}} | -1 | Absolute failure count before aborting (-1 = disabled) |
| {{watermarkStore}} | — | Bean reference to a {{Map<String, String>}} for watermark tracking |
| {{watermarkKey}} | jobName | Key used in the watermark store |
| {{watermarkExpression}} | — | Simple expression to extract a watermark value from each item |
| {{parallelProcessing}} | false | Process items within each chunk in parallel |
| {{aggregationStrategy}} | — | Bean reference to an {{AggregationStrategy}} |
| {{onCompleteRef}} | — | Endpoint called after every bulk execution with {{BulkResult}} |
was:
Add a new camel-bulk component that provides structured bulk processing for
collections of items.
Features:
- Configurable chunk sizes for processing large collections in manageable chunks
- Error thresholds: abort processing when failure ratio exceeds a configurable
limit
- Max failed records: abort after an absolute number of failures
- Multi-step pipelines with accept policies (ALL, NO_FAILURES, FAILURES_ONLY)
- Watermark tracking for resume-from-last-position capability (index-based and
value-based)
- Parallel processing support within chunks
- Aggregation strategies for combining results
- onComplete callback for post-processing notifications
- Producer-only component with URI format bulk:jobName[?options]
- Uses CamelBulk* header prefix to avoid collision with core Exchange.BATCH_*
headers
> Add camel-bulk component for structured bulk processing
> -------------------------------------------------------
>
> Key: CAMEL-23240
> URL: https://issues.apache.org/jira/browse/CAMEL-23240
> Project: Camel
> Issue Type: New Feature
> Reporter: Guillaume Nodet
> Assignee: Guillaume Nodet
> Priority: Major
> Fix For: 4.19.0
>
>