[
https://issues.apache.org/jira/browse/CAMEL-23264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guillaume Nodet updated CAMEL-23264:
------------------------------------
Description:
Enhance the Splitter EIP to support common bulk processing patterns that
currently require significant manual assembly.
This replaces the proposed camel-bulk component (CAMEL-23240), which was closed
after analysis showed these features are better served as Splitter EIP
enhancements rather than a standalone component. See PR #22159 for the full
analysis: https://github.com/apache/camel/pull/22159#issuecomment-4144168967
h3. Enhancements
*1. Chunking — {{group(int)}}*
Add a {{group}} option to the Splitter that works with any Iterable (not just
tokenized strings). The iterator is wrapped with a chunking iterator, so each
split exchange body becomes a {{List}} of up to N items.
{code:java}
.split(body()).group(100)
.to("direct:processChunk") // body is List of up to 100 items
.end();
{code}
*2. Error Threshold — {{errorThreshold(double)}} / {{maxFailedRecords(int)}}*
Generalizes {{stopOnException()}} from boolean (stop on first failure) to
ratio/count-based abort. The Splitter tracks failures internally and aborts
mid-stream when the threshold is exceeded.
{code:java}
.split(body()).errorThreshold(0.1).maxFailedRecords(50)
.to("direct:process")
.end();
{code}
* {{errorThreshold(0.1)}} — abort if more than 10% of items fail
* {{maxFailedRecords(50)}} — abort after more than 50 failures
* Both can be combined — abort when either is exceeded
*3. Failure Tracking — built-in {{SplitResult}}*
When error threshold is configured and no custom {{AggregationStrategy}} is
set, the Splitter produces a {{SplitResult}} containing:
* {{totalItems}}, {{successCount}}, {{failureCount}}, {{duration}}, {{aborted}}
* {{failures}} list with {{(index, item, exception)}} for each failed item
* Output headers: {{CamelSplitTotal}}, {{CamelSplitSuccess}},
{{CamelSplitFailed}}, etc.
*4. Watermark Tracking via ResumeStrategy*
Add {{resumeStrategy}}, {{watermarkKey}}, and {{watermarkExpression}} options
for resume-from-last-position patterns. Uses Camel's existing
{{ResumeStrategy}} SPI (from {{camel-api}}) for offset persistence.
{code:java}
ResumeStrategy strategy = ...; // any ResumeStrategy implementation
.split(body())
.resumeStrategy(strategy, "importJob")
.to("direct:process")
.end();
{code}
* *Index-based*: tracks how many items have been processed, skips
already-processed items on subsequent runs (assumes stable ordering)
* *Value-based* (with {{watermarkExpression}}): evaluates an expression on each
successful item (e.g., a timestamp), stores the last value via
{{ResumeStrategy.updateLastOffset()}} for upstream filtering
* Watermark reads use lazy loading from the strategy's {{ResumeCache}} on each
exchange
* Watermark is not updated on abort, allowing retry from the same position
h3. What This Replaces
The proposed camel-bulk component (CAMEL-23240) provided these features as a
standalone component. After review, we concluded:
* Most features are natural extensions of the Splitter EIP
* Users benefit from enhancing what they already know rather than learning a
new component
* The features are independently useful and compose with existing Splitter
options ({{parallelProcessing}}, {{streaming}}, {{shareUnitOfWork}}, etc.)
* The Bulk component's multi-step accept policy is already expressible with
{{doTry/doCatch}} inside the split route
h3. PR
https://github.com/apache/camel/pull/22300
was:
Enhance the Splitter EIP to support common bulk processing patterns that
currently require significant manual assembly.
This replaces the proposed camel-bulk component (CAMEL-23240), which was closed
after analysis showed these features are better served as Splitter EIP
enhancements rather than a standalone component. See PR #22159 for the full
analysis: https://github.com/apache/camel/pull/22159#issuecomment-4144168967
h3. Proposed Enhancements
*1. Chunking — {{group(int)}}*
Add a {{group}} option to the Splitter that works with any Iterable (not just
tokenized strings). The iterator is wrapped with a chunking iterator, so each
split exchange body becomes a {{List}} of up to N items.
{code:java}
.split(body()).group(100)
.to("direct:processChunk") // body is List of up to 100 items
.end();
{code}
*2. Error Threshold — {{errorThreshold(double)}} / {{maxFailedRecords(int)}}*
Generalizes {{stopOnException()}} from boolean (stop on first failure) to
ratio/count-based abort. The Splitter tracks failures internally and aborts
mid-stream when the threshold is exceeded.
{code:java}
.split(body()).errorThreshold(0.1).maxFailedRecords(50)
.to("direct:process")
.end();
{code}
* {{errorThreshold(0.1)}} — abort if more than 10% of items fail
* {{maxFailedRecords(50)}} — abort after more than 50 failures
* Both can be combined — abort when either is exceeded
*3. Failure Tracking — built-in {{SplitResult}}*
When error threshold is configured and no custom {{AggregationStrategy}} is
set, the Splitter produces a {{SplitResult}} containing:
* {{totalItems}}, {{successCount}}, {{failureCount}}, {{duration}}, {{aborted}}
* {{failures}} list with {{(index, item, exception)}} for each failed item
* Output headers: {{CamelSplitTotal}}, {{CamelSplitSuccess}},
{{CamelSplitFailed}}, etc.
*4. Watermark Tracking*
Add {{watermarkStore}}, {{watermarkKey}}, and {{watermarkExpression}} options
for resume-from-last-position patterns:
{code:java}
.split(body())
.watermarkStore("#myStore").watermarkKey("importJob")
.to("direct:process")
.end();
{code}
* *Index-based*: tracks how many items have been processed, skips
already-processed items on subsequent runs (assumes stable ordering)
* *Value-based* (with {{watermarkExpression}}): evaluates an expression on each
successful item (e.g., a timestamp), stores the last value for upstream
filtering
h3. What This Replaces
The proposed camel-bulk component (CAMEL-23240) provided these features as a
standalone component. After review, we concluded:
* Most features are natural extensions of the Splitter EIP
* Users benefit from enhancing what they already know rather than learning a
new component
* The features are independently useful and compose with existing Splitter
options ({{parallelProcessing}}, {{streaming}}, {{shareUnitOfWork}}, etc.)
* The Bulk component's multi-step accept policy is already expressible with
{{doTry/doCatch}} inside the split route
> Enhance Splitter EIP with chunking, error threshold, failure tracking, and
> watermark support
> --------------------------------------------------------------------------------------------
>
> Key: CAMEL-23264
> URL: https://issues.apache.org/jira/browse/CAMEL-23264
> Project: Camel
> Issue Type: Improvement
> Components: camel-core
> Reporter: Guillaume Nodet
> Assignee: Guillaume Nodet
> Priority: Major
> Fix For: 4.19.0
>
>
> Enhance the Splitter EIP to support common bulk processing patterns that
> currently require significant manual assembly.
> This replaces the proposed camel-bulk component (CAMEL-23240), which was
> closed after analysis showed these features are better served as Splitter EIP
> enhancements rather than a standalone component. See PR #22159 for the full
> analysis: https://github.com/apache/camel/pull/22159#issuecomment-4144168967
> h3. Enhancements
> *1. Chunking — {{group(int)}}*
> Add a {{group}} option to the Splitter that works with any Iterable (not just
> tokenized strings). The iterator is wrapped with a chunking iterator, so each
> split exchange body becomes a {{List}} of up to N items.
> {code:java}
> .split(body()).group(100)
> .to("direct:processChunk") // body is List of up to 100 items
> .end();
> {code}
> *2. Error Threshold — {{errorThreshold(double)}} / {{maxFailedRecords(int)}}*
> Generalizes {{stopOnException()}} from boolean (stop on first failure) to
> ratio/count-based abort. The Splitter tracks failures internally and aborts
> mid-stream when the threshold is exceeded.
> {code:java}
> .split(body()).errorThreshold(0.1).maxFailedRecords(50)
> .to("direct:process")
> .end();
> {code}
> * {{errorThreshold(0.1)}} — abort if more than 10% of items fail
> * {{maxFailedRecords(50)}} — abort after more than 50 failures
> * Both can be combined — abort when either is exceeded
> *3. Failure Tracking — built-in {{SplitResult}}*
> When error threshold is configured and no custom {{AggregationStrategy}} is
> set, the Splitter produces a {{SplitResult}} containing:
> * {{totalItems}}, {{successCount}}, {{failureCount}}, {{duration}},
> {{aborted}}
> * {{failures}} list with {{(index, item, exception)}} for each failed item
> * Output headers: {{CamelSplitTotal}}, {{CamelSplitSuccess}},
> {{CamelSplitFailed}}, etc.
> *4. Watermark Tracking via ResumeStrategy*
> Add {{resumeStrategy}}, {{watermarkKey}}, and {{watermarkExpression}} options
> for resume-from-last-position patterns. Uses Camel's existing
> {{ResumeStrategy}} SPI (from {{camel-api}}) for offset persistence.
> {code:java}
> ResumeStrategy strategy = ...; // any ResumeStrategy implementation
> .split(body())
> .resumeStrategy(strategy, "importJob")
> .to("direct:process")
> .end();
> {code}
> * *Index-based*: tracks how many items have been processed, skips
> already-processed items on subsequent runs (assumes stable ordering)
> * *Value-based* (with {{watermarkExpression}}): evaluates an expression on
> each successful item (e.g., a timestamp), stores the last value via
> {{ResumeStrategy.updateLastOffset()}} for upstream filtering
> * Watermark reads use lazy loading from the strategy's {{ResumeCache}} on
> each exchange
> * Watermark is not updated on abort, allowing retry from the same position
> h3. What This Replaces
> The proposed camel-bulk component (CAMEL-23240) provided these features as a
> standalone component. After review, we concluded:
> * Most features are natural extensions of the Splitter EIP
> * Users benefit from enhancing what they already know rather than learning a
> new component
> * The features are independently useful and compose with existing Splitter
> options ({{parallelProcessing}}, {{streaming}}, {{shareUnitOfWork}}, etc.)
> * The Bulk component's multi-step accept policy is already expressible with
> {{doTry/doCatch}} inside the split route
> h3. PR
> https://github.com/apache/camel/pull/22300
--
This message was sent by Atlassian Jira
(v8.20.10#820010)