This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 29b55273dc6 CAMEL-18739 fix by handing over the completion to the 
original exchange (#8795)
29b55273dc6 is described below

commit 29b55273dc66027b5492e688f5a50b9c6463e548
Author: Alexander <alexander-...@outlook.de>
AuthorDate: Wed Dec 14 18:51:44 2022 +0100

    CAMEL-18739 fix by handing over the completion to the original exchange 
(#8795)
    
    * Fix CAMEL-18739 by handing over the completion to the original exchange
    
    * Add unit test for bugfix issue CAMEL-18739
    
    * Use awaitility lib for checking async if tempfile no longer exists
    
    Co-authored-by: Alexander Lex <alexander....@ams-osram.com>
---
 components/camel-zipfile/pom.xml                   |   5 +
 .../zipfile/ZipAggregationStrategySplitTest.java   | 104 +++++++++++++++++++++
 .../apache/camel/processor/MulticastProcessor.java |   1 +
 3 files changed, 110 insertions(+)

diff --git a/components/camel-zipfile/pom.xml b/components/camel-zipfile/pom.xml
index 93c7c2c6d93..82836c1cc9b 100644
--- a/components/camel-zipfile/pom.xml
+++ b/components/camel-zipfile/pom.xml
@@ -65,6 +65,11 @@
             <artifactId>log4j-slf4j-impl</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategySplitTest.java
 
b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategySplitTest.java
new file mode 100644
index 00000000000..0887c338ba9
--- /dev/null
+++ 
b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategySplitTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.zipfile;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.concurrent.TimeUnit;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.apache.camel.util.IOHelper;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.test.junit5.TestSupport.deleteDirectory;
+import static org.junit.jupiter.api.Assertions.*;
+
+public class ZipAggregationStrategySplitTest extends CamelTestSupport {
+
+    private static final int EXPECTED_NO_FILES = 3;
+    private static final String TEST_DIR = 
"target/out_ZipAggregationStrategyTest";
+
+    @Override
+    @BeforeEach
+    public void setUp() throws Exception {
+        deleteDirectory(TEST_DIR);
+        super.setUp();
+    }
+
+    @Test
+    public void testSplitter() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:aggregateToZipEntry");
+        mock.expectedMessageCount(1);
+
+        MockEndpoint.assertIsSatisfied(context);
+
+        String tempFileLocation = 
mock.getExchanges().get(0).getIn().getHeader("tempFile", String.class);
+
+        File[] files = new File(TEST_DIR).listFiles();
+        assertNotNull(files);
+        assertTrue(files.length > 0, "Should be a file in " + TEST_DIR + " 
directory");
+
+        File resultFile = files[0];
+
+        ZipInputStream zin = new ZipInputStream(new 
FileInputStream(resultFile));
+        try {
+            int fileCount = 0;
+            for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = 
zin.getNextEntry()) {
+                fileCount++;
+            }
+            assertEquals(ZipAggregationStrategySplitTest.EXPECTED_NO_FILES, 
fileCount,
+                    "Zip file should contains " + 
ZipAggregationStrategySplitTest.EXPECTED_NO_FILES + " files");
+        } finally {
+            IOHelper.close(zin);
+        }
+
+        // Temp file needs to be deleted now
+        File tempFile = new File(tempFileLocation);
+        Awaitility.waitAtMost(5, TimeUnit.SECONDS).alias("Tempfile is 
deleted").until(() -> !tempFile.exists());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                // Unzip file and Split it according to FileEntry
+                
from("file:src/test/resources/org/apache/camel/aggregate/zipfile/data?delay=1000&noop=true")
+                    .aggregate(new GroupedMessageAggregationStrategy())
+                    .constant(true)
+                    .completionFromBatchConsumer()
+                    .eagerCheckCompletion()
+                    .split(body(), new ZipAggregationStrategy(true, true))
+                    .streaming()
+                    .process(exchange -> { /* NOOP - Do nothing */ })
+                    .end()
+                    .setHeader("tempFile", header("CamelFileAbsolutePath"))
+                    .to("file:" + TEST_DIR)
+                    .to("mock:aggregateToZipEntry")
+                    .log("Done processing zip file: ${header.CamelFileName}");
+            }
+        };
+
+    }
+}
diff --git 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 380829ff369..e7f5c2d2fc2 100644
--- 
a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ 
b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -802,6 +802,7 @@ public class MulticastProcessor extends 
AsyncProcessorSupport
             } else {
                 // copy the current result to original so it will contain this 
result of this eip
                 ExchangeHelper.copyResults(original, subExchange);
+                
subExchange.adapt(ExtendedExchange.class).handoverCompletions(original);
             }
         }
 

Reply via email to