This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/exchange-factory by this push:
     new 3609376  CAMEL-16222: PooledExchangeFactory experiment
3609376 is described below

commit 36093762e671bdd29156bf32092c0d6cb92798f8
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Sun Feb 21 19:40:18 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../component/git/consumer/GitBranchConsumer.java  |  2 +-
 .../component/git/consumer/GitCommitConsumer.java  |  2 +-
 .../component/git/consumer/GitTagConsumer.java     |  2 +-
 .../component/github/consumer/CommitConsumer.java  |  2 +-
 .../component/github/consumer/EventsConsumer.java  |  2 +-
 .../consumer/PullRequestCommentConsumer.java       |  2 +-
 .../github/consumer/PullRequestConsumer.java       |  2 +-
 .../component/github/consumer/TagConsumer.java     |  2 +-
 .../mail/stream/GoogleMailStreamConsumer.java      | 43 ++++++++++++++-
 .../mail/stream/GoogleMailStreamEndpoint.java      | 42 --------------
 .../google/pubsub/GooglePubsubConsumer.java        |  8 +--
 .../pubsub/consumer/CamelMessageReceiver.java      |  9 ++-
 .../sheets/stream/GoogleSheetsStreamConsumer.java  | 48 ++++++++++++++--
 .../sheets/stream/GoogleSheetsStreamEndpoint.java  | 37 -------------
 .../google/storage/GoogleCloudStorageConsumer.java | 64 +++++++++++++++++++++-
 .../google/storage/GoogleCloudStorageEndpoint.java | 59 --------------------
 .../apache/camel/component/gora/GoraConsumer.java  | 10 +---
 .../guava/eventbus/CamelEventHandler.java          | 18 ++++--
 .../guava/eventbus/FilteringCamelEventHandler.java |  5 +-
 .../guava/eventbus/GuavaEventBusConsumer.java      |  8 +--
 .../guava/eventbus/GuavaEventBusEndpoint.java      |  7 ---
 21 files changed, 187 insertions(+), 187 deletions(-)

diff --git a/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitBranchConsumer.java b/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitBranchConsumer.java
index 49d9456..6dc075d 100644
--- a/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitBranchConsumer.java
+++ b/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitBranchConsumer.java
@@ -40,7 +40,7 @@ public class GitBranchConsumer extends AbstractGitConsumer {
         List<Ref> call = 
getGit().branchList().setListMode(ListMode.ALL).call();
         for (Ref ref : call) {
             if (!branchesConsumed.contains(ref.getName())) {
-                Exchange e = getEndpoint().createExchange();
+                Exchange e = createExchange(true);
                 e.getMessage().setBody(ref.getName());
                 e.getMessage().setHeader(GitConstants.GIT_BRANCH_LEAF, 
ref.getLeaf().getName());
                 e.getMessage().setHeader(GitConstants.GIT_BRANCH_OBJECT_ID, 
ref.getObjectId().getName());
diff --git a/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitCommitConsumer.java b/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitCommitConsumer.java
index 06e203b..122a85a 100644
--- a/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitCommitConsumer.java
+++ b/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitCommitConsumer.java
@@ -46,7 +46,7 @@ public class GitCommitConsumer extends AbstractGitConsumer {
         }
         for (RevCommit commit : commits) {
             if (!commitsConsumed.contains(commit.getId())) {
-                Exchange e = getEndpoint().createExchange();
+                Exchange e = createExchange(true);
                 e.getMessage().setBody(commit.getFullMessage());
                 e.getMessage().setHeader(GitConstants.GIT_COMMIT_ID, 
commit.getId());
                 e.getMessage().setHeader(GitConstants.GIT_COMMIT_AUTHOR_NAME, 
commit.getAuthorIdent().getName());
diff --git a/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitTagConsumer.java b/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitTagConsumer.java
index 31ea39a..7052764 100644
--- a/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitTagConsumer.java
+++ b/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitTagConsumer.java
@@ -39,7 +39,7 @@ public class GitTagConsumer extends AbstractGitConsumer {
         List<Ref> call = getGit().tagList().call();
         for (Ref ref : call) {
             if (!tagsConsumed.contains(ref.getName())) {
-                Exchange e = getEndpoint().createExchange();
+                Exchange e = createExchange(true);
                 e.getMessage().setBody(ref.getName());
                 e.getMessage().setHeader(GitConstants.GIT_BRANCH_LEAF, 
ref.getLeaf().getName());
                 e.getMessage().setHeader(GitConstants.GIT_BRANCH_OBJECT_ID, 
ref.getObjectId().getName());
diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
index ac92e01..5a4bbf7 100644
--- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
+++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
@@ -71,7 +71,7 @@ public class CommitConsumer extends AbstractGitHubConsumer {
 
         while (!newCommits.empty()) {
             RepositoryCommit newCommit = newCommits.pop();
-            Exchange e = getEndpoint().createExchange();
+            Exchange e = createExchange(true);
             e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR, 
newCommit.getAuthor().getName());
             e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_COMMITTER, 
newCommit.getCommitter().getName());
             e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_SHA, 
newCommit.getSha());
diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java
index 343fe45..a47e763 100644
--- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java
+++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java
@@ -78,7 +78,7 @@ public class EventsConsumer extends AbstractGitHubConsumer {
             lastEventId = Long.parseLong(latestEvent.getId());
 
             for (Event event : newEvents) {
-                Exchange exchange = getEndpoint().createExchange();
+                Exchange exchange = createExchange(true);
                 exchange.getMessage().setBody(event.getType());
                 
exchange.getMessage().setHeader(GitHubConstants.GITHUB_EVENT_PAYLOAD, 
event.getPayload());
                 getProcessor().process(exchange);
diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java
index 7f2164b..fc4fa03 100644
--- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java
+++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java
@@ -110,7 +110,7 @@ public class PullRequestCommentConsumer extends 
AbstractGitHubConsumer {
 
         while (!newComments.empty()) {
             Comment newComment = newComments.pop();
-            Exchange e = getEndpoint().createExchange();
+            Exchange e = createExchange(true);
             e.getIn().setBody(newComment);
 
             // Required by the producers.  Set it here for convenience.
diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java
index 9d6b804..61d839c 100644
--- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java
+++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java
@@ -76,7 +76,7 @@ public class PullRequestConsumer extends 
AbstractGitHubConsumer {
 
         while (!newPullRequests.empty()) {
             PullRequest newPullRequest = newPullRequests.pop();
-            Exchange e = getEndpoint().createExchange();
+            Exchange e = createExchange(true);
 
             e.getIn().setBody(newPullRequest);
 
diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java
index 559cbba..91a7107 100644
--- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java
+++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java
@@ -56,7 +56,7 @@ public class TagConsumer extends AbstractGitHubConsumer {
 
         while (!newTags.empty()) {
             RepositoryTag newTag = newTags.pop();
-            Exchange e = getEndpoint().createExchange();
+            Exchange e = createExchange(true);
             e.getIn().setBody(newTag);
             getProcessor().process(e);
         }
diff --git a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
index 4d4f5e2..da70219 100644
--- a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
+++ b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
@@ -16,17 +16,22 @@
  */
 package org.apache.camel.component.google.mail.stream;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 
+import com.google.api.client.util.Base64;
 import com.google.api.services.gmail.Gmail;
 import com.google.api.services.gmail.model.ListMessagesResponse;
 import com.google.api.services.gmail.model.Message;
+import com.google.api.services.gmail.model.MessagePart;
+import com.google.api.services.gmail.model.MessagePartHeader;
 import com.google.api.services.gmail.model.ModifyMessageRequest;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.spi.Synchronization;
@@ -83,7 +88,7 @@ public class GoogleMailStreamConsumer extends 
ScheduledBatchPollingConsumer {
         if (c.getMessages() != null) {
             for (Message message : c.getMessages()) {
                 Message mess = getClient().users().messages().get("me", 
message.getId()).setFormat("FULL").execute();
-                Exchange exchange = 
getEndpoint().createExchange(getEndpoint().getExchangePattern(), mess);
+                Exchange exchange = 
createExchange(getEndpoint().getExchangePattern(), mess);
                 answer.add(exchange);
             }
         }
@@ -170,4 +175,40 @@ public class GoogleMailStreamConsumer extends 
ScheduledBatchPollingConsumer {
         }
     }
 
+    public Exchange createExchange(ExchangePattern pattern, 
com.google.api.services.gmail.model.Message mail) {
+        Exchange exchange = createExchange(true);
+        exchange.setPattern(pattern);
+        org.apache.camel.Message message = exchange.getIn();
+        exchange.getIn().setHeader(GoogleMailStreamConstants.MAIL_ID, 
mail.getId());
+        List<MessagePart> parts = mail.getPayload().getParts();
+        if (parts != null && parts.get(0).getBody().getData() != null) {
+            byte[] bodyBytes = 
Base64.decodeBase64(parts.get(0).getBody().getData().trim());
+            String body = new String(bodyBytes, StandardCharsets.UTF_8);
+            message.setBody(body);
+        }
+        configureHeaders(message, mail.getPayload().getHeaders());
+        return exchange;
+    }
+
+    private void configureHeaders(org.apache.camel.Message message, 
List<MessagePartHeader> headers) {
+        for (MessagePartHeader header : headers) {
+            String headerName = header.getName();
+            if ("SUBJECT".equalsIgnoreCase(headerName)) {
+                message.setHeader(GoogleMailStreamConstants.MAIL_SUBJECT, 
header.getValue());
+            }
+            if ("TO".equalsIgnoreCase(headerName)) {
+                message.setHeader(GoogleMailStreamConstants.MAIL_TO, 
header.getValue());
+            }
+            if ("FROM".equalsIgnoreCase(headerName)) {
+                message.setHeader(GoogleMailStreamConstants.MAIL_FROM, 
header.getValue());
+            }
+            if ("CC".equalsIgnoreCase(headerName)) {
+                message.setHeader(GoogleMailStreamConstants.MAIL_CC, 
header.getValue());
+            }
+            if ("BCC".equalsIgnoreCase(headerName)) {
+                message.setHeader(GoogleMailStreamConstants.MAIL_BCC, 
header.getValue());
+            }
+        }
+    }
+
 }
diff --git a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamEndpoint.java b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamEndpoint.java
index 8c47b31..947f69a2 100644
--- a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamEndpoint.java
+++ b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamEndpoint.java
@@ -16,22 +16,15 @@
  */
 package org.apache.camel.component.google.mail.stream;
 
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
 
-import com.google.api.client.util.Base64;
 import com.google.api.services.gmail.Gmail;
 import com.google.api.services.gmail.model.Label;
 import com.google.api.services.gmail.model.ListLabelsResponse;
-import com.google.api.services.gmail.model.MessagePart;
-import com.google.api.services.gmail.model.MessagePartHeader;
 import com.google.common.base.Splitter;
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.google.mail.GoogleMailClientFactory;
@@ -108,41 +101,6 @@ public class GoogleMailStreamEndpoint extends 
ScheduledPollEndpoint {
         return configuration;
     }
 
-    public Exchange createExchange(ExchangePattern pattern, 
com.google.api.services.gmail.model.Message mail) {
-        Exchange exchange = super.createExchange(pattern);
-        Message message = exchange.getIn();
-        exchange.getIn().setHeader(GoogleMailStreamConstants.MAIL_ID, 
mail.getId());
-        List<MessagePart> parts = mail.getPayload().getParts();
-        if (parts != null && parts.get(0).getBody().getData() != null) {
-            byte[] bodyBytes = 
Base64.decodeBase64(parts.get(0).getBody().getData().trim());
-            String body = new String(bodyBytes, StandardCharsets.UTF_8);
-            message.setBody(body);
-        }
-        setHeaders(message, mail.getPayload().getHeaders());
-        return exchange;
-    }
-
-    private void setHeaders(Message message, List<MessagePartHeader> headers) {
-        for (MessagePartHeader header : headers) {
-            String headerName = header.getName();
-            if ("SUBJECT".equalsIgnoreCase(headerName)) {
-                message.setHeader(GoogleMailStreamConstants.MAIL_SUBJECT, 
header.getValue());
-            }
-            if ("TO".equalsIgnoreCase(headerName)) {
-                message.setHeader(GoogleMailStreamConstants.MAIL_TO, 
header.getValue());
-            }
-            if ("FROM".equalsIgnoreCase(headerName)) {
-                message.setHeader(GoogleMailStreamConstants.MAIL_FROM, 
header.getValue());
-            }
-            if ("CC".equalsIgnoreCase(headerName)) {
-                message.setHeader(GoogleMailStreamConstants.MAIL_CC, 
header.getValue());
-            }
-            if ("BCC".equalsIgnoreCase(headerName)) {
-                message.setHeader(GoogleMailStreamConstants.MAIL_BCC, 
header.getValue());
-            }
-        }
-    }
-
     private List<String> splitLabels(String labels) {
         return Splitter.on(',').splitToList(labels);
     }
diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index 360dadc..8adc5e1 100644
--- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -40,7 +40,7 @@ import org.apache.camel.support.DefaultConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class GooglePubsubConsumer extends DefaultConsumer {
+public class GooglePubsubConsumer extends DefaultConsumer {
 
     private Logger localLog;
 
@@ -123,7 +123,7 @@ class GooglePubsubConsumer extends DefaultConsumer {
 
         private void asynchronousPull(String subscriptionName) {
             while (isRunAllowed() && !isSuspendingOrSuspended()) {
-                MessageReceiver messageReceiver = new 
CamelMessageReceiver(endpoint, processor);
+                MessageReceiver messageReceiver = new 
CamelMessageReceiver(GooglePubsubConsumer.this, endpoint, processor);
 
                 Subscriber subscriber = 
endpoint.getComponent().getSubscriber(subscriptionName, messageReceiver);
                 try {
@@ -152,7 +152,7 @@ class GooglePubsubConsumer extends DefaultConsumer {
                     PullResponse pullResponse = 
subscriber.pullCallable().call(pullRequest);
                     for (ReceivedMessage message : 
pullResponse.getReceivedMessagesList()) {
                         PubsubMessage pubsubMessage = message.getMessage();
-                        Exchange exchange = endpoint.createExchange();
+                        Exchange exchange = createExchange(true);
                         
exchange.getIn().setBody(pubsubMessage.getData().toByteArray());
 
                         
exchange.getIn().setHeader(GooglePubsubConstants.ACK_ID, message.getAckId());
@@ -171,7 +171,7 @@ class GooglePubsubConsumer extends DefaultConsumer {
                         try {
                             processor.process(exchange);
                         } catch (Exception e) {
-                            exchange.setException(e);
+                            getExceptionHandler().handleException(e);
                         }
                     }
                 } catch (IOException e) {
diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
index 0b441b0..d26d649 100644
--- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
+++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
@@ -24,6 +24,7 @@ import org.apache.camel.Exchange;
 import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.google.pubsub.GooglePubsubConstants;
+import org.apache.camel.component.google.pubsub.GooglePubsubConsumer;
 import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,10 +32,12 @@ import org.slf4j.LoggerFactory;
 public class CamelMessageReceiver implements MessageReceiver {
 
     private final Logger localLog;
+    private final GooglePubsubConsumer consumer;
     private final GooglePubsubEndpoint endpoint;
     private final Processor processor;
 
-    public CamelMessageReceiver(GooglePubsubEndpoint endpoint, Processor 
processor) {
+    public CamelMessageReceiver(GooglePubsubConsumer consumer, 
GooglePubsubEndpoint endpoint, Processor processor) {
+        this.consumer = consumer;
         this.endpoint = endpoint;
         this.processor = processor;
         String loggerId = endpoint.getLoggerId();
@@ -50,7 +53,7 @@ public class CamelMessageReceiver implements MessageReceiver {
             localLog.trace("Received message ID : {}", 
pubsubMessage.getMessageId());
         }
 
-        Exchange exchange = endpoint.createExchange();
+        Exchange exchange = consumer.createExchange(true);
         exchange.getIn().setBody(pubsubMessage.getData().toByteArray());
 
         exchange.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID, 
pubsubMessage.getMessageId());
@@ -67,7 +70,7 @@ public class CamelMessageReceiver implements MessageReceiver {
         try {
             processor.process(exchange);
         } catch (Exception e) {
-            exchange.setException(e);
+            consumer.getExceptionHandler().handleException(e);
         }
     }
 }
diff --git a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
index 5d382db..eb04b5d 100644
--- a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
+++ b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.google.sheets.stream;
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -29,6 +30,7 @@ import com.google.api.services.sheets.v4.model.Spreadsheet;
 import com.google.api.services.sheets.v4.model.ValueRange;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
@@ -89,12 +91,12 @@ public class GoogleSheetsStreamConsumer extends 
ScheduledBatchPollingConsumer {
                         if (getConfiguration().getMaxResults() > 0) {
                             valueRange.getValues().stream()
                                     .limit(getConfiguration().getMaxResults())
-                                    .map(values -> 
getEndpoint().createExchange(rangeIndex.get(), valueIndex.incrementAndGet(),
+                                    .map(values -> 
createExchange(rangeIndex.get(), valueIndex.incrementAndGet(),
                                             valueRange.getRange(), 
valueRange.getMajorDimension(), values))
                                     .forEach(answer::add);
                         } else {
                             valueRange.getValues().stream()
-                                    .map(values -> 
getEndpoint().createExchange(rangeIndex.get(), valueIndex.incrementAndGet(),
+                                    .map(values -> 
createExchange(rangeIndex.get(), valueIndex.incrementAndGet(),
                                             valueRange.getRange(), 
valueRange.getMajorDimension(), values))
                                     .forEach(answer::add);
                         }
@@ -112,7 +114,7 @@ public class GoogleSheetsStreamConsumer extends 
ScheduledBatchPollingConsumer {
                                             .collect(Collectors.toList()));
                                 }
                             })
-                            .map(valueRange -> 
getEndpoint().createExchange(rangeIndex.incrementAndGet(), valueRange))
+                            .map(valueRange -> 
createExchange(rangeIndex.incrementAndGet(), valueRange))
                             .forEach(answer::add);
                 }
             }
@@ -122,7 +124,7 @@ public class GoogleSheetsStreamConsumer extends 
ScheduledBatchPollingConsumer {
             request.setIncludeGridData(getConfiguration().isIncludeGridData());
 
             Spreadsheet spreadsheet = request.execute();
-            answer.add(getEndpoint().createExchange(spreadsheet));
+            answer.add(createExchange(spreadsheet));
         }
 
         return processBatch(CastUtils.cast(answer));
@@ -148,4 +150,42 @@ public class GoogleSheetsStreamConsumer extends 
ScheduledBatchPollingConsumer {
 
         return total;
     }
+
+    public Exchange createExchange(int rangeIndex, ValueRange valueRange) {
+        Exchange exchange = createExchange(true);
+        exchange.setPattern(getEndpoint().getExchangePattern());
+        Message message = exchange.getIn();
+        exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_ID,
+                getEndpoint().getConfiguration().getSpreadsheetId());
+        exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE, 
valueRange.getRange());
+        exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE_INDEX, 
rangeIndex);
+        
exchange.getIn().setHeader(GoogleSheetsStreamConstants.MAJOR_DIMENSION, 
valueRange.getMajorDimension());
+        message.setBody(valueRange);
+        return exchange;
+    }
+
+    public Exchange createExchange(int rangeIndex, int valueIndex, String 
range, String majorDimension, List<Object> values) {
+        Exchange exchange = createExchange(true);
+        exchange.setPattern(getEndpoint().getExchangePattern());
+        Message message = exchange.getIn();
+        exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_ID,
+                getEndpoint().getConfiguration().getSpreadsheetId());
+        exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE_INDEX, 
rangeIndex);
+        exchange.getIn().setHeader(GoogleSheetsStreamConstants.VALUE_INDEX, 
valueIndex);
+        exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE, range);
+        
exchange.getIn().setHeader(GoogleSheetsStreamConstants.MAJOR_DIMENSION, 
majorDimension);
+        message.setBody(values);
+        return exchange;
+    }
+
+    public Exchange createExchange(Spreadsheet spreadsheet) {
+        Exchange exchange = createExchange(true);
+        exchange.setPattern(getEndpoint().getExchangePattern());
+        Message message = exchange.getIn();
+        exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_ID, 
spreadsheet.getSpreadsheetId());
+        
exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_URL, 
spreadsheet.getSpreadsheetUrl());
+        message.setBody(spreadsheet);
+        return exchange;
+    }
+
 }
diff --git a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamEndpoint.java b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamEndpoint.java
index 18062f3..c17f148 100644
--- a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamEndpoint.java
+++ b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamEndpoint.java
@@ -16,15 +16,9 @@
  */
 package org.apache.camel.component.google.sheets.stream;
 
-import java.util.List;
-
 import com.google.api.services.sheets.v4.Sheets;
-import com.google.api.services.sheets.v4.model.Spreadsheet;
-import com.google.api.services.sheets.v4.model.ValueRange;
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.google.sheets.GoogleSheetsClientFactory;
@@ -80,35 +74,4 @@ public class GoogleSheetsStreamEndpoint extends 
ScheduledPollEndpoint {
         return configuration;
     }
 
-    public Exchange createExchange(int rangeIndex, ValueRange valueRange) {
-        Exchange exchange = super.createExchange(getExchangePattern());
-        Message message = exchange.getIn();
-        exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_ID, 
configuration.getSpreadsheetId());
-        exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE, 
valueRange.getRange());
-        exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE_INDEX, 
rangeIndex);
-        
exchange.getIn().setHeader(GoogleSheetsStreamConstants.MAJOR_DIMENSION, 
valueRange.getMajorDimension());
-        message.setBody(valueRange);
-        return exchange;
-    }
-
-    public Exchange createExchange(int rangeIndex, int valueIndex, String 
range, String majorDimension, List<Object> values) {
-        Exchange exchange = super.createExchange(getExchangePattern());
-        Message message = exchange.getIn();
-        exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_ID, 
configuration.getSpreadsheetId());
-        exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE_INDEX, 
rangeIndex);
-        exchange.getIn().setHeader(GoogleSheetsStreamConstants.VALUE_INDEX, 
valueIndex);
-        exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE, range);
-        
exchange.getIn().setHeader(GoogleSheetsStreamConstants.MAJOR_DIMENSION, 
majorDimension);
-        message.setBody(values);
-        return exchange;
-    }
-
-    public Exchange createExchange(Spreadsheet spreadsheet) {
-        Exchange exchange = super.createExchange(getExchangePattern());
-        Message message = exchange.getIn();
-        exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_ID, 
spreadsheet.getSpreadsheetId());
-        
exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_URL, 
spreadsheet.getSpreadsheetUrl());
-        message.setBody(spreadsheet);
-        return exchange;
-    }
 }
diff --git a/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java b/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java
index 935b7e6..b8cede8 100644
--- a/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java
+++ b/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.google.storage;
 
+import java.io.ByteArrayOutputStream;
+import java.util.Date;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
@@ -28,8 +30,11 @@ import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.Storage.CopyRequest;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.support.ScheduledBatchPollingConsumer;
 import org.apache.camel.util.CastUtils;
@@ -73,7 +78,7 @@ public class GoogleCloudStorageConsumer extends ScheduledBatchPollingConsumer {
 
         String fileName = getConfiguration().getObjectName();
         String bucketName = getConfiguration().getBucketName();
-        Queue<Exchange> exchanges = new LinkedList<>();
+        Queue<Exchange> exchanges;
 
         if (fileName != null) {
             LOG.trace("Getting object in bucket [{}] with file name [{}]...", bucketName, fileName);
@@ -101,7 +106,7 @@ public class GoogleCloudStorageConsumer extends ScheduledBatchPollingConsumer {
 
     protected Queue<Exchange> createExchanges(Blob blob, String key) {
         Queue<Exchange> answer = new LinkedList<>();
-        Exchange exchange = getEndpoint().createExchange(blob, key);
+        Exchange exchange = createExchange(blob, key);
         answer.add(exchange);
         return answer;
     }
@@ -115,7 +120,7 @@ public class GoogleCloudStorageConsumer extends ScheduledBatchPollingConsumer {
         try {
             for (Blob blob : blobList) {
                 if (includeObject(blob)) {
-                    Exchange exchange = getEndpoint().createExchange(blob, blob.getBlobId().getName());
+                    Exchange exchange = createExchange(blob, blob.getBlobId().getName());
                     answer.add(exchange);
                 }
             }
@@ -248,4 +253,57 @@ public class GoogleCloudStorageConsumer extends ScheduledBatchPollingConsumer {
     public GoogleCloudStorageEndpoint getEndpoint() {
         return (GoogleCloudStorageEndpoint) super.getEndpoint();
     }
+
+    public Exchange createExchange(Blob blob, String key) {
+        return createExchange(getEndpoint().getExchangePattern(), blob, key);
+    }
+
+    public Exchange createExchange(ExchangePattern pattern, Blob blob, String key) {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Getting object with key [{}] from bucket [{}]...", key, getConfiguration().getBucketName());
+            LOG.trace("Got object [{}]", blob);
+        }
+
+        Exchange exchange = createExchange(true);
+        exchange.setPattern(pattern);
+        Message message = exchange.getIn();
+
+        if (getConfiguration().isIncludeBody()) {
+            try {
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                blob.downloadTo(baos);
+                message.setBody(baos.toByteArray());
+            } catch (Exception e) {
+                throw new RuntimeCamelException(e);
+            }
+        } else {
+            message.setBody(blob);
+        }
+
+        message.setHeader(GoogleCloudStorageConstants.OBJECT_NAME, key);
+        message.setHeader(GoogleCloudStorageConstants.BUCKET_NAME, getConfiguration().getBucketName());
+        //OTHER METADATA
+        message.setHeader(GoogleCloudStorageConstants.CACHE_CONTROL, blob.getCacheControl());
+        message.setHeader(GoogleCloudStorageConstants.METADATA_COMPONENT_COUNT, blob.getComponentCount());
+        message.setHeader(GoogleCloudStorageConstants.CONTENT_DISPOSITION, blob.getContentDisposition());
+        message.setHeader(GoogleCloudStorageConstants.CONTENT_ENCODING, blob.getContentEncoding());
+        message.setHeader(GoogleCloudStorageConstants.METADATA_CONTENT_LANGUAGE, blob.getContentLanguage());
+        message.setHeader(GoogleCloudStorageConstants.CONTENT_TYPE, blob.getContentType());
+        message.setHeader(GoogleCloudStorageConstants.METADATA_CUSTOM_TIME, blob.getCustomTime());
+        message.setHeader(GoogleCloudStorageConstants.METADATA_CRC32C_HEX, blob.getCrc32cToHexString());
+        message.setHeader(GoogleCloudStorageConstants.METADATA_ETAG, blob.getEtag());
+        message.setHeader(GoogleCloudStorageConstants.METADATA_GENERATION, blob.getGeneration());
+        message.setHeader(GoogleCloudStorageConstants.METADATA_BLOB_ID, blob.getBlobId());
+        message.setHeader(GoogleCloudStorageConstants.METADATA_KMS_KEY_NAME, blob.getKmsKeyName());
+        message.setHeader(GoogleCloudStorageConstants.CONTENT_MD5, blob.getMd5ToHexString());
+        message.setHeader(GoogleCloudStorageConstants.METADATA_MEDIA_LINK, blob.getMediaLink());
+        message.setHeader(GoogleCloudStorageConstants.METADATA_METAGENERATION, blob.getMetageneration());
+        message.setHeader(GoogleCloudStorageConstants.CONTENT_LENGTH, blob.getSize());
+        message.setHeader(GoogleCloudStorageConstants.METADATA_STORAGE_CLASS, blob.getStorageClass());
+        message.setHeader(GoogleCloudStorageConstants.METADATA_CREATE_TIME, blob.getCreateTime());
+        message.setHeader(GoogleCloudStorageConstants.METADATA_LAST_UPDATE, new Date(blob.getUpdateTime()));
+
+        return exchange;
+    }
+
 }
diff --git a/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageEndpoint.java b/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageEndpoint.java
index f3fe59a..79da387 100644
--- a/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageEndpoint.java
+++ b/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageEndpoint.java
@@ -16,10 +16,6 @@
  */
 package org.apache.camel.component.google.storage;
 
-import java.io.ByteArrayOutputStream;
-import java.util.Date;
-
-import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.Bucket;
 import com.google.cloud.storage.BucketInfo;
 import com.google.cloud.storage.BucketInfo.Builder;
@@ -27,12 +23,8 @@ import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageClass;
 import org.apache.camel.Category;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.support.ScheduledPollEndpoint;
@@ -123,55 +115,4 @@ public class GoogleCloudStorageEndpoint extends ScheduledPollEndpoint {
         return storageClient;
     }
 
-    public Exchange createExchange(Blob blob, String key) {
-        return createExchange(getExchangePattern(), blob, key);
-    }
-
-    public Exchange createExchange(ExchangePattern pattern, Blob blob, String key) {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Getting object with key [{}] from bucket [{}]...", key, getConfiguration().getBucketName());
-            LOG.trace("Got object [{}]", blob);
-        }
-
-        Exchange exchange = super.createExchange(pattern);
-        Message message = exchange.getIn();
-
-        if (configuration.isIncludeBody()) {
-            try {
-                ByteArrayOutputStream baos = new ByteArrayOutputStream();
-                blob.downloadTo(baos);
-                message.setBody(baos.toByteArray());
-            } catch (Exception e) {
-                throw new RuntimeCamelException(e);
-            }
-        } else {
-            message.setBody(blob);
-        }
-
-        message.setHeader(GoogleCloudStorageConstants.OBJECT_NAME, key);
-        message.setHeader(GoogleCloudStorageConstants.BUCKET_NAME, getConfiguration().getBucketName());
-        //OTHER METADATA        
-        message.setHeader(GoogleCloudStorageConstants.CACHE_CONTROL, blob.getCacheControl());
-        message.setHeader(GoogleCloudStorageConstants.METADATA_COMPONENT_COUNT, blob.getComponentCount());
-        message.setHeader(GoogleCloudStorageConstants.CONTENT_DISPOSITION, blob.getContentDisposition());
-        message.setHeader(GoogleCloudStorageConstants.CONTENT_ENCODING, blob.getContentEncoding());
-        message.setHeader(GoogleCloudStorageConstants.METADATA_CONTENT_LANGUAGE, blob.getContentLanguage());
-        message.setHeader(GoogleCloudStorageConstants.CONTENT_TYPE, blob.getContentType());
-        message.setHeader(GoogleCloudStorageConstants.METADATA_CUSTOM_TIME, blob.getCustomTime());
-        message.setHeader(GoogleCloudStorageConstants.METADATA_CRC32C_HEX, blob.getCrc32cToHexString());
-        message.setHeader(GoogleCloudStorageConstants.METADATA_ETAG, blob.getEtag());
-        message.setHeader(GoogleCloudStorageConstants.METADATA_GENERATION, blob.getGeneration());
-        message.setHeader(GoogleCloudStorageConstants.METADATA_BLOB_ID, blob.getBlobId());
-        message.setHeader(GoogleCloudStorageConstants.METADATA_KMS_KEY_NAME, blob.getKmsKeyName());
-        message.setHeader(GoogleCloudStorageConstants.CONTENT_MD5, blob.getMd5ToHexString());
-        message.setHeader(GoogleCloudStorageConstants.METADATA_MEDIA_LINK, blob.getMediaLink());
-        message.setHeader(GoogleCloudStorageConstants.METADATA_METAGENERATION, blob.getMetageneration());
-        message.setHeader(GoogleCloudStorageConstants.CONTENT_LENGTH, blob.getSize());
-        message.setHeader(GoogleCloudStorageConstants.METADATA_STORAGE_CLASS, blob.getStorageClass());
-        message.setHeader(GoogleCloudStorageConstants.METADATA_CREATE_TIME, blob.getCreateTime());
-        message.setHeader(GoogleCloudStorageConstants.METADATA_LAST_UPDATE, new Date(blob.getUpdateTime()));
-
-        return exchange;
-    }
-
 }
diff --git a/components/camel-gora/src/main/java/org/apache/camel/component/gora/GoraConsumer.java b/components/camel-gora/src/main/java/org/apache/camel/component/gora/GoraConsumer.java
index 8bd0fc4..876c109 100644
--- a/components/camel-gora/src/main/java/org/apache/camel/component/gora/GoraConsumer.java
+++ b/components/camel-gora/src/main/java/org/apache/camel/component/gora/GoraConsumer.java
@@ -76,7 +76,7 @@ public class GoraConsumer extends ScheduledPollConsumer {
 
     @Override
     protected int poll() throws Exception {
-        final Exchange exchange = this.getEndpoint().createExchange();
+        final Exchange exchange = createExchange(true);
 
         // compute time (approx) since last update
         if (firstRun) {
@@ -88,13 +88,7 @@ public class GoraConsumer extends ScheduledPollConsumer {
         //proceed with query
         final Result result = query.execute();
 
-        try {
-            getProcessor().process(exchange);
-        } finally {
-            if (exchange.getException() != null) {
-                getExceptionHandler().handleException("Error processing exchange", exchange, exchange.getException());
-            }
-        }
+        getProcessor().process(exchange);
 
         return Long.valueOf(result.getOffset()).intValue();
     }
diff --git a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java
index b5d8bcc..9b53f57 100644
--- a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java
+++ b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java
@@ -31,14 +31,16 @@ import org.slf4j.LoggerFactory;
 public class CamelEventHandler {
 
     protected final Logger log = LoggerFactory.getLogger(CamelEventHandler.class);
-    protected final GuavaEventBusEndpoint eventBusEndpoint;
+    protected final GuavaEventBusConsumer consumer;
+    protected final GuavaEventBusEndpoint endpoint;
     protected final AsyncProcessor processor;
 
-    public CamelEventHandler(GuavaEventBusEndpoint eventBusEndpoint, Processor processor) {
-        ObjectHelper.notNull(eventBusEndpoint, "eventBusEndpoint");
+    public CamelEventHandler(GuavaEventBusConsumer consumer, GuavaEventBusEndpoint endpoint, Processor processor) {
+        ObjectHelper.notNull(endpoint, "eventBusEndpoint");
         ObjectHelper.notNull(processor, "processor");
 
-        this.eventBusEndpoint = eventBusEndpoint;
+        this.consumer = consumer;
+        this.endpoint = endpoint;
         this.processor = AsyncProcessorConverterHelper.convert(processor);
     }
 
@@ -49,7 +51,7 @@ public class CamelEventHandler {
      */
     public void doEventReceived(Object event) {
         log.trace("Received event: {}", event);
-        final Exchange exchange = eventBusEndpoint.createExchange(event);
+        final Exchange exchange = createExchange(event);
         log.debug("Processing event: {}", event);
         // use async processor to support async routing engine
         processor.process(exchange, new AsyncCallback() {
@@ -60,4 +62,10 @@ public class CamelEventHandler {
         });
     }
 
+    public Exchange createExchange(Object event) {
+        Exchange exchange = consumer.createExchange(true);
+        exchange.getIn().setBody(event);
+        return exchange;
+    }
+
 }
diff --git a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/FilteringCamelEventHandler.java b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/FilteringCamelEventHandler.java
index 7b01d3b..c950d3f 100644
--- a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/FilteringCamelEventHandler.java
+++ b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/FilteringCamelEventHandler.java
@@ -27,8 +27,9 @@ public class FilteringCamelEventHandler extends CamelEventHandler {
 
     private final Class<?> eventClass;
 
-    public FilteringCamelEventHandler(GuavaEventBusEndpoint eventBusEndpoint, Processor processor, Class<?> eventClass) {
-        super(eventBusEndpoint, processor);
+    public FilteringCamelEventHandler(GuavaEventBusConsumer consumer, GuavaEventBusEndpoint endpoint, Processor processor,
+                                      Class<?> eventClass) {
+        super(consumer, endpoint, processor);
         this.eventClass = eventClass;
     }
 
diff --git a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumer.java b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumer.java
index 1740117..bdf9500 100644
--- a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumer.java
+++ b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumer.java
@@ -50,7 +50,7 @@ public class GuavaEventBusConsumer extends DefaultConsumer {
         if (listenerInterface != null) {
             this.eventHandler = createListenerInterfaceProxy(endpoint, processor, listenerInterface);
         } else {
-            this.eventHandler = new FilteringCamelEventHandler(endpoint, processor, eventClass);
+            this.eventHandler = new FilteringCamelEventHandler(this, endpoint, processor, eventClass);
         }
     }
 
@@ -72,7 +72,7 @@ public class GuavaEventBusConsumer extends DefaultConsumer {
             GuavaEventBusEndpoint endpoint, Processor processor, Class<?> listenerInterface) {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         return Proxy.newProxyInstance(classLoader, new Class[] { listenerInterface },
-                new ListenerInterfaceHandler(endpoint, processor));
+                new ListenerInterfaceHandler(this, endpoint, processor));
     }
 
     private static final class ListenerInterfaceHandler implements InvocationHandler {
@@ -81,8 +81,8 @@ public class GuavaEventBusConsumer extends DefaultConsumer {
 
         private final CamelEventHandler delegateHandler;
 
-        private ListenerInterfaceHandler(GuavaEventBusEndpoint endpoint, Processor processor) {
-            this.delegateHandler = new CamelEventHandler(endpoint, processor);
+        private ListenerInterfaceHandler(GuavaEventBusConsumer consumer, GuavaEventBusEndpoint endpoint, Processor processor) {
+            this.delegateHandler = new CamelEventHandler(consumer, endpoint, processor);
         }
 
         @Override
diff --git a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java
index d246eb6..83181b8 100644
--- a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java
+++ b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java
@@ -20,7 +20,6 @@ import com.google.common.eventbus.EventBus;
 import org.apache.camel.Category;
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
 import org.apache.camel.MultipleConsumersSupport;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
@@ -69,12 +68,6 @@ public class GuavaEventBusEndpoint extends DefaultEndpoint implements MultipleCo
         return true;
     }
 
-    public Exchange createExchange(Object event) {
-        Exchange exchange = createExchange();
-        exchange.getIn().setBody(event);
-        return exchange;
-    }
-
     public String getEventBusRef() {
         return eventBusRef;
     }

Reply via email to