This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch backport/21689-to-camel-4.18.x in repository https://gitbox.apache.org/repos/asf/camel.git
commit 51b3de851fd256d62794321deb3001e20999c892
Author: Andrea Cosentino <[email protected]>
AuthorDate: Tue Mar 3 10:36:47 2026 +0100

    CAMEL-23120 - camel-docling - Implement batchSize sub-batch partitioning in batch processing (#21689)

    The batchSize configuration parameter (default 10) was declared and read
    from headers in processBatchConversion() and processBatchStructuredData(),
    but the value was never actually applied. Both convertDocumentsBatch() and
    convertStructuredDataBatch() submitted all documents to the executor at
    once regardless of batchSize, making the parameter a no-op.

    This change makes batchSize control how many documents are submitted per
    sub-batch. Documents are partitioned into chunks of batchSize and each
    sub-batch is processed to completion before starting the next one. Within
    each sub-batch, up to batchParallelism threads run concurrently. The
    overall batchTimeout is tracked across sub-batches so remaining time
    decreases as sub-batches complete, and failOnFirstError stops processing
    across sub-batch boundaries.

    This provides back-pressure and controls memory usage when processing
    large document sets, preventing the creation of unbounded numbers of
    CompletableFutures.
Signed-off-by: Andrea Cosentino <[email protected]> --- .../apache/camel/catalog/components/docling.json | 4 +- .../apache/camel/component/docling/docling.json | 4 +- .../component/docling/DoclingConfiguration.java | 5 +- .../camel/component/docling/DoclingProducer.java | 344 ++++++++++++--------- .../component/docling/BatchProcessingTest.java | 10 + .../dsl/DoclingComponentBuilderFactory.java | 7 +- .../dsl/DoclingEndpointBuilderFactory.java | 14 +- 7 files changed, 226 insertions(+), 162 deletions(-) diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json index cbcad6fc5a46..099e68cf265c 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/components/docling.json @@ -61,7 +61,7 @@ "workingDirectory": { "index": 34, "kind": "property", "displayName": "Working Directory", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Working directory for Docling execution" }, "batchFailOnFirstError": { "index": 35, "kind": "property", "displayName": "Batch Fail On First Error", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Fail entire batch on first error (true) or conti [...] 
"batchParallelism": { "index": 36, "kind": "property", "displayName": "Batch Parallelism", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 4, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of parallel threads for batch processing" }, - "batchSize": { "index": 37, "kind": "property", "displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum number of documents to process in a single batch (batch operations only)" }, + "batchSize": { "index": 37, "kind": "property", "displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of documents to submit per sub-batch. Documents are partitioned into sub-b [...] 
"batchTimeout": { "index": 38, "kind": "property", "displayName": "Batch Timeout", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 300000, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum time to wait for batch completion in milliseconds" }, "splitBatchResults": { "index": 39, "kind": "property", "displayName": "Split Batch Results", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Split batch results into individual exchanges (one per do [...] "includeMetadataInHeaders": { "index": 40, "kind": "property", "displayName": "Include Metadata In Headers", "group": "metadata", "label": "metadata", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Include metadata in message headers w [...] 
@@ -138,7 +138,7 @@ "workingDirectory": { "index": 33, "kind": "parameter", "displayName": "Working Directory", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Working directory for Docling execution" }, "batchFailOnFirstError": { "index": 34, "kind": "parameter", "displayName": "Batch Fail On First Error", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Fail entire batch on first error (true) or cont [...] "batchParallelism": { "index": 35, "kind": "parameter", "displayName": "Batch Parallelism", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 4, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of parallel threads for batch processing" }, - "batchSize": { "index": 36, "kind": "parameter", "displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum number of documents to process in a single batch (batch operations only)" }, + "batchSize": { "index": 36, "kind": "parameter", 
"displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of documents to submit per sub-batch. Documents are partitioned into sub- [...] "batchTimeout": { "index": 37, "kind": "parameter", "displayName": "Batch Timeout", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 300000, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum time to wait for batch completion in milliseconds" }, "splitBatchResults": { "index": 38, "kind": "parameter", "displayName": "Split Batch Results", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Split batch results into individual exchanges (one per d [...] "includeMetadataInHeaders": { "index": 39, "kind": "parameter", "displayName": "Include Metadata In Headers", "group": "metadata", "label": "metadata", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Include metadata in message headers [...] 
diff --git a/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json b/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json index cbcad6fc5a46..099e68cf265c 100644 --- a/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json +++ b/components/camel-ai/camel-docling/src/generated/resources/META-INF/org/apache/camel/component/docling/docling.json @@ -61,7 +61,7 @@ "workingDirectory": { "index": 34, "kind": "property", "displayName": "Working Directory", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Working directory for Docling execution" }, "batchFailOnFirstError": { "index": 35, "kind": "property", "displayName": "Batch Fail On First Error", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Fail entire batch on first error (true) or conti [...] 
"batchParallelism": { "index": 36, "kind": "property", "displayName": "Batch Parallelism", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 4, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of parallel threads for batch processing" }, - "batchSize": { "index": 37, "kind": "property", "displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum number of documents to process in a single batch (batch operations only)" }, + "batchSize": { "index": 37, "kind": "property", "displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of documents to submit per sub-batch. Documents are partitioned into sub-b [...] 
"batchTimeout": { "index": 38, "kind": "property", "displayName": "Batch Timeout", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 300000, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum time to wait for batch completion in milliseconds" }, "splitBatchResults": { "index": 39, "kind": "property", "displayName": "Split Batch Results", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Split batch results into individual exchanges (one per do [...] "includeMetadataInHeaders": { "index": 40, "kind": "property", "displayName": "Include Metadata In Headers", "group": "metadata", "label": "metadata", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Include metadata in message headers w [...] 
@@ -138,7 +138,7 @@ "workingDirectory": { "index": 33, "kind": "parameter", "displayName": "Working Directory", "group": "advanced", "label": "advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Working directory for Docling execution" }, "batchFailOnFirstError": { "index": 34, "kind": "parameter", "displayName": "Batch Fail On First Error", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Fail entire batch on first error (true) or cont [...] "batchParallelism": { "index": 35, "kind": "parameter", "displayName": "Batch Parallelism", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 4, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of parallel threads for batch processing" }, - "batchSize": { "index": 36, "kind": "parameter", "displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum number of documents to process in a single batch (batch operations only)" }, + "batchSize": { "index": 36, "kind": "parameter", 
"displayName": "Batch Size", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "int", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 10, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Number of documents to submit per sub-batch. Documents are partitioned into sub- [...] "batchTimeout": { "index": 37, "kind": "parameter", "displayName": "Batch Timeout", "group": "batch", "label": "batch", "required": false, "type": "integer", "javaType": "long", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": 300000, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Maximum time to wait for batch completion in milliseconds" }, "splitBatchResults": { "index": 38, "kind": "parameter", "displayName": "Split Batch Results", "group": "batch", "label": "batch", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Split batch results into individual exchanges (one per d [...] "includeMetadataInHeaders": { "index": 39, "kind": "parameter", "displayName": "Include Metadata In Headers", "group": "metadata", "label": "metadata", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "deprecationNote": "", "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.docling.DoclingConfiguration", "configurationField": "configuration", "description": "Include metadata in message headers [...] 
diff --git a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java index 0a965bbef4bb..c33ee819dd98 100644 --- a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java +++ b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingConfiguration.java @@ -107,7 +107,10 @@ public class DoclingConfiguration implements Cloneable { private long asyncTimeout = 300000; // 5 minutes @UriParam(label = "batch") - @Metadata(description = "Maximum number of documents to process in a single batch (batch operations only)", + @Metadata(description = "Number of documents to submit per sub-batch. Documents are partitioned into sub-batches of this size" + + " and each sub-batch is processed before starting the next one. Within each sub-batch, up to" + + " batchParallelism threads are used concurrently. 
This controls memory usage and back-pressure" + + " when processing large document sets.", defaultValue = "10") private int batchSize = 10; diff --git a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java index f151a4bbb65c..d54886ff815d 100644 --- a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java +++ b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java @@ -666,8 +666,9 @@ public class DoclingProducer extends DefaultProducer { List<String> inputSources, String outputFormat, int batchSize, int parallelism, boolean failOnFirstError, boolean useAsync, long batchTimeout) { - LOG.info("Starting batch conversion of {} documents with parallelism={}, failOnFirstError={}, timeout={}ms", - inputSources.size(), parallelism, failOnFirstError, batchTimeout); + LOG.info( + "Starting batch conversion of {} documents with batchSize={}, parallelism={}, failOnFirstError={}, timeout={}ms", + inputSources.size(), batchSize, parallelism, failOnFirstError, batchTimeout); BatchProcessingResults results = new BatchProcessingResults(); results.setStartTimeMs(System.currentTimeMillis()); @@ -678,101 +679,118 @@ public class DoclingProducer extends DefaultProducer { AtomicBoolean shouldCancel = new AtomicBoolean(false); try { - // Create CompletableFutures for all conversion tasks - List<CompletableFuture<BatchConversionResult>> futures = new ArrayList<>(); - - for (String inputSource : inputSources) { - final int currentIndex = index.getAndIncrement(); - final String documentId = "doc-" + currentIndex; - - CompletableFuture<BatchConversionResult> future = CompletableFuture.supplyAsync(() -> { - // Check if we should skip this task due to early termination - if (failOnFirstError && shouldCancel.get()) { - BatchConversionResult 
cancelledResult = new BatchConversionResult(documentId, inputSource); - cancelledResult.setBatchIndex(currentIndex); - cancelledResult.setSuccess(false); - cancelledResult.setErrorMessage("Cancelled due to previous failure"); - return cancelledResult; - } + // Partition documents into sub-batches of batchSize + for (int batchStart = 0; batchStart < inputSources.size(); batchStart += batchSize) { + if (failOnFirstError && shouldCancel.get()) { + break; + } - BatchConversionResult result = new BatchConversionResult(documentId, inputSource); - result.setBatchIndex(currentIndex); - long startTime = System.currentTimeMillis(); + int batchEnd = Math.min(batchStart + batchSize, inputSources.size()); + List<String> subBatch = inputSources.subList(batchStart, batchEnd); - try { - LOG.debug("Processing document {} (index {}): {}", documentId, currentIndex, inputSource); + LOG.debug("Processing sub-batch [{}-{}] of {} total documents", + batchStart, batchEnd - 1, inputSources.size()); - String converted; - if (useAsync) { - converted = convertDocumentAsyncAndWait(inputSource, outputFormat); - } else { - converted = convertDocumentSync(inputSource, outputFormat); + List<CompletableFuture<BatchConversionResult>> futures = new ArrayList<>(); + + for (String inputSource : subBatch) { + final int currentIndex = index.getAndIncrement(); + final String documentId = "doc-" + currentIndex; + + CompletableFuture<BatchConversionResult> future = CompletableFuture.supplyAsync(() -> { + // Check if we should skip this task due to early termination + if (failOnFirstError && shouldCancel.get()) { + BatchConversionResult cancelledResult = new BatchConversionResult(documentId, inputSource); + cancelledResult.setBatchIndex(currentIndex); + cancelledResult.setSuccess(false); + cancelledResult.setErrorMessage("Cancelled due to previous failure"); + return cancelledResult; } - result.setResult(converted); - result.setSuccess(true); - result.setProcessingTimeMs(System.currentTimeMillis() - 
startTime); + BatchConversionResult result = new BatchConversionResult(documentId, inputSource); + result.setBatchIndex(currentIndex); + long startTime = System.currentTimeMillis(); - LOG.debug("Successfully processed document {} in {}ms", documentId, - result.getProcessingTimeMs()); + try { + LOG.debug("Processing document {} (index {}): {}", documentId, currentIndex, inputSource); - } catch (Exception e) { - result.setSuccess(false); - result.setErrorMessage(e.getMessage()); - result.setProcessingTimeMs(System.currentTimeMillis() - startTime); + String converted; + if (useAsync) { + converted = convertDocumentAsyncAndWait(inputSource, outputFormat); + } else { + converted = convertDocumentSync(inputSource, outputFormat); + } + + result.setResult(converted); + result.setSuccess(true); + result.setProcessingTimeMs(System.currentTimeMillis() - startTime); + + LOG.debug("Successfully processed document {} in {}ms", documentId, + result.getProcessingTimeMs()); - LOG.error("Failed to process document {} (index {}): {}", documentId, currentIndex, - e.getMessage(), e); + } catch (Exception e) { + result.setSuccess(false); + result.setErrorMessage(e.getMessage()); + result.setProcessingTimeMs(System.currentTimeMillis() - startTime); - // Signal other tasks to cancel if failOnFirstError is enabled - if (failOnFirstError) { - shouldCancel.set(true); + LOG.error("Failed to process document {} (index {}): {}", documentId, currentIndex, + e.getMessage(), e); + + // Signal other tasks to cancel if failOnFirstError is enabled + if (failOnFirstError) { + shouldCancel.set(true); + } } - } - return result; - }, executor); + return result; + }, executor); - futures.add(future); - } + futures.add(future); + } - // Wait for all futures to complete with timeout - CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + // Wait for sub-batch to complete with remaining timeout + long elapsed = System.currentTimeMillis() - 
results.getStartTimeMs(); + long remainingTimeout = batchTimeout - elapsed; + if (remainingTimeout <= 0) { + futures.forEach(f -> f.cancel(true)); + throw new RuntimeException("Batch processing timed out after " + batchTimeout + "ms"); + } - try { - allOf.get(batchTimeout, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - LOG.error("Batch processing timed out after {}ms", batchTimeout); - // Cancel all incomplete futures - futures.forEach(f -> f.cancel(true)); - throw new RuntimeException("Batch processing timed out after " + batchTimeout + "ms", e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Batch processing interrupted", e); - futures.forEach(f -> f.cancel(true)); - throw new RuntimeException("Batch processing interrupted", e); - } catch (Exception e) { - LOG.error("Batch processing failed", e); - futures.forEach(f -> f.cancel(true)); - throw new RuntimeException("Batch processing failed", e); - } + CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); - // Collect all results - for (CompletableFuture<BatchConversionResult> future : futures) { try { - BatchConversionResult result = future.getNow(null); - if (result != null) { - results.addResult(result); - - // If failOnFirstError and we hit a failure, stop adding more results - if (failOnFirstError && !result.isSuccess()) { - LOG.warn("Failing batch due to error in document {}: {}", result.getDocumentId(), - result.getErrorMessage()); - break; + allOf.get(remainingTimeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + LOG.error("Batch processing timed out after {}ms", batchTimeout); + futures.forEach(f -> f.cancel(true)); + throw new RuntimeException("Batch processing timed out after " + batchTimeout + "ms", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Batch processing interrupted", e); + futures.forEach(f -> f.cancel(true)); + throw new 
RuntimeException("Batch processing interrupted", e); + } catch (Exception e) { + LOG.error("Batch processing failed", e); + futures.forEach(f -> f.cancel(true)); + throw new RuntimeException("Batch processing failed", e); + } + + // Collect results from this sub-batch + for (CompletableFuture<BatchConversionResult> future : futures) { + try { + BatchConversionResult result = future.getNow(null); + if (result != null) { + results.addResult(result); + + if (failOnFirstError && !result.isSuccess()) { + LOG.warn("Failing batch due to error in document {}: {}", result.getDocumentId(), + result.getErrorMessage()); + break; + } } + } catch (Exception e) { + LOG.error("Error retrieving result", e); } - } catch (Exception e) { - LOG.error("Error retrieving result", e); } } @@ -1017,8 +1035,8 @@ public class DoclingProducer extends DefaultProducer { boolean failOnFirstError, boolean useAsync, long batchTimeout) { LOG.info( - "Starting batch structured data extraction of {} documents with parallelism={}, failOnFirstError={}, timeout={}ms", - inputSources.size(), parallelism, failOnFirstError, batchTimeout); + "Starting batch structured data extraction of {} documents with batchSize={}, parallelism={}, failOnFirstError={}, timeout={}ms", + inputSources.size(), batchSize, parallelism, failOnFirstError, batchTimeout); BatchProcessingResults results = new BatchProcessingResults(); results.setStartTimeMs(System.currentTimeMillis()); @@ -1029,93 +1047,117 @@ public class DoclingProducer extends DefaultProducer { AtomicBoolean shouldCancel = new AtomicBoolean(false); try { - List<CompletableFuture<BatchConversionResult>> futures = new ArrayList<>(); - - for (String inputSource : inputSources) { - final int currentIndex = index.getAndIncrement(); - final String documentId = "doc-" + currentIndex; - - CompletableFuture<BatchConversionResult> future = CompletableFuture.supplyAsync(() -> { - if (failOnFirstError && shouldCancel.get()) { - BatchConversionResult cancelledResult = new 
BatchConversionResult(documentId, inputSource); - cancelledResult.setBatchIndex(currentIndex); - cancelledResult.setSuccess(false); - cancelledResult.setErrorMessage("Cancelled due to previous failure"); - return cancelledResult; - } + // Partition documents into sub-batches of batchSize + for (int batchStart = 0; batchStart < inputSources.size(); batchStart += batchSize) { + if (failOnFirstError && shouldCancel.get()) { + break; + } - BatchConversionResult result = new BatchConversionResult(documentId, inputSource); - result.setBatchIndex(currentIndex); - long startTime = System.currentTimeMillis(); + int batchEnd = Math.min(batchStart + batchSize, inputSources.size()); + List<String> subBatch = inputSources.subList(batchStart, batchEnd); - try { - LOG.debug("Extracting structured data from document {} (index {}): {}", documentId, currentIndex, - inputSource); + LOG.debug("Processing sub-batch [{}-{}] of {} total documents", + batchStart, batchEnd - 1, inputSources.size()); - ConvertDocumentRequest request = buildStructuredDataRequest(inputSource); - String converted; - if (useAsync) { - converted = convertDocumentAsyncAndWait(request); - } else { - converted = convertDocumentSync(request); + List<CompletableFuture<BatchConversionResult>> futures = new ArrayList<>(); + + for (String inputSource : subBatch) { + final int currentIndex = index.getAndIncrement(); + final String documentId = "doc-" + currentIndex; + + CompletableFuture<BatchConversionResult> future = CompletableFuture.supplyAsync(() -> { + if (failOnFirstError && shouldCancel.get()) { + BatchConversionResult cancelledResult = new BatchConversionResult(documentId, inputSource); + cancelledResult.setBatchIndex(currentIndex); + cancelledResult.setSuccess(false); + cancelledResult.setErrorMessage("Cancelled due to previous failure"); + return cancelledResult; } - result.setResult(converted); - result.setSuccess(true); - result.setProcessingTimeMs(System.currentTimeMillis() - startTime); + 
BatchConversionResult result = new BatchConversionResult(documentId, inputSource); + result.setBatchIndex(currentIndex); + long startTime = System.currentTimeMillis(); - } catch (Exception e) { - result.setSuccess(false); - result.setErrorMessage(e.getMessage()); - result.setProcessingTimeMs(System.currentTimeMillis() - startTime); + try { + LOG.debug("Extracting structured data from document {} (index {}): {}", documentId, + currentIndex, inputSource); - LOG.error("Failed to extract structured data from document {} (index {}): {}", documentId, - currentIndex, e.getMessage(), e); + ConvertDocumentRequest request = buildStructuredDataRequest(inputSource); + String converted; + if (useAsync) { + converted = convertDocumentAsyncAndWait(request); + } else { + converted = convertDocumentSync(request); + } + + result.setResult(converted); + result.setSuccess(true); + result.setProcessingTimeMs(System.currentTimeMillis() - startTime); + + } catch (Exception e) { + result.setSuccess(false); + result.setErrorMessage(e.getMessage()); + result.setProcessingTimeMs(System.currentTimeMillis() - startTime); + + LOG.error("Failed to extract structured data from document {} (index {}): {}", documentId, + currentIndex, e.getMessage(), e); - if (failOnFirstError) { - shouldCancel.set(true); + if (failOnFirstError) { + shouldCancel.set(true); + } } - } - return result; - }, executor); + return result; + }, executor); - futures.add(future); - } + futures.add(future); + } - CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); + // Wait for sub-batch to complete with remaining timeout + long elapsed = System.currentTimeMillis() - results.getStartTimeMs(); + long remainingTimeout = batchTimeout - elapsed; + if (remainingTimeout <= 0) { + futures.forEach(f -> f.cancel(true)); + throw new RuntimeException( + "Batch structured data extraction timed out after " + batchTimeout + "ms"); + } - try { - allOf.get(batchTimeout, TimeUnit.MILLISECONDS); 
- } catch (TimeoutException e) { - LOG.error("Batch structured data extraction timed out after {}ms", batchTimeout); - futures.forEach(f -> f.cancel(true)); - throw new RuntimeException("Batch structured data extraction timed out after " + batchTimeout + "ms", e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.error("Batch structured data extraction interrupted", e); - futures.forEach(f -> f.cancel(true)); - throw new RuntimeException("Batch structured data extraction interrupted", e); - } catch (Exception e) { - LOG.error("Batch structured data extraction failed", e); - futures.forEach(f -> f.cancel(true)); - throw new RuntimeException("Batch structured data extraction failed", e); - } + CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); - for (CompletableFuture<BatchConversionResult> future : futures) { try { - BatchConversionResult result = future.getNow(null); - if (result != null) { - results.addResult(result); - - if (failOnFirstError && !result.isSuccess()) { - LOG.warn("Failing batch due to error in document {}: {}", result.getDocumentId(), - result.getErrorMessage()); - break; + allOf.get(remainingTimeout, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + LOG.error("Batch structured data extraction timed out after {}ms", batchTimeout); + futures.forEach(f -> f.cancel(true)); + throw new RuntimeException( + "Batch structured data extraction timed out after " + batchTimeout + "ms", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Batch structured data extraction interrupted", e); + futures.forEach(f -> f.cancel(true)); + throw new RuntimeException("Batch structured data extraction interrupted", e); + } catch (Exception e) { + LOG.error("Batch structured data extraction failed", e); + futures.forEach(f -> f.cancel(true)); + throw new RuntimeException("Batch structured data extraction failed", e); + } + + // Collect results 
from this sub-batch + for (CompletableFuture<BatchConversionResult> future : futures) { + try { + BatchConversionResult result = future.getNow(null); + if (result != null) { + results.addResult(result); + + if (failOnFirstError && !result.isSuccess()) { + LOG.warn("Failing batch due to error in document {}: {}", result.getDocumentId(), + result.getErrorMessage()); + break; + } } + } catch (Exception e) { + LOG.error("Error retrieving result", e); } - } catch (Exception e) { - LOG.error("Error retrieving result", e); } } diff --git a/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java index 3b936a7cddd7..f34a228aab6f 100644 --- a/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java +++ b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/BatchProcessingTest.java @@ -141,6 +141,16 @@ public class BatchProcessingTest extends CamelTestSupport { assertTrue(config.isSplitBatchResults()); } + @Test + public void testBatchSizeParsedFromEndpointUri() throws Exception { + DoclingEndpoint endpoint = (DoclingEndpoint) context.getEndpoint( + "docling:convert?useDoclingServe=true&batchSize=5&batchParallelism=2"); + DoclingConfiguration config = endpoint.getConfiguration(); + + assertEquals(5, config.getBatchSize()); + assertEquals(2, config.getBatchParallelism()); + } + @Test public void testBatchTimeoutConfiguration() { DoclingConfiguration config = new DoclingConfiguration(); diff --git a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java index 780a2a8bf4a0..369368cc6262 100644 --- 
a/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java +++ b/dsl/camel-componentdsl/src/generated/java/org/apache/camel/builder/component/dsl/DoclingComponentBuilderFactory.java @@ -678,8 +678,11 @@ public interface DoclingComponentBuilderFactory { /** - * Maximum number of documents to process in a single batch (batch - * operations only). + * Number of documents to submit per sub-batch. Documents are + * partitioned into sub-batches of this size and each sub-batch is + * processed before starting the next one. Within each sub-batch, up to + * batchParallelism threads are used concurrently. This controls memory + * usage and back-pressure when processing large document sets. * * The option is a: <code>int</code> type. * diff --git a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java index 24f6fcd98b13..e411b051f1d3 100644 --- a/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java +++ b/dsl/camel-endpointdsl/src/generated/java/org/apache/camel/builder/endpoint/dsl/DoclingEndpointBuilderFactory.java @@ -310,8 +310,11 @@ public interface DoclingEndpointBuilderFactory { return this; } /** - * Maximum number of documents to process in a single batch (batch - * operations only). + * Number of documents to submit per sub-batch. Documents are + * partitioned into sub-batches of this size and each sub-batch is + * processed before starting the next one. Within each sub-batch, up to + * batchParallelism threads are used concurrently. This controls memory + * usage and back-pressure when processing large document sets. * * The option is a: <code>int</code> type. 
* @@ -326,8 +329,11 @@ public interface DoclingEndpointBuilderFactory { return this; } /** - * Maximum number of documents to process in a single batch (batch - * operations only). + * Number of documents to submit per sub-batch. Documents are + * partitioned into sub-batches of this size and each sub-batch is + * processed before starting the next one. Within each sub-batch, up to + * batchParallelism threads are used concurrently. This controls memory + * usage and back-pressure when processing large document sets. * * The option will be converted to a <code>int</code> type. *
