This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new c96049dec6ab CAMEL-23120 - camel-docling - Implement batchSize
sub-batch partitioning in batch processing (#21689)
c96049dec6ab is described below
commit c96049dec6abed15ebf6897459d3a09501f17d06
Author: Andrea Cosentino <[email protected]>
AuthorDate: Tue Mar 3 10:36:47 2026 +0100
CAMEL-23120 - camel-docling - Implement batchSize sub-batch partitioning in
batch processing (#21689)
The batchSize configuration parameter (default 10) was declared and read
from headers in processBatchConversion() and processBatchStructuredData(),
but the value was never actually applied. Both convertDocumentsBatch() and
convertStructuredDataBatch() submitted all documents to the executor at
once regardless of batchSize, making the parameter a no-op.
This change makes batchSize control how many documents are submitted per
sub-batch. Documents are partitioned into chunks of batchSize and each
sub-batch is processed to completion before starting the next one. Within
each sub-batch, up to batchParallelism threads run concurrently. The
overall batchTimeout is tracked across sub-batches, so the remaining time
decreases as sub-batches complete, and when failOnFirstError is enabled a
failure in one sub-batch prevents any later sub-batch from being started.
This provides back-pressure and controls memory usage when processing
large document sets, preventing the creation of unbounded numbers of
CompletableFutures.
Signed-off-by: Andrea Cosentino <[email protected]>
---
.../apache/camel/catalog/components/docling.json | 4 +-
.../apache/camel/component/docling/docling.json | 4 +-
.../component/docling/DoclingConfiguration.java | 5 +-
.../camel/component/docling/DoclingProducer.java | 344 ++++++++++++---------
.../component/docling/BatchProcessingTest.java | 10 +
.../dsl/DoclingComponentBuilderFactory.java | 7 +-
.../dsl/DoclingEndpointBuilderFactory.java | 14 +-
7 files changed, 226 insertions(+), 162 deletions(-)
diff --git
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json
index 362c01fdb5d0..e10b6a98e0c2 100644
---
a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json
+++
b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json
@@ -61,7 +61,7 @@
"workingDirectory": { "index": 34, "kind": "property", "displayName":
"Working Directory", "group": "advanced", "label": "advanced", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false,
"configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Working directory for
Docling execution" },
"batchFailOnFirstError": { "index": 35, "kind": "property", "displayName":
"Batch Fail On First Error", "group": "batch", "label": "batch", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "defaultValue":
true, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Fail entire batch on
first error (true) or conti [...]
"batchParallelism": { "index": 36, "kind": "property", "displayName":
"Batch Parallelism", "group": "batch", "label": "batch", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "deprecationNote":
"", "autowired": false, "secret": false, "defaultValue": 4,
"configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Number of parallel
threads for batch processing" },
- "batchSize": { "index": 37, "kind": "property", "displayName": "Batch
Size", "group": "batch", "label": "batch", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Maximum number of
documents to process in a single batch (batch operations only)" },
+ "batchSize": { "index": 37, "kind": "property", "displayName": "Batch
Size", "group": "batch", "label": "batch", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Number of documents to
submit per sub-batch. Documents are partitioned into sub-b [...]
"batchTimeout": { "index": 38, "kind": "property", "displayName": "Batch
Timeout", "group": "batch", "label": "batch", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "defaultValue": 300000,
"configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Maximum time to wait for
batch completion in milliseconds" },
"splitBatchResults": { "index": 39, "kind": "property", "displayName":
"Split Batch Results", "group": "batch", "label": "batch", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "defaultValue":
false, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Split batch results into
individual exchanges (one per do [...]
"chunkingIncludeRawText": { "index": 40, "kind": "property",
"displayName": "Chunking Include Raw Text", "group": "chunking", "label":
"chunking", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Include raw text in
chunk output" },
@@ -147,7 +147,7 @@
"workingDirectory": { "index": 33, "kind": "parameter", "displayName":
"Working Directory", "group": "advanced", "label": "advanced", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false,
"configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Working directory for
Docling execution" },
"batchFailOnFirstError": { "index": 34, "kind": "parameter",
"displayName": "Batch Fail On First Error", "group": "batch", "label": "batch",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "deprecationNote": "", "autowired": false, "secret": false,
"defaultValue": true, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Fail entire batch on
first error (true) or cont [...]
"batchParallelism": { "index": 35, "kind": "parameter", "displayName":
"Batch Parallelism", "group": "batch", "label": "batch", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "deprecationNote":
"", "autowired": false, "secret": false, "defaultValue": 4,
"configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Number of parallel
threads for batch processing" },
- "batchSize": { "index": 36, "kind": "parameter", "displayName": "Batch
Size", "group": "batch", "label": "batch", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Maximum number of
documents to process in a single batch (batch operations only)" },
+ "batchSize": { "index": 36, "kind": "parameter", "displayName": "Batch
Size", "group": "batch", "label": "batch", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Number of documents to
submit per sub-batch. Documents are partitioned into sub- [...]
"batchTimeout": { "index": 37, "kind": "parameter", "displayName": "Batch
Timeout", "group": "batch", "label": "batch", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "defaultValue": 300000,
"configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Maximum time to wait for
batch completion in milliseconds" },
"splitBatchResults": { "index": 38, "kind": "parameter", "displayName":
"Split Batch Results", "group": "batch", "label": "batch", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "defaultValue":
false, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Split batch results into
individual exchanges (one per d [...]
"chunkingIncludeRawText": { "index": 39, "kind": "parameter",
"displayName": "Chunking Include Raw Text", "group": "chunking", "label":
"chunking", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Include raw text in
chunk output" },
diff --git
a/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json
b/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json
index 362c01fdb5d0..e10b6a98e0c2 100644
---
a/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json
+++
b/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json
@@ -61,7 +61,7 @@
"workingDirectory": { "index": 34, "kind": "property", "displayName":
"Working Directory", "group": "advanced", "label": "advanced", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false,
"configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Working directory for
Docling execution" },
"batchFailOnFirstError": { "index": 35, "kind": "property", "displayName":
"Batch Fail On First Error", "group": "batch", "label": "batch", "required":
false, "type": "boolean", "javaType": "boolean", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "defaultValue":
true, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Fail entire batch on
first error (true) or conti [...]
"batchParallelism": { "index": 36, "kind": "property", "displayName":
"Batch Parallelism", "group": "batch", "label": "batch", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "deprecationNote":
"", "autowired": false, "secret": false, "defaultValue": 4,
"configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Number of parallel
threads for batch processing" },
- "batchSize": { "index": 37, "kind": "property", "displayName": "Batch
Size", "group": "batch", "label": "batch", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Maximum number of
documents to process in a single batch (batch operations only)" },
+ "batchSize": { "index": 37, "kind": "property", "displayName": "Batch
Size", "group": "batch", "label": "batch", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Number of documents to
submit per sub-batch. Documents are partitioned into sub-b [...]
"batchTimeout": { "index": 38, "kind": "property", "displayName": "Batch
Timeout", "group": "batch", "label": "batch", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "defaultValue": 300000,
"configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Maximum time to wait for
batch completion in milliseconds" },
"splitBatchResults": { "index": 39, "kind": "property", "displayName":
"Split Batch Results", "group": "batch", "label": "batch", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "defaultValue":
false, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Split batch results into
individual exchanges (one per do [...]
"chunkingIncludeRawText": { "index": 40, "kind": "property",
"displayName": "Chunking Include Raw Text", "group": "chunking", "label":
"chunking", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Include raw text in
chunk output" },
@@ -147,7 +147,7 @@
"workingDirectory": { "index": 33, "kind": "parameter", "displayName":
"Working Directory", "group": "advanced", "label": "advanced", "required":
false, "type": "string", "javaType": "java.lang.String", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false,
"configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Working directory for
Docling execution" },
"batchFailOnFirstError": { "index": 34, "kind": "parameter",
"displayName": "Batch Fail On First Error", "group": "batch", "label": "batch",
"required": false, "type": "boolean", "javaType": "boolean", "deprecated":
false, "deprecationNote": "", "autowired": false, "secret": false,
"defaultValue": true, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Fail entire batch on
first error (true) or cont [...]
"batchParallelism": { "index": 35, "kind": "parameter", "displayName":
"Batch Parallelism", "group": "batch", "label": "batch", "required": false,
"type": "integer", "javaType": "int", "deprecated": false, "deprecationNote":
"", "autowired": false, "secret": false, "defaultValue": 4,
"configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Number of parallel
threads for batch processing" },
- "batchSize": { "index": 36, "kind": "parameter", "displayName": "Batch
Size", "group": "batch", "label": "batch", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Maximum number of
documents to process in a single batch (batch operations only)" },
+ "batchSize": { "index": 36, "kind": "parameter", "displayName": "Batch
Size", "group": "batch", "label": "batch", "required": false, "type":
"integer", "javaType": "int", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "defaultValue": 10, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Number of documents to
submit per sub-batch. Documents are partitioned into sub- [...]
"batchTimeout": { "index": 37, "kind": "parameter", "displayName": "Batch
Timeout", "group": "batch", "label": "batch", "required": false, "type":
"integer", "javaType": "long", "deprecated": false, "deprecationNote": "",
"autowired": false, "secret": false, "defaultValue": 300000,
"configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Maximum time to wait for
batch completion in milliseconds" },
"splitBatchResults": { "index": 38, "kind": "parameter", "displayName":
"Split Batch Results", "group": "batch", "label": "batch", "required": false,
"type": "boolean", "javaType": "boolean", "deprecated": false,
"deprecationNote": "", "autowired": false, "secret": false, "defaultValue":
false, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Split batch results into
individual exchanges (one per d [...]
"chunkingIncludeRawText": { "index": 39, "kind": "parameter",
"displayName": "Chunking Include Raw Text", "group": "chunking", "label":
"chunking", "required": false, "type": "boolean", "javaType":
"java.lang.Boolean", "deprecated": false, "deprecationNote": "", "autowired":
false, "secret": false, "defaultValue": false, "configurationClass":
"org.apache.camel.component.docling.DoclingConfiguration",
"configurationField": "configuration", "description": "Include raw text in
chunk output" },
diff --git
a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java
b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java
index a67ae6113b3a..3270a9adde80 100644
---
a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java
+++
b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java
@@ -113,7 +113,10 @@ public class DoclingConfiguration implements Cloneable {
private long asyncTimeout = 300000; // 5 minutes
@UriParam(label = "batch")
- @Metadata(description = "Maximum number of documents to process in a
single batch (batch operations only)",
+ @Metadata(description = "Number of documents to submit per sub-batch.
Documents are partitioned into sub-batches of this size"
+ + " and each sub-batch is processed before
starting the next one. Within each sub-batch, up to"
+ + " batchParallelism threads are used
concurrently. This controls memory usage and back-pressure"
+ + " when processing large document sets.",
defaultValue = "10")
private int batchSize = 10;
diff --git
a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
index 8be170af8d10..df0f851be49d 100644
---
a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
+++
b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
@@ -764,8 +764,9 @@ public class DoclingProducer extends DefaultProducer {
List<String> inputSources, String outputFormat, int batchSize, int
parallelism,
boolean failOnFirstError, boolean useAsync, long batchTimeout) {
- LOG.info("Starting batch conversion of {} documents with
parallelism={}, failOnFirstError={}, timeout={}ms",
- inputSources.size(), parallelism, failOnFirstError,
batchTimeout);
+ LOG.info(
+ "Starting batch conversion of {} documents with batchSize={},
parallelism={}, failOnFirstError={}, timeout={}ms",
+ inputSources.size(), batchSize, parallelism, failOnFirstError,
batchTimeout);
BatchProcessingResults results = new BatchProcessingResults();
results.setStartTimeMs(System.currentTimeMillis());
@@ -776,101 +777,118 @@ public class DoclingProducer extends DefaultProducer {
AtomicBoolean shouldCancel = new AtomicBoolean(false);
try {
- // Create CompletableFutures for all conversion tasks
- List<CompletableFuture<BatchConversionResult>> futures = new
ArrayList<>();
-
- for (String inputSource : inputSources) {
- final int currentIndex = index.getAndIncrement();
- final String documentId = "doc-" + currentIndex;
-
- CompletableFuture<BatchConversionResult> future =
CompletableFuture.supplyAsync(() -> {
- // Check if we should skip this task due to early
termination
- if (failOnFirstError && shouldCancel.get()) {
- BatchConversionResult cancelledResult = new
BatchConversionResult(documentId, inputSource);
- cancelledResult.setBatchIndex(currentIndex);
- cancelledResult.setSuccess(false);
- cancelledResult.setErrorMessage("Cancelled due to
previous failure");
- return cancelledResult;
- }
+ // Partition documents into sub-batches of batchSize
+ for (int batchStart = 0; batchStart < inputSources.size();
batchStart += batchSize) {
+ if (failOnFirstError && shouldCancel.get()) {
+ break;
+ }
- BatchConversionResult result = new
BatchConversionResult(documentId, inputSource);
- result.setBatchIndex(currentIndex);
- long startTime = System.currentTimeMillis();
+ int batchEnd = Math.min(batchStart + batchSize,
inputSources.size());
+ List<String> subBatch = inputSources.subList(batchStart,
batchEnd);
- try {
- LOG.debug("Processing document {} (index {}): {}",
documentId, currentIndex, inputSource);
+ LOG.debug("Processing sub-batch [{}-{}] of {} total documents",
+ batchStart, batchEnd - 1, inputSources.size());
- String converted;
- if (useAsync) {
- converted =
convertDocumentAsyncAndWait(inputSource, outputFormat);
- } else {
- converted = convertDocumentSync(inputSource,
outputFormat);
+ List<CompletableFuture<BatchConversionResult>> futures = new
ArrayList<>();
+
+ for (String inputSource : subBatch) {
+ final int currentIndex = index.getAndIncrement();
+ final String documentId = "doc-" + currentIndex;
+
+ CompletableFuture<BatchConversionResult> future =
CompletableFuture.supplyAsync(() -> {
+ // Check if we should skip this task due to early
termination
+ if (failOnFirstError && shouldCancel.get()) {
+ BatchConversionResult cancelledResult = new
BatchConversionResult(documentId, inputSource);
+ cancelledResult.setBatchIndex(currentIndex);
+ cancelledResult.setSuccess(false);
+ cancelledResult.setErrorMessage("Cancelled due to
previous failure");
+ return cancelledResult;
}
- result.setResult(converted);
- result.setSuccess(true);
- result.setProcessingTimeMs(System.currentTimeMillis()
- startTime);
+ BatchConversionResult result = new
BatchConversionResult(documentId, inputSource);
+ result.setBatchIndex(currentIndex);
+ long startTime = System.currentTimeMillis();
- LOG.debug("Successfully processed document {} in
{}ms", documentId,
- result.getProcessingTimeMs());
+ try {
+ LOG.debug("Processing document {} (index {}): {}",
documentId, currentIndex, inputSource);
- } catch (Exception e) {
- result.setSuccess(false);
- result.setErrorMessage(e.getMessage());
- result.setProcessingTimeMs(System.currentTimeMillis()
- startTime);
+ String converted;
+ if (useAsync) {
+ converted =
convertDocumentAsyncAndWait(inputSource, outputFormat);
+ } else {
+ converted = convertDocumentSync(inputSource,
outputFormat);
+ }
+
+ result.setResult(converted);
+ result.setSuccess(true);
+
result.setProcessingTimeMs(System.currentTimeMillis() - startTime);
+
+ LOG.debug("Successfully processed document {} in
{}ms", documentId,
+ result.getProcessingTimeMs());
- LOG.error("Failed to process document {} (index {}):
{}", documentId, currentIndex,
- e.getMessage(), e);
+ } catch (Exception e) {
+ result.setSuccess(false);
+ result.setErrorMessage(e.getMessage());
+
result.setProcessingTimeMs(System.currentTimeMillis() - startTime);
- // Signal other tasks to cancel if failOnFirstError is
enabled
- if (failOnFirstError) {
- shouldCancel.set(true);
+ LOG.error("Failed to process document {} (index
{}): {}", documentId, currentIndex,
+ e.getMessage(), e);
+
+ // Signal other tasks to cancel if
failOnFirstError is enabled
+ if (failOnFirstError) {
+ shouldCancel.set(true);
+ }
}
- }
- return result;
- }, executor);
+ return result;
+ }, executor);
- futures.add(future);
- }
+ futures.add(future);
+ }
- // Wait for all futures to complete with timeout
- CompletableFuture<Void> allOf =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ // Wait for sub-batch to complete with remaining timeout
+ long elapsed = System.currentTimeMillis() -
results.getStartTimeMs();
+ long remainingTimeout = batchTimeout - elapsed;
+ if (remainingTimeout <= 0) {
+ futures.forEach(f -> f.cancel(true));
+ throw new RuntimeException("Batch processing timed out
after " + batchTimeout + "ms");
+ }
- try {
- allOf.get(batchTimeout, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- LOG.error("Batch processing timed out after {}ms",
batchTimeout);
- // Cancel all incomplete futures
- futures.forEach(f -> f.cancel(true));
- throw new RuntimeException("Batch processing timed out after "
+ batchTimeout + "ms", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.error("Batch processing interrupted", e);
- futures.forEach(f -> f.cancel(true));
- throw new RuntimeException("Batch processing interrupted", e);
- } catch (Exception e) {
- LOG.error("Batch processing failed", e);
- futures.forEach(f -> f.cancel(true));
- throw new RuntimeException("Batch processing failed", e);
- }
+ CompletableFuture<Void> allOf =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
- // Collect all results
- for (CompletableFuture<BatchConversionResult> future : futures) {
try {
- BatchConversionResult result = future.getNow(null);
- if (result != null) {
- results.addResult(result);
-
- // If failOnFirstError and we hit a failure, stop
adding more results
- if (failOnFirstError && !result.isSuccess()) {
- LOG.warn("Failing batch due to error in document
{}: {}", result.getDocumentId(),
- result.getErrorMessage());
- break;
+ allOf.get(remainingTimeout, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ LOG.error("Batch processing timed out after {}ms",
batchTimeout);
+ futures.forEach(f -> f.cancel(true));
+ throw new RuntimeException("Batch processing timed out
after " + batchTimeout + "ms", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Batch processing interrupted", e);
+ futures.forEach(f -> f.cancel(true));
+ throw new RuntimeException("Batch processing interrupted",
e);
+ } catch (Exception e) {
+ LOG.error("Batch processing failed", e);
+ futures.forEach(f -> f.cancel(true));
+ throw new RuntimeException("Batch processing failed", e);
+ }
+
+ // Collect results from this sub-batch
+ for (CompletableFuture<BatchConversionResult> future :
futures) {
+ try {
+ BatchConversionResult result = future.getNow(null);
+ if (result != null) {
+ results.addResult(result);
+
+ if (failOnFirstError && !result.isSuccess()) {
+ LOG.warn("Failing batch due to error in
document {}: {}", result.getDocumentId(),
+ result.getErrorMessage());
+ break;
+ }
}
+ } catch (Exception e) {
+ LOG.error("Error retrieving result", e);
}
- } catch (Exception e) {
- LOG.error("Error retrieving result", e);
}
}
@@ -1115,8 +1133,8 @@ public class DoclingProducer extends DefaultProducer {
boolean failOnFirstError, boolean useAsync, long batchTimeout) {
LOG.info(
- "Starting batch structured data extraction of {} documents
with parallelism={}, failOnFirstError={}, timeout={}ms",
- inputSources.size(), parallelism, failOnFirstError,
batchTimeout);
+ "Starting batch structured data extraction of {} documents
with batchSize={}, parallelism={}, failOnFirstError={}, timeout={}ms",
+ inputSources.size(), batchSize, parallelism, failOnFirstError,
batchTimeout);
BatchProcessingResults results = new BatchProcessingResults();
results.setStartTimeMs(System.currentTimeMillis());
@@ -1127,93 +1145,117 @@ public class DoclingProducer extends DefaultProducer {
AtomicBoolean shouldCancel = new AtomicBoolean(false);
try {
- List<CompletableFuture<BatchConversionResult>> futures = new
ArrayList<>();
-
- for (String inputSource : inputSources) {
- final int currentIndex = index.getAndIncrement();
- final String documentId = "doc-" + currentIndex;
-
- CompletableFuture<BatchConversionResult> future =
CompletableFuture.supplyAsync(() -> {
- if (failOnFirstError && shouldCancel.get()) {
- BatchConversionResult cancelledResult = new
BatchConversionResult(documentId, inputSource);
- cancelledResult.setBatchIndex(currentIndex);
- cancelledResult.setSuccess(false);
- cancelledResult.setErrorMessage("Cancelled due to
previous failure");
- return cancelledResult;
- }
+ // Partition documents into sub-batches of batchSize
+ for (int batchStart = 0; batchStart < inputSources.size();
batchStart += batchSize) {
+ if (failOnFirstError && shouldCancel.get()) {
+ break;
+ }
- BatchConversionResult result = new
BatchConversionResult(documentId, inputSource);
- result.setBatchIndex(currentIndex);
- long startTime = System.currentTimeMillis();
+ int batchEnd = Math.min(batchStart + batchSize,
inputSources.size());
+ List<String> subBatch = inputSources.subList(batchStart,
batchEnd);
- try {
- LOG.debug("Extracting structured data from document {}
(index {}): {}", documentId, currentIndex,
- inputSource);
+ LOG.debug("Processing sub-batch [{}-{}] of {} total documents",
+ batchStart, batchEnd - 1, inputSources.size());
- ConvertDocumentRequest request =
buildStructuredDataRequest(inputSource);
- String converted;
- if (useAsync) {
- converted = convertDocumentAsyncAndWait(request);
- } else {
- converted = convertDocumentSync(request);
+ List<CompletableFuture<BatchConversionResult>> futures = new
ArrayList<>();
+
+ for (String inputSource : subBatch) {
+ final int currentIndex = index.getAndIncrement();
+ final String documentId = "doc-" + currentIndex;
+
+ CompletableFuture<BatchConversionResult> future =
CompletableFuture.supplyAsync(() -> {
+ if (failOnFirstError && shouldCancel.get()) {
+ BatchConversionResult cancelledResult = new
BatchConversionResult(documentId, inputSource);
+ cancelledResult.setBatchIndex(currentIndex);
+ cancelledResult.setSuccess(false);
+ cancelledResult.setErrorMessage("Cancelled due to
previous failure");
+ return cancelledResult;
}
- result.setResult(converted);
- result.setSuccess(true);
- result.setProcessingTimeMs(System.currentTimeMillis()
- startTime);
+ BatchConversionResult result = new
BatchConversionResult(documentId, inputSource);
+ result.setBatchIndex(currentIndex);
+ long startTime = System.currentTimeMillis();
- } catch (Exception e) {
- result.setSuccess(false);
- result.setErrorMessage(e.getMessage());
- result.setProcessingTimeMs(System.currentTimeMillis()
- startTime);
+ try {
+ LOG.debug("Extracting structured data from
document {} (index {}): {}", documentId,
+ currentIndex, inputSource);
- LOG.error("Failed to extract structured data from
document {} (index {}): {}", documentId,
- currentIndex, e.getMessage(), e);
+ ConvertDocumentRequest request =
buildStructuredDataRequest(inputSource);
+ String converted;
+ if (useAsync) {
+ converted =
convertDocumentAsyncAndWait(request);
+ } else {
+ converted = convertDocumentSync(request);
+ }
+
+ result.setResult(converted);
+ result.setSuccess(true);
+
result.setProcessingTimeMs(System.currentTimeMillis() - startTime);
+
+ } catch (Exception e) {
+ result.setSuccess(false);
+ result.setErrorMessage(e.getMessage());
+
result.setProcessingTimeMs(System.currentTimeMillis() - startTime);
+
+ LOG.error("Failed to extract structured data from
document {} (index {}): {}", documentId,
+ currentIndex, e.getMessage(), e);
- if (failOnFirstError) {
- shouldCancel.set(true);
+ if (failOnFirstError) {
+ shouldCancel.set(true);
+ }
}
- }
- return result;
- }, executor);
+ return result;
+ }, executor);
- futures.add(future);
- }
+ futures.add(future);
+ }
- CompletableFuture<Void> allOf =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ // Wait for sub-batch to complete with remaining timeout
+ long elapsed = System.currentTimeMillis() -
results.getStartTimeMs();
+ long remainingTimeout = batchTimeout - elapsed;
+ if (remainingTimeout <= 0) {
+ futures.forEach(f -> f.cancel(true));
+ throw new RuntimeException(
+ "Batch structured data extraction timed out after
" + batchTimeout + "ms");
+ }
- try {
- allOf.get(batchTimeout, TimeUnit.MILLISECONDS);
- } catch (TimeoutException e) {
- LOG.error("Batch structured data extraction timed out after
{}ms", batchTimeout);
- futures.forEach(f -> f.cancel(true));
- throw new RuntimeException("Batch structured data extraction
timed out after " + batchTimeout + "ms", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.error("Batch structured data extraction interrupted", e);
- futures.forEach(f -> f.cancel(true));
- throw new RuntimeException("Batch structured data extraction
interrupted", e);
- } catch (Exception e) {
- LOG.error("Batch structured data extraction failed", e);
- futures.forEach(f -> f.cancel(true));
- throw new RuntimeException("Batch structured data extraction
failed", e);
- }
+ CompletableFuture<Void> allOf =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
- for (CompletableFuture<BatchConversionResult> future : futures) {
try {
- BatchConversionResult result = future.getNow(null);
- if (result != null) {
- results.addResult(result);
-
- if (failOnFirstError && !result.isSuccess()) {
- LOG.warn("Failing batch due to error in document
{}: {}", result.getDocumentId(),
- result.getErrorMessage());
- break;
+ allOf.get(remainingTimeout, TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ LOG.error("Batch structured data extraction timed out
after {}ms", batchTimeout);
+ futures.forEach(f -> f.cancel(true));
+ throw new RuntimeException(
+ "Batch structured data extraction timed out after
" + batchTimeout + "ms", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Batch structured data extraction interrupted",
e);
+ futures.forEach(f -> f.cancel(true));
+ throw new RuntimeException("Batch structured data
extraction interrupted", e);
+ } catch (Exception e) {
+ LOG.error("Batch structured data extraction failed", e);
+ futures.forEach(f -> f.cancel(true));
+ throw new RuntimeException("Batch structured data
extraction failed", e);
+ }
+
+ // Collect results from this sub-batch
+ for (CompletableFuture<BatchConversionResult> future :
futures) {
+ try {
+ BatchConversionResult result = future.getNow(null);
+ if (result != null) {
+ results.addResult(result);
+
+ if (failOnFirstError && !result.isSuccess()) {
+ LOG.warn("Failing batch due to error in
document {}: {}", result.getDocumentId(),
+ result.getErrorMessage());
+ break;
+ }
}
+ } catch (Exception e) {
+ LOG.error("Error retrieving result", e);
}
- } catch (Exception e) {
- LOG.error("Error retrieving result", e);
}
}
diff --git a/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java
index e1d902ce1c29..a70bda0e10bd 100644
--- a/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java
+++ b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java
@@ -141,6 +141,16 @@ public class BatchProcessingTest extends CamelTestSupport {
assertTrue(config.isSplitBatchResults());
}
+ @Test
+ public void testBatchSizeParsedFromEndpointUri() throws Exception {
+ DoclingEndpoint endpoint = (DoclingEndpoint) context.getEndpoint(
+
"docling:convert?useDoclingServe=true&batchSize=5&batchParallelism=2");
+ DoclingConfiguration config = endpoint.getConfiguration();
+
+ assertEquals(5, config.getBatchSize());
+ assertEquals(2, config.getBatchParallelism());
+ }
+
@Test
public void testBatchTimeoutConfiguration() {
DoclingConfiguration config = new DoclingConfiguration();
diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java
index ddfac58453aa..97365d57ab3a 100644
--- a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java
+++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java
@@ -678,8 +678,11 @@ public interface DoclingComponentBuilderFactory {
/**
- * Maximum number of documents to process in a single batch (batch
- * operations only).
+ * Number of documents to submit per sub-batch. Documents are
+ * partitioned into sub-batches of this size and each sub-batch is
+ * processed before starting the next one. Within each sub-batch, up to
+ * batchParallelism threads are used concurrently. This controls memory
+ * usage and back-pressure when processing large document sets.
*
* The option is a: <code>int</code> type.
*
diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java
index 0ee94f1e577f..8801cda626ee 100644
--- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java
+++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java
@@ -310,8 +310,11 @@ public interface DoclingEndpointBuilderFactory {
return this;
}
/**
- * Maximum number of documents to process in a single batch (batch
- * operations only).
+ * Number of documents to submit per sub-batch. Documents are
+ * partitioned into sub-batches of this size and each sub-batch is
+ * processed before starting the next one. Within each sub-batch, up to
+ * batchParallelism threads are used concurrently. This controls memory
+ * usage and back-pressure when processing large document sets.
*
* The option is a: <code>int</code> type.
*
@@ -326,8 +329,11 @@ public interface DoclingEndpointBuilderFactory {
return this;
}
/**
- * Maximum number of documents to process in a single batch (batch
- * operations only).
+ * Number of documents to submit per sub-batch. Documents are
+ * partitioned into sub-batches of this size and each sub-batch is
+ * processed before starting the next one. Within each sub-batch, up to
+ * batchParallelism threads are used concurrently. This controls memory
+ * usage and back-pressure when processing large document sets.
*
* The option will be converted to a <code>int</code> type.
*