This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new c96049dec6ab CAMEL-23120 - camel-docling - Implement batchSize sub-batch partitioning in batch processing (#21689)
c96049dec6ab is described below

commit c96049dec6abed15ebf6897459d3a09501f17d06
Author: Andrea Cosentino <[email protected]>
AuthorDate: Tue Mar 3 10:36:47 2026 +0100

    CAMEL-23120 - camel-docling - Implement batchSize sub-batch partitioning in batch processing (#21689)
    
    The batchSize configuration parameter (default 10) was declared and read
    from headers in processBatchConversion() and processBatchStructuredData(),
    but the value was never actually applied. Both convertDocumentsBatch() and
    convertStructuredDataBatch() submitted all documents to the executor at
    once regardless of batchSize, making the parameter a no-op.
    
    This change makes batchSize control how many documents are submitted per
    sub-batch. Documents are partitioned into chunks of batchSize and each
    sub-batch is processed to completion before starting the next one. Within
    each sub-batch, up to batchParallelism threads run concurrently. The
    overall batchTimeout is tracked across sub-batches so remaining time
    decreases as sub-batches complete, and failOnFirstError stops processing
    across sub-batch boundaries.
    
    This provides back-pressure and controls memory usage when processing
    large document sets, preventing the creation of unbounded numbers of
    CompletableFutures.
    
    Signed-off-by: Andrea Cosentino <[email protected]>
---
 .../apache/camel/catalog/components/docling.json   |   4 +-
 .../apache/camel/component/docling/docling.json    |   4 +-
 .../component/docling/DoclingConfiguration.java    |   5 +-
 .../camel/component/docling/DoclingProducer.java   | 344 ++++++++++++---------
 .../component/docling/BatchProcessingTest.java     |  10 +
 .../dsl/DoclingComponentBuilderFactory.java        |   7 +-
 .../dsl/DoclingEndpointBuilderFactory.java         |  14 +-
 7 files changed, 226 insertions(+), 162 deletions(-)

diff --git 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json
 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json
index 362c01fdb5d0..e10b6a98e0c2 100644
--- 
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json
+++ 
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json
@@ -61,7 +61,7 @@
     "workingDirectory": { "index": 34, "kind": "property", "displayName": 
"Working Directory", "group": "advanced", "label": "advanced", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, 
"configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Working directory for 
Docling execution" },
     "batchFailOnFirstError": { "index": 35, "kind": "property", "displayName": 
"Batch Fail On First Error", "group": "batch", "label": "batch", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 
true, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Fail entire batch on 
first error (true) or conti [...]
     "batchParallelism": { "index": 36, "kind": "property", "displayName": 
"Batch Parallelism", "group": "batch", "label": "batch", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": 
"", "autowired": false, "secret": false, "defaultValue": 4, 
"configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Number of parallel 
threads for batch processing" },
-    "batchSize": { "index": 37, "kind": "property", "displayName": "Batch 
Size", "group": "batch", "label": "batch", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Maximum number of 
documents to process in a single batch (batch operations only)" },
+    "batchSize": { "index": 37, "kind": "property", "displayName": "Batch 
Size", "group": "batch", "label": "batch", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Number of documents to 
submit per sub-batch. Documents are partitioned into sub-b [...]
     "batchTimeout": { "index": 38, "kind": "property", "displayName": "Batch 
Timeout", "group": "batch", "label": "batch", "required": false, "type": 
"integer", "javaType": "long", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "defaultValue": 300000, 
"configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Maximum time to wait for 
batch completion in milliseconds" },
     "splitBatchResults": { "index": 39, "kind": "property", "displayName": 
"Split Batch Results", "group": "batch", "label": "batch", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 
false, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Split batch results into 
individual exchanges (one per do [...]
     "chunkingIncludeRawText": { "index": 40, "kind": "property", 
"displayName": "Chunking Include Raw Text", "group": "chunking", "label": 
"chunking", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Include raw text in 
chunk output" },
@@ -147,7 +147,7 @@
     "workingDirectory": { "index": 33, "kind": "parameter", "displayName": 
"Working Directory", "group": "advanced", "label": "advanced", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, 
"configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Working directory for 
Docling execution" },
     "batchFailOnFirstError": { "index": 34, "kind": "parameter", 
"displayName": "Batch Fail On First Error", "group": "batch", "label": "batch", 
"required": false, "type": "boolean", "javaType": "boolean", "deprecated": 
false, "deprecationNote": "", "autowired": false, "secret": false, 
"defaultValue": true, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Fail entire batch on 
first error (true) or cont [...]
     "batchParallelism": { "index": 35, "kind": "parameter", "displayName": 
"Batch Parallelism", "group": "batch", "label": "batch", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": 
"", "autowired": false, "secret": false, "defaultValue": 4, 
"configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Number of parallel 
threads for batch processing" },
-    "batchSize": { "index": 36, "kind": "parameter", "displayName": "Batch 
Size", "group": "batch", "label": "batch", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Maximum number of 
documents to process in a single batch (batch operations only)" },
+    "batchSize": { "index": 36, "kind": "parameter", "displayName": "Batch 
Size", "group": "batch", "label": "batch", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Number of documents to 
submit per sub-batch. Documents are partitioned into sub- [...]
     "batchTimeout": { "index": 37, "kind": "parameter", "displayName": "Batch 
Timeout", "group": "batch", "label": "batch", "required": false, "type": 
"integer", "javaType": "long", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "defaultValue": 300000, 
"configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Maximum time to wait for 
batch completion in milliseconds" },
     "splitBatchResults": { "index": 38, "kind": "parameter", "displayName": 
"Split Batch Results", "group": "batch", "label": "batch", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 
false, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Split batch results into 
individual exchanges (one per d [...]
     "chunkingIncludeRawText": { "index": 39, "kind": "parameter", 
"displayName": "Chunking Include Raw Text", "group": "chunking", "label": 
"chunking", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Include raw text in 
chunk output" },
diff --git 
a/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json
 
b/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json
index 362c01fdb5d0..e10b6a98e0c2 100644
--- 
a/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json
+++ 
b/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json
@@ -61,7 +61,7 @@
     "workingDirectory": { "index": 34, "kind": "property", "displayName": 
"Working Directory", "group": "advanced", "label": "advanced", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, 
"configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Working directory for 
Docling execution" },
     "batchFailOnFirstError": { "index": 35, "kind": "property", "displayName": 
"Batch Fail On First Error", "group": "batch", "label": "batch", "required": 
false, "type": "boolean", "javaType": "boolean", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 
true, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Fail entire batch on 
first error (true) or conti [...]
     "batchParallelism": { "index": 36, "kind": "property", "displayName": 
"Batch Parallelism", "group": "batch", "label": "batch", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": 
"", "autowired": false, "secret": false, "defaultValue": 4, 
"configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Number of parallel 
threads for batch processing" },
-    "batchSize": { "index": 37, "kind": "property", "displayName": "Batch 
Size", "group": "batch", "label": "batch", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Maximum number of 
documents to process in a single batch (batch operations only)" },
+    "batchSize": { "index": 37, "kind": "property", "displayName": "Batch 
Size", "group": "batch", "label": "batch", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Number of documents to 
submit per sub-batch. Documents are partitioned into sub-b [...]
     "batchTimeout": { "index": 38, "kind": "property", "displayName": "Batch 
Timeout", "group": "batch", "label": "batch", "required": false, "type": 
"integer", "javaType": "long", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "defaultValue": 300000, 
"configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Maximum time to wait for 
batch completion in milliseconds" },
     "splitBatchResults": { "index": 39, "kind": "property", "displayName": 
"Split Batch Results", "group": "batch", "label": "batch", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 
false, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Split batch results into 
individual exchanges (one per do [...]
     "chunkingIncludeRawText": { "index": 40, "kind": "property", 
"displayName": "Chunking Include Raw Text", "group": "chunking", "label": 
"chunking", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Include raw text in 
chunk output" },
@@ -147,7 +147,7 @@
     "workingDirectory": { "index": 33, "kind": "parameter", "displayName": 
"Working Directory", "group": "advanced", "label": "advanced", "required": 
false, "type": "string", "javaType": "java.lang.String", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, 
"configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Working directory for 
Docling execution" },
     "batchFailOnFirstError": { "index": 34, "kind": "parameter", 
"displayName": "Batch Fail On First Error", "group": "batch", "label": "batch", 
"required": false, "type": "boolean", "javaType": "boolean", "deprecated": 
false, "deprecationNote": "", "autowired": false, "secret": false, 
"defaultValue": true, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Fail entire batch on 
first error (true) or cont [...]
     "batchParallelism": { "index": 35, "kind": "parameter", "displayName": 
"Batch Parallelism", "group": "batch", "label": "batch", "required": false, 
"type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": 
"", "autowired": false, "secret": false, "defaultValue": 4, 
"configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Number of parallel 
threads for batch processing" },
-    "batchSize": { "index": 36, "kind": "parameter", "displayName": "Batch 
Size", "group": "batch", "label": "batch", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Maximum number of 
documents to process in a single batch (batch operations only)" },
+    "batchSize": { "index": 36, "kind": "parameter", "displayName": "Batch 
Size", "group": "batch", "label": "batch", "required": false, "type": 
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Number of documents to 
submit per sub-batch. Documents are partitioned into sub- [...]
     "batchTimeout": { "index": 37, "kind": "parameter", "displayName": "Batch 
Timeout", "group": "batch", "label": "batch", "required": false, "type": 
"integer", "javaType": "long", "deprecated": false, "deprecationNote": "", 
"autowired": false, "secret": false, "defaultValue": 300000, 
"configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Maximum time to wait for 
batch completion in milliseconds" },
     "splitBatchResults": { "index": 38, "kind": "parameter", "displayName": 
"Split Batch Results", "group": "batch", "label": "batch", "required": false, 
"type": "boolean", "javaType": "boolean", "deprecated": false, 
"deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 
false, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Split batch results into 
individual exchanges (one per d [...]
     "chunkingIncludeRawText": { "index": 39, "kind": "parameter", 
"displayName": "Chunking Include Raw Text", "group": "chunking", "label": 
"chunking", "required": false, "type": "boolean", "javaType": 
"java.lang.Boolean", "deprecated": false, "deprecationNote": "", "autowired": 
false, "secret": false, "defaultValue": false, "configurationClass": 
"org.apache.camel.component.docling.DoclingConfiguration", 
"configurationField": "configuration", "description": "Include raw text in 
chunk output" },
diff --git 
a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java
 
b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java
index a67ae6113b3a..3270a9adde80 100644
--- 
a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java
+++ 
b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java
@@ -113,7 +113,10 @@ public class DoclingConfiguration implements Cloneable {
     private long asyncTimeout = 300000; // 5 minutes
 
     @UriParam(label = "batch")
-    @Metadata(description = "Maximum number of documents to process in a 
single batch (batch operations only)",
+    @Metadata(description = "Number of documents to submit per sub-batch. 
Documents are partitioned into sub-batches of this size"
+                            + " and each sub-batch is processed before 
starting the next one. Within each sub-batch, up to"
+                            + " batchParallelism threads are used 
concurrently. This controls memory usage and back-pressure"
+                            + " when processing large document sets.",
               defaultValue = "10")
     private int batchSize = 10;
 
diff --git 
a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
 
b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
index 8be170af8d10..df0f851be49d 100644
--- 
a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
+++ 
b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
@@ -764,8 +764,9 @@ public class DoclingProducer extends DefaultProducer {
             List<String> inputSources, String outputFormat, int batchSize, int 
parallelism,
             boolean failOnFirstError, boolean useAsync, long batchTimeout) {
 
-        LOG.info("Starting batch conversion of {} documents with 
parallelism={}, failOnFirstError={}, timeout={}ms",
-                inputSources.size(), parallelism, failOnFirstError, 
batchTimeout);
+        LOG.info(
+                "Starting batch conversion of {} documents with batchSize={}, 
parallelism={}, failOnFirstError={}, timeout={}ms",
+                inputSources.size(), batchSize, parallelism, failOnFirstError, 
batchTimeout);
 
         BatchProcessingResults results = new BatchProcessingResults();
         results.setStartTimeMs(System.currentTimeMillis());
@@ -776,101 +777,118 @@ public class DoclingProducer extends DefaultProducer {
         AtomicBoolean shouldCancel = new AtomicBoolean(false);
 
         try {
-            // Create CompletableFutures for all conversion tasks
-            List<CompletableFuture<BatchConversionResult>> futures = new 
ArrayList<>();
-
-            for (String inputSource : inputSources) {
-                final int currentIndex = index.getAndIncrement();
-                final String documentId = "doc-" + currentIndex;
-
-                CompletableFuture<BatchConversionResult> future = 
CompletableFuture.supplyAsync(() -> {
-                    // Check if we should skip this task due to early 
termination
-                    if (failOnFirstError && shouldCancel.get()) {
-                        BatchConversionResult cancelledResult = new 
BatchConversionResult(documentId, inputSource);
-                        cancelledResult.setBatchIndex(currentIndex);
-                        cancelledResult.setSuccess(false);
-                        cancelledResult.setErrorMessage("Cancelled due to 
previous failure");
-                        return cancelledResult;
-                    }
+            // Partition documents into sub-batches of batchSize
+            for (int batchStart = 0; batchStart < inputSources.size(); 
batchStart += batchSize) {
+                if (failOnFirstError && shouldCancel.get()) {
+                    break;
+                }
 
-                    BatchConversionResult result = new 
BatchConversionResult(documentId, inputSource);
-                    result.setBatchIndex(currentIndex);
-                    long startTime = System.currentTimeMillis();
+                int batchEnd = Math.min(batchStart + batchSize, 
inputSources.size());
+                List<String> subBatch = inputSources.subList(batchStart, 
batchEnd);
 
-                    try {
-                        LOG.debug("Processing document {} (index {}): {}", 
documentId, currentIndex, inputSource);
+                LOG.debug("Processing sub-batch [{}-{}] of {} total documents",
+                        batchStart, batchEnd - 1, inputSources.size());
 
-                        String converted;
-                        if (useAsync) {
-                            converted = 
convertDocumentAsyncAndWait(inputSource, outputFormat);
-                        } else {
-                            converted = convertDocumentSync(inputSource, 
outputFormat);
+                List<CompletableFuture<BatchConversionResult>> futures = new 
ArrayList<>();
+
+                for (String inputSource : subBatch) {
+                    final int currentIndex = index.getAndIncrement();
+                    final String documentId = "doc-" + currentIndex;
+
+                    CompletableFuture<BatchConversionResult> future = 
CompletableFuture.supplyAsync(() -> {
+                        // Check if we should skip this task due to early 
termination
+                        if (failOnFirstError && shouldCancel.get()) {
+                            BatchConversionResult cancelledResult = new 
BatchConversionResult(documentId, inputSource);
+                            cancelledResult.setBatchIndex(currentIndex);
+                            cancelledResult.setSuccess(false);
+                            cancelledResult.setErrorMessage("Cancelled due to 
previous failure");
+                            return cancelledResult;
                         }
 
-                        result.setResult(converted);
-                        result.setSuccess(true);
-                        result.setProcessingTimeMs(System.currentTimeMillis() 
- startTime);
+                        BatchConversionResult result = new 
BatchConversionResult(documentId, inputSource);
+                        result.setBatchIndex(currentIndex);
+                        long startTime = System.currentTimeMillis();
 
-                        LOG.debug("Successfully processed document {} in 
{}ms", documentId,
-                                result.getProcessingTimeMs());
+                        try {
+                            LOG.debug("Processing document {} (index {}): {}", 
documentId, currentIndex, inputSource);
 
-                    } catch (Exception e) {
-                        result.setSuccess(false);
-                        result.setErrorMessage(e.getMessage());
-                        result.setProcessingTimeMs(System.currentTimeMillis() 
- startTime);
+                            String converted;
+                            if (useAsync) {
+                                converted = 
convertDocumentAsyncAndWait(inputSource, outputFormat);
+                            } else {
+                                converted = convertDocumentSync(inputSource, 
outputFormat);
+                            }
+
+                            result.setResult(converted);
+                            result.setSuccess(true);
+                            
result.setProcessingTimeMs(System.currentTimeMillis() - startTime);
+
+                            LOG.debug("Successfully processed document {} in 
{}ms", documentId,
+                                    result.getProcessingTimeMs());
 
-                        LOG.error("Failed to process document {} (index {}): 
{}", documentId, currentIndex,
-                                e.getMessage(), e);
+                        } catch (Exception e) {
+                            result.setSuccess(false);
+                            result.setErrorMessage(e.getMessage());
+                            
result.setProcessingTimeMs(System.currentTimeMillis() - startTime);
 
-                        // Signal other tasks to cancel if failOnFirstError is 
enabled
-                        if (failOnFirstError) {
-                            shouldCancel.set(true);
+                            LOG.error("Failed to process document {} (index 
{}): {}", documentId, currentIndex,
+                                    e.getMessage(), e);
+
+                            // Signal other tasks to cancel if 
failOnFirstError is enabled
+                            if (failOnFirstError) {
+                                shouldCancel.set(true);
+                            }
                         }
-                    }
 
-                    return result;
-                }, executor);
+                        return result;
+                    }, executor);
 
-                futures.add(future);
-            }
+                    futures.add(future);
+                }
 
-            // Wait for all futures to complete with timeout
-            CompletableFuture<Void> allOf = 
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+                // Wait for sub-batch to complete with remaining timeout
+                long elapsed = System.currentTimeMillis() - 
results.getStartTimeMs();
+                long remainingTimeout = batchTimeout - elapsed;
+                if (remainingTimeout <= 0) {
+                    futures.forEach(f -> f.cancel(true));
+                    throw new RuntimeException("Batch processing timed out 
after " + batchTimeout + "ms");
+                }
 
-            try {
-                allOf.get(batchTimeout, TimeUnit.MILLISECONDS);
-            } catch (TimeoutException e) {
-                LOG.error("Batch processing timed out after {}ms", 
batchTimeout);
-                // Cancel all incomplete futures
-                futures.forEach(f -> f.cancel(true));
-                throw new RuntimeException("Batch processing timed out after " 
+ batchTimeout + "ms", e);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                LOG.error("Batch processing interrupted", e);
-                futures.forEach(f -> f.cancel(true));
-                throw new RuntimeException("Batch processing interrupted", e);
-            } catch (Exception e) {
-                LOG.error("Batch processing failed", e);
-                futures.forEach(f -> f.cancel(true));
-                throw new RuntimeException("Batch processing failed", e);
-            }
+                CompletableFuture<Void> allOf = 
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
 
-            // Collect all results
-            for (CompletableFuture<BatchConversionResult> future : futures) {
                 try {
-                    BatchConversionResult result = future.getNow(null);
-                    if (result != null) {
-                        results.addResult(result);
-
-                        // If failOnFirstError and we hit a failure, stop 
adding more results
-                        if (failOnFirstError && !result.isSuccess()) {
-                            LOG.warn("Failing batch due to error in document 
{}: {}", result.getDocumentId(),
-                                    result.getErrorMessage());
-                            break;
+                    allOf.get(remainingTimeout, TimeUnit.MILLISECONDS);
+                } catch (TimeoutException e) {
+                    LOG.error("Batch processing timed out after {}ms", 
batchTimeout);
+                    futures.forEach(f -> f.cancel(true));
+                    throw new RuntimeException("Batch processing timed out 
after " + batchTimeout + "ms", e);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    LOG.error("Batch processing interrupted", e);
+                    futures.forEach(f -> f.cancel(true));
+                    throw new RuntimeException("Batch processing interrupted", 
e);
+                } catch (Exception e) {
+                    LOG.error("Batch processing failed", e);
+                    futures.forEach(f -> f.cancel(true));
+                    throw new RuntimeException("Batch processing failed", e);
+                }
+
+                // Collect results from this sub-batch
+                for (CompletableFuture<BatchConversionResult> future : 
futures) {
+                    try {
+                        BatchConversionResult result = future.getNow(null);
+                        if (result != null) {
+                            results.addResult(result);
+
+                            if (failOnFirstError && !result.isSuccess()) {
+                                LOG.warn("Failing batch due to error in 
document {}: {}", result.getDocumentId(),
+                                        result.getErrorMessage());
+                                break;
+                            }
                         }
+                    } catch (Exception e) {
+                        LOG.error("Error retrieving result", e);
                     }
-                } catch (Exception e) {
-                    LOG.error("Error retrieving result", e);
                 }
             }
 
@@ -1115,8 +1133,8 @@ public class DoclingProducer extends DefaultProducer {
             boolean failOnFirstError, boolean useAsync, long batchTimeout) {
 
         LOG.info(
-                "Starting batch structured data extraction of {} documents 
with parallelism={}, failOnFirstError={}, timeout={}ms",
-                inputSources.size(), parallelism, failOnFirstError, 
batchTimeout);
+                "Starting batch structured data extraction of {} documents 
with batchSize={}, parallelism={}, failOnFirstError={}, timeout={}ms",
+                inputSources.size(), batchSize, parallelism, failOnFirstError, 
batchTimeout);
 
         BatchProcessingResults results = new BatchProcessingResults();
         results.setStartTimeMs(System.currentTimeMillis());
@@ -1127,93 +1145,117 @@ public class DoclingProducer extends DefaultProducer {
         AtomicBoolean shouldCancel = new AtomicBoolean(false);
 
         try {
-            List<CompletableFuture<BatchConversionResult>> futures = new 
ArrayList<>();
-
-            for (String inputSource : inputSources) {
-                final int currentIndex = index.getAndIncrement();
-                final String documentId = "doc-" + currentIndex;
-
-                CompletableFuture<BatchConversionResult> future = 
CompletableFuture.supplyAsync(() -> {
-                    if (failOnFirstError && shouldCancel.get()) {
-                        BatchConversionResult cancelledResult = new 
BatchConversionResult(documentId, inputSource);
-                        cancelledResult.setBatchIndex(currentIndex);
-                        cancelledResult.setSuccess(false);
-                        cancelledResult.setErrorMessage("Cancelled due to 
previous failure");
-                        return cancelledResult;
-                    }
+            // Partition documents into sub-batches of batchSize
+            for (int batchStart = 0; batchStart < inputSources.size(); 
batchStart += batchSize) {
+                if (failOnFirstError && shouldCancel.get()) {
+                    break;
+                }
 
-                    BatchConversionResult result = new 
BatchConversionResult(documentId, inputSource);
-                    result.setBatchIndex(currentIndex);
-                    long startTime = System.currentTimeMillis();
+                int batchEnd = Math.min(batchStart + batchSize, 
inputSources.size());
+                List<String> subBatch = inputSources.subList(batchStart, 
batchEnd);
 
-                    try {
-                        LOG.debug("Extracting structured data from document {} 
(index {}): {}", documentId, currentIndex,
-                                inputSource);
+                LOG.debug("Processing sub-batch [{}-{}] of {} total documents",
+                        batchStart, batchEnd - 1, inputSources.size());
 
-                        ConvertDocumentRequest request = 
buildStructuredDataRequest(inputSource);
-                        String converted;
-                        if (useAsync) {
-                            converted = convertDocumentAsyncAndWait(request);
-                        } else {
-                            converted = convertDocumentSync(request);
+                List<CompletableFuture<BatchConversionResult>> futures = new 
ArrayList<>();
+
+                for (String inputSource : subBatch) {
+                    final int currentIndex = index.getAndIncrement();
+                    final String documentId = "doc-" + currentIndex;
+
+                    CompletableFuture<BatchConversionResult> future = 
CompletableFuture.supplyAsync(() -> {
+                        if (failOnFirstError && shouldCancel.get()) {
+                            BatchConversionResult cancelledResult = new 
BatchConversionResult(documentId, inputSource);
+                            cancelledResult.setBatchIndex(currentIndex);
+                            cancelledResult.setSuccess(false);
+                            cancelledResult.setErrorMessage("Cancelled due to 
previous failure");
+                            return cancelledResult;
                         }
 
-                        result.setResult(converted);
-                        result.setSuccess(true);
-                        result.setProcessingTimeMs(System.currentTimeMillis() 
- startTime);
+                        BatchConversionResult result = new 
BatchConversionResult(documentId, inputSource);
+                        result.setBatchIndex(currentIndex);
+                        long startTime = System.currentTimeMillis();
 
-                    } catch (Exception e) {
-                        result.setSuccess(false);
-                        result.setErrorMessage(e.getMessage());
-                        result.setProcessingTimeMs(System.currentTimeMillis() 
- startTime);
+                        try {
+                            LOG.debug("Extracting structured data from 
document {} (index {}): {}", documentId,
+                                    currentIndex, inputSource);
 
-                        LOG.error("Failed to extract structured data from 
document {} (index {}): {}", documentId,
-                                currentIndex, e.getMessage(), e);
+                            ConvertDocumentRequest request = 
buildStructuredDataRequest(inputSource);
+                            String converted;
+                            if (useAsync) {
+                                converted = 
convertDocumentAsyncAndWait(request);
+                            } else {
+                                converted = convertDocumentSync(request);
+                            }
+
+                            result.setResult(converted);
+                            result.setSuccess(true);
+                            
result.setProcessingTimeMs(System.currentTimeMillis() - startTime);
+
+                        } catch (Exception e) {
+                            result.setSuccess(false);
+                            result.setErrorMessage(e.getMessage());
+                            
result.setProcessingTimeMs(System.currentTimeMillis() - startTime);
+
+                            LOG.error("Failed to extract structured data from 
document {} (index {}): {}", documentId,
+                                    currentIndex, e.getMessage(), e);
 
-                        if (failOnFirstError) {
-                            shouldCancel.set(true);
+                            if (failOnFirstError) {
+                                shouldCancel.set(true);
+                            }
                         }
-                    }
 
-                    return result;
-                }, executor);
+                        return result;
+                    }, executor);
 
-                futures.add(future);
-            }
+                    futures.add(future);
+                }
 
-            CompletableFuture<Void> allOf = 
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+                // Wait for sub-batch to complete with remaining timeout
+                long elapsed = System.currentTimeMillis() - 
results.getStartTimeMs();
+                long remainingTimeout = batchTimeout - elapsed;
+                if (remainingTimeout <= 0) {
+                    futures.forEach(f -> f.cancel(true));
+                    throw new RuntimeException(
+                            "Batch structured data extraction timed out after 
" + batchTimeout + "ms");
+                }
 
-            try {
-                allOf.get(batchTimeout, TimeUnit.MILLISECONDS);
-            } catch (TimeoutException e) {
-                LOG.error("Batch structured data extraction timed out after 
{}ms", batchTimeout);
-                futures.forEach(f -> f.cancel(true));
-                throw new RuntimeException("Batch structured data extraction 
timed out after " + batchTimeout + "ms", e);
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                LOG.error("Batch structured data extraction interrupted", e);
-                futures.forEach(f -> f.cancel(true));
-                throw new RuntimeException("Batch structured data extraction 
interrupted", e);
-            } catch (Exception e) {
-                LOG.error("Batch structured data extraction failed", e);
-                futures.forEach(f -> f.cancel(true));
-                throw new RuntimeException("Batch structured data extraction 
failed", e);
-            }
+                CompletableFuture<Void> allOf = 
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
 
-            for (CompletableFuture<BatchConversionResult> future : futures) {
                 try {
-                    BatchConversionResult result = future.getNow(null);
-                    if (result != null) {
-                        results.addResult(result);
-
-                        if (failOnFirstError && !result.isSuccess()) {
-                            LOG.warn("Failing batch due to error in document 
{}: {}", result.getDocumentId(),
-                                    result.getErrorMessage());
-                            break;
+                    allOf.get(remainingTimeout, TimeUnit.MILLISECONDS);
+                } catch (TimeoutException e) {
+                    LOG.error("Batch structured data extraction timed out 
after {}ms", batchTimeout);
+                    futures.forEach(f -> f.cancel(true));
+                    throw new RuntimeException(
+                            "Batch structured data extraction timed out after 
" + batchTimeout + "ms", e);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    LOG.error("Batch structured data extraction interrupted", 
e);
+                    futures.forEach(f -> f.cancel(true));
+                    throw new RuntimeException("Batch structured data 
extraction interrupted", e);
+                } catch (Exception e) {
+                    LOG.error("Batch structured data extraction failed", e);
+                    futures.forEach(f -> f.cancel(true));
+                    throw new RuntimeException("Batch structured data 
extraction failed", e);
+                }
+
+                // Collect results from this sub-batch
+                for (CompletableFuture<BatchConversionResult> future : 
futures) {
+                    try {
+                        BatchConversionResult result = future.getNow(null);
+                        if (result != null) {
+                            results.addResult(result);
+
+                            if (failOnFirstError && !result.isSuccess()) {
+                                LOG.warn("Failing batch due to error in 
document {}: {}", result.getDocumentId(),
+                                        result.getErrorMessage());
+                                break;
+                            }
                         }
+                    } catch (Exception e) {
+                        LOG.error("Error retrieving result", e);
                     }
-                } catch (Exception e) {
-                    LOG.error("Error retrieving result", e);
                 }
             }
 
diff --git a/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java
index e1d902ce1c29..a70bda0e10bd 100644
--- a/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java
+++ b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java
@@ -141,6 +141,16 @@ public class BatchProcessingTest extends CamelTestSupport {
         assertTrue(config.isSplitBatchResults());
     }
 
+    @Test // verifies batchSize and batchParallelism from the endpoint URI end up in the endpoint's DoclingConfiguration
+    public void testBatchSizeParsedFromEndpointUri() throws Exception {
+        DoclingEndpoint endpoint = (DoclingEndpoint) context.getEndpoint(
+                "docling:convert?useDoclingServe=true&batchSize=5&batchParallelism=2");
+        DoclingConfiguration config = endpoint.getConfiguration();
+
+        assertEquals(5, config.getBatchSize());
+        assertEquals(2, config.getBatchParallelism());
+    }
+
     @Test
     public void testBatchTimeoutConfiguration() {
         DoclingConfiguration config = new DoclingConfiguration();
diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java
index ddfac58453aa..97365d57ab3a 100644
--- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java
+++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java
@@ -678,8 +678,11 @@ public interface DoclingComponentBuilderFactory {
     
         
         /**
-         * Maximum number of documents to process in a single batch (batch
-         * operations only).
+         * Number of documents to submit per sub-batch. Documents are
+         * partitioned into sub-batches of this size and each sub-batch is
+         * processed before starting the next one. Within each sub-batch, up to
+         * batchParallelism threads are used concurrently. This controls memory
+         * usage and back-pressure when processing large document sets.
          * 
          * The option is a: &lt;code&gt;int&lt;/code&gt; type.
          * 
diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java
index 0ee94f1e577f..8801cda626ee 100644
--- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java
+++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java
@@ -310,8 +310,11 @@ public interface DoclingEndpointBuilderFactory {
             return this;
         }
         /**
-         * Maximum number of documents to process in a single batch (batch
-         * operations only).
+         * Number of documents to submit per sub-batch. Documents are
+         * partitioned into sub-batches of this size and each sub-batch is
+         * processed before starting the next one. Within each sub-batch, up to
+         * batchParallelism threads are used concurrently. This controls memory
+         * usage and back-pressure when processing large document sets.
          * 
          * The option is a: <code>int</code> type.
          * 
@@ -326,8 +329,11 @@ public interface DoclingEndpointBuilderFactory {
             return this;
         }
         /**
-         * Maximum number of documents to process in a single batch (batch
-         * operations only).
+         * Number of documents to submit per sub-batch. Documents are
+         * partitioned into sub-batches of this size and each sub-batch is
+         * processed before starting the next one. Within each sub-batch, up to
+         * batchParallelism threads are used concurrently. This controls memory
+         * usage and back-pressure when processing large document sets.
          * 
          * The option will be converted to a <code>int</code> type.
          * 


Reply via email to