This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new d89ac1ffab2 CAMEL-20614: deep-copy output processors during instantiation of a route template (#13823)
d89ac1ffab2 is described below

commit d89ac1ffab2d55f1d6fed77822fcf7026d3542b0
Author: Bartosz Popiela <bartosz...@gmail.com>
AuthorDate: Thu Apr 18 13:33:17 2024 +0200

    CAMEL-20614: deep-copy output processors during instantiation of a route template (#13823)
    
    * CAMEL-20614: deep-copy output processors during instantiation of a route template
    
    When multiple threads try to instantiate and send an exchange to the same
    kamelet in parallel,
    org.apache.camel.component.kamelet.KameletConsumerNotAvailableException
    may be thrown because the underlying RouteTemplateDefinition is
    shallow-copied and changes to the RouteDefinition are reflected in the
    RouteTemplateDefinition.
    
    * CAMEL-20614: add shallowCopy as per code review comment
    
    * CAMEL-20614: update access modifiers of copy constructors to be protected as per code review comment
    
    * CAMEL-20614: add unit tests
---
 .../kamelet/KameletMultiThreadedTest.java          | 70 ++++++++++++++++++++++
 ...ition.java => CopyableProcessorDefinition.java} | 15 ++---
 .../org/apache/camel/model/NoOutputDefinition.java |  6 ++
 .../camel/model/OptionalIdentifiedDefinition.java  | 12 ++++
 .../apache/camel/model/ProcessorDefinition.java    | 11 ++++
 .../camel/model/RouteTemplateDefinition.java       | 28 ++++++++-
 .../org/apache/camel/model/SendDefinition.java     |  8 +++
 .../java/org/apache/camel/model/ToDefinition.java  | 13 +++-
 .../apache/camel/model/ToDynamicDefinition.java    | 19 +++++-
 .../camel/model/RouteTemplateDefinitionTest.java   | 63 +++++++++++++++++++
 10 files changed, 229 insertions(+), 16 deletions(-)

diff --git 
a/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletMultiThreadedTest.java
 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletMultiThreadedTest.java
new file mode 100644
index 00000000000..73d47e2cd6f
--- /dev/null
+++ 
b/components/camel-kamelet/src/test/java/org/apache/camel/component/kamelet/KameletMultiThreadedTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kamelet;
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.camel.component.kamelet.Kamelet.templateToRoute;
+
+public class KameletMultiThreadedTest extends CamelTestSupport {
+
+    @Test
+    public void 
createSameKameletTwiceInParallel_KameletConsumerNotAvailableExceptionThrown() 
throws InterruptedException {
+        var latch = new CountDownLatch(2);
+        context.addRouteTemplateDefinitionConverter("*", (in, parameters) -> {
+            try {
+                return templateToRoute(in, parameters);
+            } finally {
+                latch.countDown();
+                latch.await();
+            }
+        });
+        getMockEndpoint("mock:foo").expectedMessageCount(2);
+
+        template.sendBody("seda:route", null);
+        template.requestBody("seda:route", ((Object) null));
+
+        MockEndpoint.assertIsSatisfied(context);
+    }
+
+    // **********************************************
+    //
+    // test set-up
+    //
+    // **********************************************
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("seda:route?concurrentConsumers=2")
+                        .toD("kamelet:-");
+
+                routeTemplate("-"). // This is a workaround for "*" to be 
iterated before templateId at 
org.apache.camel.impl.DefaultModel#addRouteFromTemplate (line 460)
+                        from("kamelet:source")
+                        .to("mock:foo");
+            }
+        };
+    }
+}
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/CopyableProcessorDefinition.java
similarity index 70%
copy from 
core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java
copy to 
core/camel-core-model/src/main/java/org/apache/camel/model/CopyableProcessorDefinition.java
index fdd21dd2e8a..cf770d7515c 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/CopyableProcessorDefinition.java
@@ -16,17 +16,10 @@
  */
 package org.apache.camel.model;
 
-import java.util.Collections;
-import java.util.List;
-
 /**
- * Base class for definitions which does not support outputs.
+ * This interface is used to copy {@link ProcessorDefinition 
ProcessorDefinitions} during instantiation of a route
+ * template.
  */
-public abstract class NoOutputDefinition<Type extends 
ProcessorDefinition<Type>> extends ProcessorDefinition<Type> {
-
-    @Override
-    public List<ProcessorDefinition<?>> getOutputs() {
-        return Collections.emptyList();
-    }
-
+interface CopyableProcessorDefinition {
+    ProcessorDefinition<?> copy();
 }
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java
index fdd21dd2e8a..92dc058a088 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/NoOutputDefinition.java
@@ -29,4 +29,10 @@ public abstract class NoOutputDefinition<Type extends 
ProcessorDefinition<Type>>
         return Collections.emptyList();
     }
 
+    public NoOutputDefinition() {
+    }
+
+    protected NoOutputDefinition(NoOutputDefinition source) {
+        super(source);
+    }
 }
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/OptionalIdentifiedDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/OptionalIdentifiedDefinition.java
index f8c42b3e246..767f40181b4 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/OptionalIdentifiedDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/OptionalIdentifiedDefinition.java
@@ -45,6 +45,18 @@ public abstract class OptionalIdentifiedDefinition<T extends 
OptionalIdentifiedD
     private int lineNumber = -1;
     private String location;
 
+    public OptionalIdentifiedDefinition() {
+    }
+
+    protected OptionalIdentifiedDefinition(OptionalIdentifiedDefinition 
source) {
+        this.camelContext = source.camelContext;
+        this.id = source.id;
+        this.customId = source.customId;
+        this.description = source.description;
+        this.lineNumber = source.lineNumber;
+        this.location = source.location;
+    }
+
     @Override
     public CamelContext getCamelContext() {
         return camelContext;
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
index bff79bdb6ad..194b15678dc 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ProcessorDefinition.java
@@ -102,6 +102,17 @@ public abstract class ProcessorDefinition<Type extends 
ProcessorDefinition<Type>
         index = COUNTER.getAndIncrement();
     }
 
+    protected ProcessorDefinition(ProcessorDefinition source) {
+        super(source);
+        this.disabled = source.disabled;
+        this.inheritErrorHandler = source.inheritErrorHandler;
+        this.blocks.addAll(source.blocks);
+        this.parent = source.parent;
+        this.routeConfiguration = source.routeConfiguration;
+        this.interceptStrategies.addAll(source.interceptStrategies);
+        this.index = source.index;
+    }
+
     private static <T extends ExpressionNode> ExpressionClause<T> 
createAndSetExpression(T result) {
         ExpressionClause<T> clause = new ExpressionClause<>(result);
         result.setExpression(clause);
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/RouteTemplateDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/RouteTemplateDefinition.java
index 8cff7141914..d46d30433aa 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/RouteTemplateDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/RouteTemplateDefinition.java
@@ -17,6 +17,7 @@
 package org.apache.camel.model;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
@@ -417,10 +418,10 @@ public class RouteTemplateDefinition extends 
OptionalIdentifiedDefinition<RouteT
         copy.setLogMask(route.getLogMask());
         copy.setMessageHistory(route.getMessageHistory());
         copy.setOutputType(route.getOutputType());
-        copy.setOutputs(route.getOutputs());
-        copy.setRoutePolicies(route.getRoutePolicies());
+        copy.setOutputs(copy(route.getOutputs()));
+        copy.setRoutePolicies(shallowCopy(route.getRoutePolicies()));
         copy.setRoutePolicyRef(route.getRoutePolicyRef());
-        copy.setRouteProperties(route.getRouteProperties());
+        copy.setRouteProperties(shallowCopy(route.getRouteProperties()));
         copy.setShutdownRoute(route.getShutdownRoute());
         copy.setShutdownRunningTask(route.getShutdownRunningTask());
         copy.setStartupOrder(route.getStartupOrder());
@@ -434,6 +435,27 @@ public class RouteTemplateDefinition extends 
OptionalIdentifiedDefinition<RouteT
         }
         copy.setPrecondition(route.getPrecondition());
         copy.setRouteConfigurationId(route.getRouteConfigurationId());
+        copy.setTemplateParameters(shallowCopy(route.getTemplateParameters()));
+        return copy;
+    }
+
+    private <T> List<T> shallowCopy(List<T> list) {
+        return (list != null) ? new ArrayList<>(list) : null;
+    }
+
+    private <K, V> Map<K, V> shallowCopy(Map<K, V> map) {
+        return (map != null) ? new HashMap<>(map) : null;
+    }
+
+    private List<ProcessorDefinition<?>> copy(List<ProcessorDefinition<?>> 
outputs) {
+        var copy = new ArrayList<ProcessorDefinition<?>>();
+        for (var definition : outputs) {
+            if (definition instanceof CopyableProcessorDefinition copyable) {
+                copy.add(copyable.copy());
+            } else {
+                copy.add(definition);
+            }
+        }
         return copy;
     }
 
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/SendDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/SendDefinition.java
index 21392205b79..5e3a9725ae3 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/SendDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/SendDefinition.java
@@ -51,6 +51,14 @@ public abstract class SendDefinition<Type extends 
ProcessorDefinition<Type>> ext
         this.uri = uri;
     }
 
+    protected SendDefinition(SendDefinition source) {
+        super(source);
+        this.endpointUriToString = source.endpointUriToString;
+        this.endpoint = source.endpoint;
+        this.endpointProducerBuilder = source.endpointProducerBuilder;
+        this.uri = source.uri;
+    }
+
     @Override
     public String getEndpointUri() {
         if (endpointProducerBuilder != null) {
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ToDefinition.java 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ToDefinition.java
index fed74435cd4..469ab1ea8b9 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ToDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ToDefinition.java
@@ -33,7 +33,7 @@ import org.apache.camel.spi.Metadata;
 @Metadata(label = "eip,routing")
 @XmlRootElement(name = "to")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class ToDefinition extends SendDefinition<ToDefinition> {
+public class ToDefinition extends SendDefinition<ToDefinition> implements 
CopyableProcessorDefinition {
 
     @XmlAttribute
     private String variableSend;
@@ -76,6 +76,13 @@ public class ToDefinition extends 
SendDefinition<ToDefinition> {
         this.pattern = pattern.name();
     }
 
+    protected ToDefinition(ToDefinition source) {
+        super(source);
+        this.variableSend = source.variableSend;
+        this.variableReceive = source.variableReceive;
+        this.pattern = source.pattern;
+    }
+
     @Override
     public String getShortName() {
         return "to";
@@ -128,4 +135,8 @@ public class ToDefinition extends 
SendDefinition<ToDefinition> {
     public void setVariableReceive(String variableReceive) {
         this.variableReceive = variableReceive;
     }
+
+    public ToDefinition copy() {
+        return new ToDefinition(this);
+    }
 }
diff --git 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
index 3daac2cbf52..b1495e9fa8c 100644
--- 
a/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
+++ 
b/core/camel-core-model/src/main/java/org/apache/camel/model/ToDynamicDefinition.java
@@ -34,7 +34,7 @@ import org.apache.camel.spi.Metadata;
 @Metadata(label = "eip,routing")
 @XmlRootElement(name = "toD")
 @XmlAccessorType(XmlAccessType.FIELD)
-public class ToDynamicDefinition extends 
NoOutputDefinition<ToDynamicDefinition> {
+public class ToDynamicDefinition extends 
NoOutputDefinition<ToDynamicDefinition> implements CopyableProcessorDefinition {
 
     @XmlTransient
     protected EndpointProducerBuilder endpointProducerBuilder;
@@ -69,6 +69,19 @@ public class ToDynamicDefinition extends 
NoOutputDefinition<ToDynamicDefinition>
         this.uri = uri;
     }
 
+    protected ToDynamicDefinition(ToDynamicDefinition source) {
+        super(source);
+        this.endpointProducerBuilder = source.endpointProducerBuilder;
+        this.uri = source.uri;
+        this.variableSend = source.variableSend;
+        this.variableReceive = source.variableReceive;
+        this.pattern = source.pattern;
+        this.cacheSize = source.cacheSize;
+        this.ignoreInvalidEndpoint = source.ignoreInvalidEndpoint;
+        this.allowOptimisedComponents = source.allowOptimisedComponents;
+        this.autoStartComponents = source.autoStartComponents;
+    }
+
     @Override
     public String getShortName() {
         return "toD";
@@ -315,4 +328,8 @@ public class ToDynamicDefinition extends 
NoOutputDefinition<ToDynamicDefinition>
     public void setAutoStartComponents(String autoStartComponents) {
         this.autoStartComponents = autoStartComponents;
     }
+
+    public ToDynamicDefinition copy() {
+        return new ToDynamicDefinition(this);
+    }
 }
diff --git 
a/core/camel-core/src/test/java/org/apache/camel/model/RouteTemplateDefinitionTest.java
 
b/core/camel-core/src/test/java/org/apache/camel/model/RouteTemplateDefinitionTest.java
new file mode 100644
index 00000000000..0c74b35a181
--- /dev/null
+++ 
b/core/camel-core/src/test/java/org/apache/camel/model/RouteTemplateDefinitionTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.model;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.camel.support.RoutePolicySupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+class RouteTemplateDefinitionTest {
+
+    @Test
+    void testDeepCopyMutableProperties() {
+        RouteDefinition route = new RouteDefinition();
+        route.setTemplateParameters(Map.of("parameter", "parameterValue"));
+        route.setRouteProperties(List.of(new PropertyDefinition("property", 
"propertyValue")));
+        route.setRoutePolicies(List.of(new RoutePolicySupport() {
+        }));
+        route.setInput(new FromDefinition("direct://fromEndpoint"));
+        route.setOutputs(List.of(new ToDefinition("direct://toEndpoint"), new 
SetHeaderDefinition("header", "headerValue")));
+        RouteTemplateDefinition routeTemplate = new RouteTemplateDefinition();
+        routeTemplate.setRoute(route);
+
+        RouteDefinition routeCopy = routeTemplate.asRouteDefinition();
+
+        assertNotSame(route.getTemplateParameters(), 
routeCopy.getTemplateParameters());
+        assertEquals(route.getTemplateParameters(), 
routeCopy.getTemplateParameters());
+        assertNotSame(route.getRouteProperties(), 
routeCopy.getRouteProperties());
+        assertEquals(route.getRouteProperties(), 
routeCopy.getRouteProperties());
+        assertNotSame(route.getRoutePolicies(), routeCopy.getRoutePolicies());
+        assertEquals(route.getRoutePolicies(), routeCopy.getRoutePolicies());
+        assertNotSame(route.getInput(), routeCopy.getInput());
+        assertEquals(route.getInput().getUri(), routeCopy.getInput().getUri());
+        assertNotSame(route.getOutputs(), routeCopy.getOutputs());
+        assertEquals(2, routeCopy.getOutputs().size());
+        assertNotSame(route.getOutputs().get(0), 
routeCopy.getOutputs().get(0));
+        assertInstanceOf(ToDefinition.class, route.getOutputs().get(0));
+        assertInstanceOf(ToDefinition.class, routeCopy.getOutputs().get(0));
+        assertEquals(((ToDefinition) route.getOutputs().get(0)).getUri(),
+                ((ToDefinition) routeCopy.getOutputs().get(0)).getUri());
+        assertSame(route.getOutputs().get(1), routeCopy.getOutputs().get(1));
+    }
+}

Reply via email to