This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push:
new 76617ba CAMEL-16222: PooledExchangeFactory experiment
76617ba is described below
commit 76617ba3e8496afeb469195d547f1fbf03d2d81c
Author: Claus Ibsen <[email protected]>
AuthorDate: Wed Feb 17 15:37:52 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../java/org/apache/camel/ExtendedExchange.java | 5 ++
.../camel/impl/engine/PooledExchangeFactory.java | 84 ++++++++++++++++++++++
.../org/apache/camel/support/DefaultExchange.java | 29 +++++++-
3 files changed, 117 insertions(+), 1 deletion(-)
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index bb0b523..c2a1ffd 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -29,6 +29,11 @@ import org.apache.camel.spi.UnitOfWork;
public interface ExtendedExchange extends Exchange {
/**
+ * Clears the exchange from user data so it may be reused.
+ */
+ void reset();
+
+ /**
* Sets the endpoint which originated this message exchange. This method
should typically only be called by
* {@link Endpoint} implementations
*/
diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
new file mode 100644
index 0000000..ced186e
--- /dev/null
+++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/PooledExchangeFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.impl.engine;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.camel.*;
+import org.apache.camel.spi.ExchangeFactory;
+import org.apache.camel.support.DefaultExchange;
+import org.apache.camel.support.service.ServiceSupport;
+
+/**
+ * Pooled {@link ExchangeFactory} that reuses {@link Exchange} instances from a pool.
+ * <p>
+ * Released exchanges are stored in a {@link ConcurrentLinkedQueue}. An exchange taken from the pool is reset
+ * before it is handed out again; when the pool is empty a new {@link DefaultExchange} is created instead.
+ * The pool is emptied when this service is stopped.
+ */
+@Experimental
+public class PooledExchangeFactory extends ServiceSupport
+        implements ExchangeFactory, CamelContextAware, StaticService, NonManagedService {
+
+    /** Exchanges that have been released and are available for reuse. */
+    private final ConcurrentLinkedQueue<Exchange> pool = new ConcurrentLinkedQueue<>();
+
+    private CamelContext camelContext;
+
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
+    /**
+     * Gets an exchange from the pool, or creates a new one bound to the configured {@link CamelContext} when the
+     * pool is empty.
+     *
+     * @return a new exchange, or a pooled exchange that has been reset
+     */
+    @Override
+    public Exchange create() {
+        Exchange exchange = pool.poll();
+        if (exchange == null) {
+            // create a new exchange as there was no free from the pool
+            exchange = new DefaultExchange(camelContext);
+        } else {
+            // reset exchange before we use it
+            exchange.adapt(ExtendedExchange.class).reset();
+        }
+        return exchange;
+    }
+
+    /**
+     * Gets an exchange from the pool, or creates a new one when the pool is empty, and marks it as originating
+     * from the given endpoint.
+     *
+     * @param  fromEndpoint the endpoint the exchange originates from
+     * @return              a new exchange, or a pooled exchange that has been reset, bound to the endpoint
+     */
+    @Override
+    public Exchange create(Endpoint fromEndpoint) {
+        Exchange exchange = pool.poll();
+        if (exchange == null) {
+            // create a new exchange as there was no free from the pool
+            exchange = new DefaultExchange(fromEndpoint);
+        } else {
+            ExtendedExchange ee = exchange.adapt(ExtendedExchange.class);
+            // reset exchange before we use it, same as create(), so state from the previous use does not leak;
+            // this must happen before setting the endpoint as reset() also clears the from endpoint
+            ee.reset();
+            // need to mark this exchange from the given endpoint
+            ee.setFromEndpoint(fromEndpoint);
+        }
+        return exchange;
+    }
+
+    /**
+     * Returns the exchange to the pool so a later {@code create} call can reuse it.
+     *
+     * @param exchange the exchange that is no longer in use
+     */
+    @Override
+    public void release(Exchange exchange) {
+        pool.offer(exchange);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // drop all pooled exchanges so they can be garbage collected
+        pool.clear();
+    }
+
+}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index 986312b..fca25fa 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -45,7 +45,7 @@ import org.apache.camel.util.ObjectHelper;
public final class DefaultExchange implements ExtendedExchange {
private final CamelContext context;
- private final long created;
+ private long created;
// optimize to create properties always and with a reasonable small size
private final Map<String, Object> properties = new ConcurrentHashMap<>(8);
private Message in;
@@ -115,6 +115,33 @@ public final class DefaultExchange implements ExtendedExchange {
}
}
+ /**
+  * Clears the exchange from user data so it may be reused.
+  * <p>
+  * Properties, messages, exception, unit of work, routing/history information and all state flags are cleared
+  * or set back to the values assigned below, and the creation timestamp is set to the current time.
+  */
+ @Override
+ public void reset() {
+     this.properties.clear();
+     this.exchangeId = null;
+     this.created = System.currentTimeMillis();
+     // the IN message holds the previous body and headers, so it must not survive a reset
+     // NOTE(review): relies on getIn() lazily creating a fresh message when this field is null - confirm
+     this.in = null;
+     this.out = null;
+     this.exception = null;
+     this.unitOfWork = null;
+     // NOTE(review): pattern is left as null here - confirm a pattern is set again before getPattern() is used
+     this.pattern = null;
+     this.fromEndpoint = null;
+     this.fromRouteId = null;
+     if (this.onCompletions != null) {
+         // keep the list instance and only empty it
+         this.onCompletions.clear();
+     }
+     this.externalRedelivered = null;
+     this.historyNodeId = null;
+     this.historyNodeLabel = null;
+     this.transacted = false;
+     this.routeStop = false;
+     this.rollbackOnly = false;
+     this.rollbackOnlyLast = false;
+     this.notifyEvent = false;
+     this.interrupted = false;
+     this.interruptable = true;
+     this.redeliveryExhausted = false;
+     this.errorHandlerHandled = null;
+ }
+
@Override
public long getCreated() {
return created;