This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push: new 29b55273dc6 CAMEL-18739 fix by handing over the completion to the original exchange (#8795) 29b55273dc6 is described below commit 29b55273dc66027b5492e688f5a50b9c6463e548 Author: Alexander <alexander-...@outlook.de> AuthorDate: Wed Dec 14 18:51:44 2022 +0100 CAMEL-18739 fix by handing over the completion to the original exchange (#8795) * Fix CAMEL-18739 by handing over the completion to the original exchange * Add unit test for bugfix issue CAMEL-18739 * Use awaitility lib for checking async if tempfile no longer exists Co-authored-by: Alexander Lex <alexander....@ams-osram.com> --- components/camel-zipfile/pom.xml | 5 + .../zipfile/ZipAggregationStrategySplitTest.java | 104 +++++++++++++++++++++ .../apache/camel/processor/MulticastProcessor.java | 1 + 3 files changed, 110 insertions(+) diff --git a/components/camel-zipfile/pom.xml b/components/camel-zipfile/pom.xml index 93c7c2c6d93..82836c1cc9b 100644 --- a/components/camel-zipfile/pom.xml +++ b/components/camel-zipfile/pom.xml @@ -65,6 +65,11 @@ <artifactId>log4j-slf4j-impl</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.awaitility</groupId> + <artifactId>awaitility</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategySplitTest.java b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategySplitTest.java new file mode 100644 index 00000000000..0887c338ba9 --- /dev/null +++ b/components/camel-zipfile/src/test/java/org/apache/camel/processor/aggregate/zipfile/ZipAggregationStrategySplitTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate.zipfile; + +import java.io.File; +import java.io.FileInputStream; +import java.util.concurrent.TimeUnit; +import java.util.zip.ZipEntry; +import java.util.zip.ZipInputStream; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.apache.camel.util.IOHelper; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.apache.camel.test.junit5.TestSupport.deleteDirectory; +import static org.junit.jupiter.api.Assertions.*; + +public class ZipAggregationStrategySplitTest extends CamelTestSupport { + + private static final int EXPECTED_NO_FILES = 3; + private static final String TEST_DIR = "target/out_ZipAggregationStrategyTest"; + + @Override + @BeforeEach + public void setUp() throws Exception { + deleteDirectory(TEST_DIR); + super.setUp(); + } + + @Test + public void testSplitter() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:aggregateToZipEntry"); + mock.expectedMessageCount(1); + + MockEndpoint.assertIsSatisfied(context); + + String tempFileLocation = mock.getExchanges().get(0).getIn().getHeader("tempFile", String.class); + + File[] files = new File(TEST_DIR).listFiles(); + assertNotNull(files); + assertTrue(files.length > 0, "Should be a file in " + TEST_DIR + " directory"); + + File resultFile = files[0]; + + ZipInputStream zin = new ZipInputStream(new FileInputStream(resultFile)); + try { + int fileCount = 0; + for (ZipEntry ze = zin.getNextEntry(); ze != null; ze = zin.getNextEntry()) { + fileCount++; + } + assertEquals(ZipAggregationStrategySplitTest.EXPECTED_NO_FILES, fileCount, + "Zip file should contains " + ZipAggregationStrategySplitTest.EXPECTED_NO_FILES + " files"); + } finally { + IOHelper.close(zin); + } + + // Temp file needs to be deleted now + File tempFile = new File(tempFileLocation); + Awaitility.waitAtMost(5, TimeUnit.SECONDS).alias("Tempfile is deleted").until(() -> !tempFile.exists()); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + // Unzip file and Split it according to FileEntry + from("file:src/test/resources/org/apache/camel/aggregate/zipfile/data?delay=1000&noop=true") + .aggregate(new GroupedMessageAggregationStrategy()) + .constant(true) + .completionFromBatchConsumer() + .eagerCheckCompletion() + .split(body(), new ZipAggregationStrategy(true, true)) + .streaming() + .process(exchange -> { /* NOOP - Do nothing */ }) + .end() + .setHeader("tempFile", header("CamelFileAbsolutePath")) + .to("file:" + TEST_DIR) + .to("mock:aggregateToZipEntry") + .log("Done processing zip file: ${header.CamelFileName}"); + } + }; + + } +} diff --git a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java index 380829ff369..e7f5c2d2fc2 100644 --- a/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/core/camel-core-processor/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -802,6 +802,7 @@ public class MulticastProcessor extends AsyncProcessorSupport } else { // copy the current result to original so it will contain this result of this eip ExchangeHelper.copyResults(original, subExchange); + subExchange.adapt(ExtendedExchange.class).handoverCompletions(original); } }