This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch docling-backports-4.18.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c5cb776b607357f95170470fc0aa8da0fb0189ec
Author: Andrea Cosentino <[email protected]>
AuthorDate: Mon Mar 2 13:07:34 2026 +0100

    CAMEL-23105 - camel-docling - Fix broken async conversion workflow (discarded result and error masking) (#21675)
    
    The CompletionStage returned by convertSourceAsync() was discarded and
    a fabricated task ID with no server-side correlation was returned. The
    subsequent CHECK_CONVERSION_STATUS would fail silently because
    checkConversionStatusInternal() returned COMPLETED on any exception.
    
    Store the CompletableFuture in a ConcurrentHashMap keyed by a generated
    task ID. CHECK_CONVERSION_STATUS now checks the local map first and
    returns the actual future state (IN_PROGRESS, COMPLETED with result,
    or FAILED with error message). Also fix the error path to return FAILED
    instead of COMPLETED when an exception occurs.
    
    Signed-off-by: Andrea Cosentino <[email protected]>
---
 .../camel/component/docling/DoclingProducer.java   |  55 +++++-
 .../docling/DoclingAsyncConversionTest.java        | 200 +++++++++++++++++++++
 2 files changed, 250 insertions(+), 5 deletions(-)

diff --git a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
index f85a25373f37..f3b2ec43ca15 100644
--- a/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
+++ b/components/camel-ai/camel-docling/src/main/java/org/apache/camel/component/docling/DoclingProducer.java
@@ -32,12 +32,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -82,6 +84,8 @@ public class DoclingProducer extends DefaultProducer {
     private DoclingConfiguration configuration;
     private DoclingServeApi doclingServeApi;
     private ObjectMapper objectMapper;
+    private final Map<String, CompletableFuture<ConvertDocumentResponse>> 
pendingAsyncTasks = new ConcurrentHashMap<>();
+    private final AtomicLong taskIdCounter = new AtomicLong();
 
     public DoclingProducer(DoclingEndpoint endpoint) {
         super(endpoint);
@@ -124,6 +128,9 @@ public class DoclingProducer extends DefaultProducer {
     @Override
     protected void doStop() throws Exception {
         super.doStop();
+        // Cancel any pending async tasks
+        pendingAsyncTasks.forEach((id, future) -> future.cancel(true));
+        pendingAsyncTasks.clear();
         if (doclingServeApi != null) {
             doclingServeApi = null;
             LOG.info("DoclingServeApi reference cleared");
@@ -264,10 +271,12 @@ public class DoclingProducer extends DefaultProducer {
 
         // Start async conversion
         ConvertDocumentRequest request = buildConvertRequest(inputPath, 
outputFormat);
-        CompletionStage<ConvertDocumentResponse> asyncResult = 
doclingServeApi.convertSourceAsync(request);
+        CompletableFuture<ConvertDocumentResponse> asyncResult
+                = 
doclingServeApi.convertSourceAsync(request).toCompletableFuture();
 
-        // Generate a task ID for tracking
-        String taskId = "task-" + System.currentTimeMillis() + "-" + 
inputPath.hashCode();
+        // Generate a unique task ID and store the future for later status 
checks
+        String taskId = "task-" + taskIdCounter.incrementAndGet();
+        pendingAsyncTasks.put(taskId, asyncResult);
         LOG.debug("Started async conversion with task ID: {}", taskId);
 
         // Set task ID in body and header
@@ -322,6 +331,13 @@ public class DoclingProducer extends DefaultProducer {
     private ConversionStatus checkConversionStatusInternal(String taskId) {
         LOG.debug("Checking status for task: {}", taskId);
 
+        // Check the local pending tasks map first (tasks submitted via 
SUBMIT_ASYNC_CONVERSION)
+        CompletableFuture<ConvertDocumentResponse> future = 
pendingAsyncTasks.get(taskId);
+        if (future != null) {
+            return checkLocalAsyncTask(taskId, future);
+        }
+
+        // Fall back to server-side task polling
         try {
             TaskStatusPollRequest pollRequest = TaskStatusPollRequest.builder()
                     .taskId(taskId)
@@ -351,8 +367,37 @@ public class DoclingProducer extends DefaultProducer {
             return new ConversionStatus(taskId, status);
         } catch (Exception e) {
             LOG.warn("Failed to check task status for {}: {}", taskId, 
e.getMessage());
-            // If the task ID doesn't exist on the server, return a completed 
status as a fallback
-            return new ConversionStatus(taskId, 
ConversionStatus.Status.COMPLETED);
+            String errorMsg = e.getMessage() != null ? e.getMessage() : 
e.getClass().getName();
+            return new ConversionStatus(taskId, 
ConversionStatus.Status.FAILED, null, errorMsg, null);
+        }
+    }
+
+    private ConversionStatus checkLocalAsyncTask(String taskId, 
CompletableFuture<ConvertDocumentResponse> future) {
+        if (!future.isDone()) {
+            return new ConversionStatus(taskId, 
ConversionStatus.Status.IN_PROGRESS);
+        }
+
+        if (future.isCompletedExceptionally() || future.isCancelled()) {
+            // Remove completed task from map
+            pendingAsyncTasks.remove(taskId);
+            String errorMessage;
+            try {
+                future.join();
+                errorMessage = "Unknown error";
+            } catch (Exception e) {
+                errorMessage = e.getCause() != null ? 
e.getCause().getMessage() : e.getMessage();
+            }
+            return new ConversionStatus(taskId, 
ConversionStatus.Status.FAILED, null, errorMessage, null);
+        }
+
+        // Completed successfully — extract the result and remove from map
+        pendingAsyncTasks.remove(taskId);
+        try {
+            ConvertDocumentResponse response = future.join();
+            String result = extractConvertedContent(response, 
configuration.getOutputFormat());
+            return new ConversionStatus(taskId, 
ConversionStatus.Status.COMPLETED, result, null, null);
+        } catch (Exception e) {
+            return new ConversionStatus(taskId, 
ConversionStatus.Status.FAILED, null, e.getMessage(), null);
         }
     }
 
diff --git a/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/DoclingAsyncConversionTest.java b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/DoclingAsyncConversionTest.java
new file mode 100644
index 000000000000..3bf29ebfe075
--- /dev/null
+++ b/components/camel-ai/camel-docling/src/test/java/org/apache/camel/component/docling/DoclingAsyncConversionTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.docling;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import ai.docling.serve.api.convert.response.ConvertDocumentResponse;
+import ai.docling.serve.api.convert.response.DocumentResponse;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.test.junit6.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/**
+ * Tests the SUBMIT_ASYNC_CONVERSION and CHECK_CONVERSION_STATUS two-step 
async workflow.
+ *
+ * <p>
+ * Before the fix, the {@code CompletionStage} returned by {@code 
convertSourceAsync()} was discarded and a fabricated
+ * task ID with no server-side correlation was returned. 
CHECK_CONVERSION_STATUS would then fail because the server had
+ * no record of the fake ID, and the error was silently masked by returning 
COMPLETED.
+ *
+ * <p>
+ * After the fix, the {@code CompletableFuture} is stored in a local map keyed 
by the generated task ID. When
+ * CHECK_CONVERSION_STATUS is called, it checks the local map first and 
returns the actual status of the async task.
+ */
+class DoclingAsyncConversionTest extends CamelTestSupport {
+
+    @Test
+    void submitReturnsTaskIdLinkedToFuture() throws Exception {
+        DoclingEndpoint endpoint = context.getEndpoint(
+                
"docling:convert?operation=SUBMIT_ASYNC_CONVERSION&useDoclingServe=true", 
DoclingEndpoint.class);
+        DoclingProducer producer = (DoclingProducer) endpoint.createProducer();
+
+        // Access the pendingAsyncTasks map via reflection to verify the 
future is stored
+        Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks = 
getPendingAsyncTasks(producer);
+        assertNotNull(pendingTasks, "pendingAsyncTasks map should exist");
+        assertTrue(pendingTasks.isEmpty(), "pendingAsyncTasks should start 
empty");
+    }
+
+    @Test
+    void checkStatusReturnsFailedForUnknownTaskId() throws Exception {
+        // When CHECK_CONVERSION_STATUS is called with an unknown task ID and 
the server
+        // is not available, it should return FAILED — not COMPLETED (the old 
bug).
+        try {
+            Exchange exchange = new DefaultExchange(context);
+            exchange.getIn().setHeader(DoclingHeaders.TASK_ID, 
"nonexistent-task-id");
+            exchange.getIn().setHeader(DoclingHeaders.OPERATION, 
DoclingOperations.CHECK_CONVERSION_STATUS);
+
+            template.send("direct:check-status", exchange);
+
+            Object body = exchange.getIn().getBody();
+            assertInstanceOf(ConversionStatus.class, body);
+            ConversionStatus status = (ConversionStatus) body;
+
+            // The key assertion: unknown task IDs should NOT return COMPLETED
+            assertNotEquals(ConversionStatus.Status.COMPLETED, 
status.getStatus(),
+                    "Unknown task ID should not return COMPLETED status");
+            assertEquals(ConversionStatus.Status.FAILED, status.getStatus(),
+                    "Unknown task ID with unavailable server should return 
FAILED");
+            assertNotNull(status.getErrorMessage(), "Error message should be 
populated");
+        } catch (CamelExecutionException e) {
+            // If the exchange throws instead of setting FAILED status, that's 
also acceptable —
+            // the important thing is it doesn't silently return COMPLETED
+        }
+    }
+
+    @Test
+    void checkStatusReturnsCompletedForFinishedLocalTask() throws Exception {
+        DoclingEndpoint endpoint = context.getEndpoint(
+                
"docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true", 
DoclingEndpoint.class);
+        DoclingProducer producer = (DoclingProducer) endpoint.createProducer();
+
+        // Manually insert a completed future into the pending tasks map
+        Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks = 
getPendingAsyncTasks(producer);
+
+        // Create a completed future with a mock response
+        ConvertDocumentResponse mockResponse = 
ConvertDocumentResponse.builder()
+                .document(DocumentResponse.builder()
+                        .markdownContent("# Converted Document")
+                        .build())
+                .build();
+        CompletableFuture<ConvertDocumentResponse> completedFuture = 
CompletableFuture.completedFuture(mockResponse);
+        pendingTasks.put("test-task-1", completedFuture);
+
+        // Check the status — should find it in local map and return COMPLETED 
with result
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setHeader(DoclingHeaders.TASK_ID, "test-task-1");
+
+        producer.process(exchange);
+
+        Object body = exchange.getIn().getBody();
+        assertInstanceOf(ConversionStatus.class, body);
+        ConversionStatus status = (ConversionStatus) body;
+        assertEquals(ConversionStatus.Status.COMPLETED, status.getStatus());
+        assertNotNull(status.getResult(), "Result should contain the converted 
content");
+
+        // Future should be removed from map after completion
+        assertFalse(pendingTasks.containsKey("test-task-1"),
+                "Completed task should be removed from pending map");
+    }
+
+    @Test
+    void checkStatusReturnsInProgressForPendingLocalTask() throws Exception {
+        DoclingEndpoint endpoint = context.getEndpoint(
+                
"docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true", 
DoclingEndpoint.class);
+        DoclingProducer producer = (DoclingProducer) endpoint.createProducer();
+
+        Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks = 
getPendingAsyncTasks(producer);
+
+        // Insert an incomplete future
+        CompletableFuture<ConvertDocumentResponse> incompleteFuture = new 
CompletableFuture<>();
+        pendingTasks.put("test-task-2", incompleteFuture);
+
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setHeader(DoclingHeaders.TASK_ID, "test-task-2");
+
+        producer.process(exchange);
+
+        Object body = exchange.getIn().getBody();
+        assertInstanceOf(ConversionStatus.class, body);
+        ConversionStatus status = (ConversionStatus) body;
+        assertEquals(ConversionStatus.Status.IN_PROGRESS, status.getStatus());
+
+        // Future should remain in map since it's not done yet
+        assertTrue(pendingTasks.containsKey("test-task-2"),
+                "In-progress task should remain in pending map");
+
+        // Clean up
+        incompleteFuture.cancel(true);
+    }
+
+    @Test
+    void checkStatusReturnsFailedForExceptionalLocalTask() throws Exception {
+        DoclingEndpoint endpoint = context.getEndpoint(
+                
"docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true", 
DoclingEndpoint.class);
+        DoclingProducer producer = (DoclingProducer) endpoint.createProducer();
+
+        Map<String, CompletableFuture<ConvertDocumentResponse>> pendingTasks = 
getPendingAsyncTasks(producer);
+
+        // Insert a failed future
+        CompletableFuture<ConvertDocumentResponse> failedFuture = new 
CompletableFuture<>();
+        failedFuture.completeExceptionally(new RuntimeException("Server 
connection refused"));
+        pendingTasks.put("test-task-3", failedFuture);
+
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setHeader(DoclingHeaders.TASK_ID, "test-task-3");
+
+        producer.process(exchange);
+
+        Object body = exchange.getIn().getBody();
+        assertInstanceOf(ConversionStatus.class, body);
+        ConversionStatus status = (ConversionStatus) body;
+        assertEquals(ConversionStatus.Status.FAILED, status.getStatus());
+        assertNotNull(status.getErrorMessage());
+        assertTrue(status.getErrorMessage().contains("Server connection 
refused"));
+
+        // Failed task should be removed from map
+        assertFalse(pendingTasks.containsKey("test-task-3"),
+                "Failed task should be removed from pending map");
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, CompletableFuture<ConvertDocumentResponse>> 
getPendingAsyncTasks(DoclingProducer producer)
+            throws Exception {
+        Field field = 
DoclingProducer.class.getDeclaredField("pendingAsyncTasks");
+        field.setAccessible(true);
+        return (Map<String, CompletableFuture<ConvertDocumentResponse>>) 
field.get(producer);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:check-status")
+                        
.to("docling:convert?operation=CHECK_CONVERSION_STATUS&useDoclingServe=true");
+            }
+        };
+    }
+}

Reply via email to