[
https://issues.apache.org/jira/browse/CAMEL-23264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Claus Ibsen updated CAMEL-23264:
--------------------------------
Fix Version/s: (was: 4.20.0)
> Enhance Splitter EIP with chunking, error threshold, failure tracking, and
> watermark support
> --------------------------------------------------------------------------------------------
>
> Key: CAMEL-23264
> URL: https://issues.apache.org/jira/browse/CAMEL-23264
> Project: Camel
> Issue Type: Improvement
> Components: camel-core
> Reporter: Guillaume Nodet
> Assignee: Guillaume Nodet
> Priority: Major
> Fix For: 4.21.0
>
>
> Enhance the Splitter EIP to support common bulk processing patterns that
> currently require significant manual assembly.
> This replaces the proposed camel-bulk component (CAMEL-23240), which was
> closed after analysis showed these features are better served as Splitter EIP
> enhancements rather than a standalone component. See PR #22159 for the full
> analysis: https://github.com/apache/camel/pull/22159#issuecomment-4144168967
> h3. Enhancements
> *1. Chunking — {{group(int)}}*
> Add a {{group}} option to the Splitter that works with any Iterable (not just
> tokenized strings). The iterator is wrapped with a chunking iterator, so each
> split exchange body becomes a {{List}} of up to N items.
> {code:java}
> .split(body()).group(100)
> .to("direct:processChunk") // body is List of up to 100 items
> .end();
> {code}
> *2. Error Threshold — {{errorThreshold(double)}} / {{maxFailedRecords(int)}}*
> Generalizes {{stopOnException()}} from boolean (stop on first failure) to
> ratio/count-based abort. The Splitter tracks failures internally and aborts
> mid-stream when the threshold is exceeded.
> {code:java}
> .split(body()).errorThreshold(0.1).maxFailedRecords(50)
> .to("direct:process")
> .end();
> {code}
> * {{errorThreshold(0.1)}} — abort if more than 10% of items fail
> * {{maxFailedRecords(50)}} — abort after more than 50 failures
> * Both can be combined — abort when either is exceeded
> *3. Failure Tracking — built-in {{SplitResult}}*
> When error threshold is configured and no custom {{AggregationStrategy}} is
> set, the Splitter produces a {{SplitResult}} containing:
> * {{totalItems}}, {{successCount}}, {{failureCount}}, {{duration}},
> {{aborted}}
> * {{failures}} list with {{(index, item, exception)}} for each failed item
> * Output headers: {{CamelSplitTotal}}, {{CamelSplitSuccess}},
> {{CamelSplitFailed}}, etc.
> *4. Watermark Tracking via ResumeStrategy*
> Add {{resumeStrategy}}, {{watermarkKey}}, and {{watermarkExpression}} options
> for resume-from-last-position patterns. Uses Camel's existing
> {{ResumeStrategy}} SPI (from {{camel-api}}) for offset persistence.
> {code:java}
> ResumeStrategy strategy = ...; // any ResumeStrategy implementation
> .split(body())
> .resumeStrategy(strategy, "importJob")
> .to("direct:process")
> .end();
> {code}
> * *Index-based*: tracks how many items have been processed, skips
> already-processed items on subsequent runs (assumes stable ordering)
> * *Value-based* (with {{watermarkExpression}}): evaluates an expression on
> each successful item (e.g., a timestamp), stores the last value via
> {{ResumeStrategy.updateLastOffset()}} for upstream filtering
> * Watermark reads use lazy loading from the strategy's {{ResumeCache}} on
> each exchange
> * Watermark is not updated on abort, allowing retry from the same position
> h3. What This Replaces
> The proposed camel-bulk component (CAMEL-23240) provided these features as a
> standalone component. After review, we concluded:
> * Most features are natural extensions of the Splitter EIP
> * Users benefit from enhancing what they already know rather than learning a
> new component
> * The features are independently useful and compose with existing Splitter
> options ({{parallelProcessing}}, {{streaming}}, {{shareUnitOfWork}}, etc.)
> * The Bulk component's multi-step accept policy is already expressible with
> {{doTry/doCatch}} inside the split route
> h3. PR
> https://github.com/apache/camel/pull/22300
--
This message was sent by Atlassian Jira
(v8.20.10#820010)