This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push:
new 3609376 CAMEL-16222: PooledExchangeFactory experiment
3609376 is described below
commit 36093762e671bdd29156bf32092c0d6cb92798f8
Author: Claus Ibsen <[email protected]>
AuthorDate: Sun Feb 21 19:40:18 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../component/git/consumer/GitBranchConsumer.java | 2 +-
.../component/git/consumer/GitCommitConsumer.java | 2 +-
.../component/git/consumer/GitTagConsumer.java | 2 +-
.../component/github/consumer/CommitConsumer.java | 2 +-
.../component/github/consumer/EventsConsumer.java | 2 +-
.../consumer/PullRequestCommentConsumer.java | 2 +-
.../github/consumer/PullRequestConsumer.java | 2 +-
.../component/github/consumer/TagConsumer.java | 2 +-
.../mail/stream/GoogleMailStreamConsumer.java | 43 ++++++++++++++-
.../mail/stream/GoogleMailStreamEndpoint.java | 42 --------------
.../google/pubsub/GooglePubsubConsumer.java | 8 +--
.../pubsub/consumer/CamelMessageReceiver.java | 9 ++-
.../sheets/stream/GoogleSheetsStreamConsumer.java | 48 ++++++++++++++--
.../sheets/stream/GoogleSheetsStreamEndpoint.java | 37 -------------
.../google/storage/GoogleCloudStorageConsumer.java | 64 +++++++++++++++++++++-
.../google/storage/GoogleCloudStorageEndpoint.java | 59 --------------------
.../apache/camel/component/gora/GoraConsumer.java | 10 +---
.../guava/eventbus/CamelEventHandler.java | 18 ++++--
.../guava/eventbus/FilteringCamelEventHandler.java | 5 +-
.../guava/eventbus/GuavaEventBusConsumer.java | 8 +--
.../guava/eventbus/GuavaEventBusEndpoint.java | 7 ---
21 files changed, 187 insertions(+), 187 deletions(-)
diff --git a/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitBranchConsumer.java b/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitBranchConsumer.java
index 49d9456..6dc075d 100644
--- a/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitBranchConsumer.java
+++ b/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitBranchConsumer.java
@@ -40,7 +40,7 @@ public class GitBranchConsumer extends AbstractGitConsumer {
List<Ref> call =
getGit().branchList().setListMode(ListMode.ALL).call();
for (Ref ref : call) {
if (!branchesConsumed.contains(ref.getName())) {
- Exchange e = getEndpoint().createExchange();
+ Exchange e = createExchange(true);
e.getMessage().setBody(ref.getName());
e.getMessage().setHeader(GitConstants.GIT_BRANCH_LEAF,
ref.getLeaf().getName());
e.getMessage().setHeader(GitConstants.GIT_BRANCH_OBJECT_ID,
ref.getObjectId().getName());
diff --git a/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitCommitConsumer.java b/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitCommitConsumer.java
index 06e203b..122a85a 100644
--- a/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitCommitConsumer.java
+++ b/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitCommitConsumer.java
@@ -46,7 +46,7 @@ public class GitCommitConsumer extends AbstractGitConsumer {
}
for (RevCommit commit : commits) {
if (!commitsConsumed.contains(commit.getId())) {
- Exchange e = getEndpoint().createExchange();
+ Exchange e = createExchange(true);
e.getMessage().setBody(commit.getFullMessage());
e.getMessage().setHeader(GitConstants.GIT_COMMIT_ID,
commit.getId());
e.getMessage().setHeader(GitConstants.GIT_COMMIT_AUTHOR_NAME,
commit.getAuthorIdent().getName());
diff --git a/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitTagConsumer.java b/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitTagConsumer.java
index 31ea39a..7052764 100644
--- a/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitTagConsumer.java
+++ b/components/camel-git/src/main/java/org/apache/camel/component/git/consumer/GitTagConsumer.java
@@ -39,7 +39,7 @@ public class GitTagConsumer extends AbstractGitConsumer {
List<Ref> call = getGit().tagList().call();
for (Ref ref : call) {
if (!tagsConsumed.contains(ref.getName())) {
- Exchange e = getEndpoint().createExchange();
+ Exchange e = createExchange(true);
e.getMessage().setBody(ref.getName());
e.getMessage().setHeader(GitConstants.GIT_BRANCH_LEAF,
ref.getLeaf().getName());
e.getMessage().setHeader(GitConstants.GIT_BRANCH_OBJECT_ID,
ref.getObjectId().getName());
diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
index ac92e01..5a4bbf7 100644
--- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
+++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/CommitConsumer.java
@@ -71,7 +71,7 @@ public class CommitConsumer extends AbstractGitHubConsumer {
while (!newCommits.empty()) {
RepositoryCommit newCommit = newCommits.pop();
- Exchange e = getEndpoint().createExchange();
+ Exchange e = createExchange(true);
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_AUTHOR,
newCommit.getAuthor().getName());
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_COMMITTER,
newCommit.getCommitter().getName());
e.getMessage().setHeader(GitHubConstants.GITHUB_COMMIT_SHA,
newCommit.getSha());
diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java
index 343fe45..a47e763 100644
--- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java
+++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/EventsConsumer.java
@@ -78,7 +78,7 @@ public class EventsConsumer extends AbstractGitHubConsumer {
lastEventId = Long.parseLong(latestEvent.getId());
for (Event event : newEvents) {
- Exchange exchange = getEndpoint().createExchange();
+ Exchange exchange = createExchange(true);
exchange.getMessage().setBody(event.getType());
exchange.getMessage().setHeader(GitHubConstants.GITHUB_EVENT_PAYLOAD,
event.getPayload());
getProcessor().process(exchange);
diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java
index 7f2164b..fc4fa03 100644
--- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java
+++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestCommentConsumer.java
@@ -110,7 +110,7 @@ public class PullRequestCommentConsumer extends
AbstractGitHubConsumer {
while (!newComments.empty()) {
Comment newComment = newComments.pop();
- Exchange e = getEndpoint().createExchange();
+ Exchange e = createExchange(true);
e.getIn().setBody(newComment);
// Required by the producers. Set it here for convenience.
diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java
index 9d6b804..61d839c 100644
--- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java
+++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/PullRequestConsumer.java
@@ -76,7 +76,7 @@ public class PullRequestConsumer extends
AbstractGitHubConsumer {
while (!newPullRequests.empty()) {
PullRequest newPullRequest = newPullRequests.pop();
- Exchange e = getEndpoint().createExchange();
+ Exchange e = createExchange(true);
e.getIn().setBody(newPullRequest);
diff --git a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java
index 559cbba..91a7107 100644
--- a/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java
+++ b/components/camel-github/src/main/java/org/apache/camel/component/github/consumer/TagConsumer.java
@@ -56,7 +56,7 @@ public class TagConsumer extends AbstractGitHubConsumer {
while (!newTags.empty()) {
RepositoryTag newTag = newTags.pop();
- Exchange e = getEndpoint().createExchange();
+ Exchange e = createExchange(true);
e.getIn().setBody(newTag);
getProcessor().process(e);
}
diff --git a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
index 4d4f5e2..da70219 100644
--- a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
+++ b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamConsumer.java
@@ -16,17 +16,22 @@
*/
package org.apache.camel.component.google.mail.stream;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import com.google.api.client.util.Base64;
import com.google.api.services.gmail.Gmail;
import com.google.api.services.gmail.model.ListMessagesResponse;
import com.google.api.services.gmail.model.Message;
+import com.google.api.services.gmail.model.MessagePart;
+import com.google.api.services.gmail.model.MessagePartHeader;
import com.google.api.services.gmail.model.ModifyMessageRequest;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.Synchronization;
@@ -83,7 +88,7 @@ public class GoogleMailStreamConsumer extends
ScheduledBatchPollingConsumer {
if (c.getMessages() != null) {
for (Message message : c.getMessages()) {
Message mess = getClient().users().messages().get("me",
message.getId()).setFormat("FULL").execute();
- Exchange exchange =
getEndpoint().createExchange(getEndpoint().getExchangePattern(), mess);
+ Exchange exchange =
createExchange(getEndpoint().getExchangePattern(), mess);
answer.add(exchange);
}
}
@@ -170,4 +175,40 @@ public class GoogleMailStreamConsumer extends
ScheduledBatchPollingConsumer {
}
}
+ public Exchange createExchange(ExchangePattern pattern,
com.google.api.services.gmail.model.Message mail) {
+ Exchange exchange = createExchange(true);
+ exchange.setPattern(pattern);
+ org.apache.camel.Message message = exchange.getIn();
+ exchange.getIn().setHeader(GoogleMailStreamConstants.MAIL_ID,
mail.getId());
+ List<MessagePart> parts = mail.getPayload().getParts();
+ if (parts != null && parts.get(0).getBody().getData() != null) {
+ byte[] bodyBytes =
Base64.decodeBase64(parts.get(0).getBody().getData().trim());
+ String body = new String(bodyBytes, StandardCharsets.UTF_8);
+ message.setBody(body);
+ }
+ configureHeaders(message, mail.getPayload().getHeaders());
+ return exchange;
+ }
+
+ private void configureHeaders(org.apache.camel.Message message,
List<MessagePartHeader> headers) {
+ for (MessagePartHeader header : headers) {
+ String headerName = header.getName();
+ if ("SUBJECT".equalsIgnoreCase(headerName)) {
+ message.setHeader(GoogleMailStreamConstants.MAIL_SUBJECT,
header.getValue());
+ }
+ if ("TO".equalsIgnoreCase(headerName)) {
+ message.setHeader(GoogleMailStreamConstants.MAIL_TO,
header.getValue());
+ }
+ if ("FROM".equalsIgnoreCase(headerName)) {
+ message.setHeader(GoogleMailStreamConstants.MAIL_FROM,
header.getValue());
+ }
+ if ("CC".equalsIgnoreCase(headerName)) {
+ message.setHeader(GoogleMailStreamConstants.MAIL_CC,
header.getValue());
+ }
+ if ("BCC".equalsIgnoreCase(headerName)) {
+ message.setHeader(GoogleMailStreamConstants.MAIL_BCC,
header.getValue());
+ }
+ }
+ }
+
}
diff --git a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamEndpoint.java b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamEndpoint.java
index 8c47b31..947f69a2 100644
--- a/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamEndpoint.java
+++ b/components/camel-google-mail/src/main/java/org/apache/camel/component/google/mail/stream/GoogleMailStreamEndpoint.java
@@ -16,22 +16,15 @@
*/
package org.apache.camel.component.google.mail.stream;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
-import com.google.api.client.util.Base64;
import com.google.api.services.gmail.Gmail;
import com.google.api.services.gmail.model.Label;
import com.google.api.services.gmail.model.ListLabelsResponse;
-import com.google.api.services.gmail.model.MessagePart;
-import com.google.api.services.gmail.model.MessagePartHeader;
import com.google.common.base.Splitter;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.google.mail.GoogleMailClientFactory;
@@ -108,41 +101,6 @@ public class GoogleMailStreamEndpoint extends
ScheduledPollEndpoint {
return configuration;
}
- public Exchange createExchange(ExchangePattern pattern,
com.google.api.services.gmail.model.Message mail) {
- Exchange exchange = super.createExchange(pattern);
- Message message = exchange.getIn();
- exchange.getIn().setHeader(GoogleMailStreamConstants.MAIL_ID,
mail.getId());
- List<MessagePart> parts = mail.getPayload().getParts();
- if (parts != null && parts.get(0).getBody().getData() != null) {
- byte[] bodyBytes =
Base64.decodeBase64(parts.get(0).getBody().getData().trim());
- String body = new String(bodyBytes, StandardCharsets.UTF_8);
- message.setBody(body);
- }
- setHeaders(message, mail.getPayload().getHeaders());
- return exchange;
- }
-
- private void setHeaders(Message message, List<MessagePartHeader> headers) {
- for (MessagePartHeader header : headers) {
- String headerName = header.getName();
- if ("SUBJECT".equalsIgnoreCase(headerName)) {
- message.setHeader(GoogleMailStreamConstants.MAIL_SUBJECT,
header.getValue());
- }
- if ("TO".equalsIgnoreCase(headerName)) {
- message.setHeader(GoogleMailStreamConstants.MAIL_TO,
header.getValue());
- }
- if ("FROM".equalsIgnoreCase(headerName)) {
- message.setHeader(GoogleMailStreamConstants.MAIL_FROM,
header.getValue());
- }
- if ("CC".equalsIgnoreCase(headerName)) {
- message.setHeader(GoogleMailStreamConstants.MAIL_CC,
header.getValue());
- }
- if ("BCC".equalsIgnoreCase(headerName)) {
- message.setHeader(GoogleMailStreamConstants.MAIL_BCC,
header.getValue());
- }
- }
- }
-
private List<String> splitLabels(String labels) {
return Splitter.on(',').splitToList(labels);
}
diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
index 360dadc..8adc5e1 100644
--- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
+++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/GooglePubsubConsumer.java
@@ -40,7 +40,7 @@ import org.apache.camel.support.DefaultConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-class GooglePubsubConsumer extends DefaultConsumer {
+public class GooglePubsubConsumer extends DefaultConsumer {
private Logger localLog;
@@ -123,7 +123,7 @@ class GooglePubsubConsumer extends DefaultConsumer {
private void asynchronousPull(String subscriptionName) {
while (isRunAllowed() && !isSuspendingOrSuspended()) {
- MessageReceiver messageReceiver = new
CamelMessageReceiver(endpoint, processor);
+ MessageReceiver messageReceiver = new
CamelMessageReceiver(GooglePubsubConsumer.this, endpoint, processor);
Subscriber subscriber =
endpoint.getComponent().getSubscriber(subscriptionName, messageReceiver);
try {
@@ -152,7 +152,7 @@ class GooglePubsubConsumer extends DefaultConsumer {
PullResponse pullResponse =
subscriber.pullCallable().call(pullRequest);
for (ReceivedMessage message :
pullResponse.getReceivedMessagesList()) {
PubsubMessage pubsubMessage = message.getMessage();
- Exchange exchange = endpoint.createExchange();
+ Exchange exchange = createExchange(true);
exchange.getIn().setBody(pubsubMessage.getData().toByteArray());
exchange.getIn().setHeader(GooglePubsubConstants.ACK_ID, message.getAckId());
@@ -171,7 +171,7 @@ class GooglePubsubConsumer extends DefaultConsumer {
try {
processor.process(exchange);
} catch (Exception e) {
- exchange.setException(e);
+ getExceptionHandler().handleException(e);
}
}
} catch (IOException e) {
diff --git a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
index 0b441b0..d26d649 100644
--- a/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
+++ b/components/camel-google-pubsub/src/main/java/org/apache/camel/component/google/pubsub/consumer/CamelMessageReceiver.java
@@ -24,6 +24,7 @@ import org.apache.camel.Exchange;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.Processor;
import org.apache.camel.component.google.pubsub.GooglePubsubConstants;
+import org.apache.camel.component.google.pubsub.GooglePubsubConsumer;
import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,10 +32,12 @@ import org.slf4j.LoggerFactory;
public class CamelMessageReceiver implements MessageReceiver {
private final Logger localLog;
+ private final GooglePubsubConsumer consumer;
private final GooglePubsubEndpoint endpoint;
private final Processor processor;
- public CamelMessageReceiver(GooglePubsubEndpoint endpoint, Processor
processor) {
+ public CamelMessageReceiver(GooglePubsubConsumer consumer,
GooglePubsubEndpoint endpoint, Processor processor) {
+ this.consumer = consumer;
this.endpoint = endpoint;
this.processor = processor;
String loggerId = endpoint.getLoggerId();
@@ -50,7 +53,7 @@ public class CamelMessageReceiver implements MessageReceiver {
localLog.trace("Received message ID : {}",
pubsubMessage.getMessageId());
}
- Exchange exchange = endpoint.createExchange();
+ Exchange exchange = consumer.createExchange(true);
exchange.getIn().setBody(pubsubMessage.getData().toByteArray());
exchange.getIn().setHeader(GooglePubsubConstants.MESSAGE_ID,
pubsubMessage.getMessageId());
@@ -67,7 +70,7 @@ public class CamelMessageReceiver implements MessageReceiver {
try {
processor.process(exchange);
} catch (Exception e) {
- exchange.setException(e);
+ consumer.getExceptionHandler().handleException(e);
}
}
}
diff --git a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
index 5d382db..eb04b5d 100644
--- a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
+++ b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamConsumer.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.google.sheets.stream;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -29,6 +30,7 @@ import com.google.api.services.sheets.v4.model.Spreadsheet;
import com.google.api.services.sheets.v4.model.ValueRange;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
+import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
@@ -89,12 +91,12 @@ public class GoogleSheetsStreamConsumer extends
ScheduledBatchPollingConsumer {
if (getConfiguration().getMaxResults() > 0) {
valueRange.getValues().stream()
.limit(getConfiguration().getMaxResults())
- .map(values ->
getEndpoint().createExchange(rangeIndex.get(), valueIndex.incrementAndGet(),
+ .map(values ->
createExchange(rangeIndex.get(), valueIndex.incrementAndGet(),
valueRange.getRange(),
valueRange.getMajorDimension(), values))
.forEach(answer::add);
} else {
valueRange.getValues().stream()
- .map(values ->
getEndpoint().createExchange(rangeIndex.get(), valueIndex.incrementAndGet(),
+ .map(values ->
createExchange(rangeIndex.get(), valueIndex.incrementAndGet(),
valueRange.getRange(),
valueRange.getMajorDimension(), values))
.forEach(answer::add);
}
@@ -112,7 +114,7 @@ public class GoogleSheetsStreamConsumer extends
ScheduledBatchPollingConsumer {
.collect(Collectors.toList()));
}
})
- .map(valueRange ->
getEndpoint().createExchange(rangeIndex.incrementAndGet(), valueRange))
+ .map(valueRange ->
createExchange(rangeIndex.incrementAndGet(), valueRange))
.forEach(answer::add);
}
}
@@ -122,7 +124,7 @@ public class GoogleSheetsStreamConsumer extends
ScheduledBatchPollingConsumer {
request.setIncludeGridData(getConfiguration().isIncludeGridData());
Spreadsheet spreadsheet = request.execute();
- answer.add(getEndpoint().createExchange(spreadsheet));
+ answer.add(createExchange(spreadsheet));
}
return processBatch(CastUtils.cast(answer));
@@ -148,4 +150,42 @@ public class GoogleSheetsStreamConsumer extends
ScheduledBatchPollingConsumer {
return total;
}
+
+ public Exchange createExchange(int rangeIndex, ValueRange valueRange) {
+ Exchange exchange = createExchange(true);
+ exchange.setPattern(getEndpoint().getExchangePattern());
+ Message message = exchange.getIn();
+ exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_ID,
+ getEndpoint().getConfiguration().getSpreadsheetId());
+ exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE,
valueRange.getRange());
+ exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE_INDEX,
rangeIndex);
+
exchange.getIn().setHeader(GoogleSheetsStreamConstants.MAJOR_DIMENSION,
valueRange.getMajorDimension());
+ message.setBody(valueRange);
+ return exchange;
+ }
+
+ public Exchange createExchange(int rangeIndex, int valueIndex, String
range, String majorDimension, List<Object> values) {
+ Exchange exchange = createExchange(true);
+ exchange.setPattern(getEndpoint().getExchangePattern());
+ Message message = exchange.getIn();
+ exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_ID,
+ getEndpoint().getConfiguration().getSpreadsheetId());
+ exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE_INDEX,
rangeIndex);
+ exchange.getIn().setHeader(GoogleSheetsStreamConstants.VALUE_INDEX,
valueIndex);
+ exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE, range);
+
exchange.getIn().setHeader(GoogleSheetsStreamConstants.MAJOR_DIMENSION,
majorDimension);
+ message.setBody(values);
+ return exchange;
+ }
+
+ public Exchange createExchange(Spreadsheet spreadsheet) {
+ Exchange exchange = createExchange(true);
+ exchange.setPattern(getEndpoint().getExchangePattern());
+ Message message = exchange.getIn();
+ exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_ID,
spreadsheet.getSpreadsheetId());
+
exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_URL,
spreadsheet.getSpreadsheetUrl());
+ message.setBody(spreadsheet);
+ return exchange;
+ }
+
}
diff --git a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamEndpoint.java b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamEndpoint.java
index 18062f3..c17f148 100644
--- a/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamEndpoint.java
+++ b/components/camel-google-sheets/src/main/java/org/apache/camel/component/google/sheets/stream/GoogleSheetsStreamEndpoint.java
@@ -16,15 +16,9 @@
*/
package org.apache.camel.component.google.sheets.stream;
-import java.util.List;
-
import com.google.api.services.sheets.v4.Sheets;
-import com.google.api.services.sheets.v4.model.Spreadsheet;
-import com.google.api.services.sheets.v4.model.ValueRange;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.component.google.sheets.GoogleSheetsClientFactory;
@@ -80,35 +74,4 @@ public class GoogleSheetsStreamEndpoint extends
ScheduledPollEndpoint {
return configuration;
}
- public Exchange createExchange(int rangeIndex, ValueRange valueRange) {
- Exchange exchange = super.createExchange(getExchangePattern());
- Message message = exchange.getIn();
- exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_ID,
configuration.getSpreadsheetId());
- exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE,
valueRange.getRange());
- exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE_INDEX,
rangeIndex);
-
exchange.getIn().setHeader(GoogleSheetsStreamConstants.MAJOR_DIMENSION,
valueRange.getMajorDimension());
- message.setBody(valueRange);
- return exchange;
- }
-
- public Exchange createExchange(int rangeIndex, int valueIndex, String
range, String majorDimension, List<Object> values) {
- Exchange exchange = super.createExchange(getExchangePattern());
- Message message = exchange.getIn();
- exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_ID,
configuration.getSpreadsheetId());
- exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE_INDEX,
rangeIndex);
- exchange.getIn().setHeader(GoogleSheetsStreamConstants.VALUE_INDEX,
valueIndex);
- exchange.getIn().setHeader(GoogleSheetsStreamConstants.RANGE, range);
-
exchange.getIn().setHeader(GoogleSheetsStreamConstants.MAJOR_DIMENSION,
majorDimension);
- message.setBody(values);
- return exchange;
- }
-
- public Exchange createExchange(Spreadsheet spreadsheet) {
- Exchange exchange = super.createExchange(getExchangePattern());
- Message message = exchange.getIn();
- exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_ID,
spreadsheet.getSpreadsheetId());
-
exchange.getIn().setHeader(GoogleSheetsStreamConstants.SPREADSHEET_URL,
spreadsheet.getSpreadsheetUrl());
- message.setBody(spreadsheet);
- return exchange;
- }
}
diff --git a/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java b/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java
index 935b7e6..b8cede8 100644
--- a/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java
+++ b/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageConsumer.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.google.storage;
+import java.io.ByteArrayOutputStream;
+import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
@@ -28,8 +30,11 @@ import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.CopyRequest;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
@@ -73,7 +78,7 @@ public class GoogleCloudStorageConsumer extends
ScheduledBatchPollingConsumer {
String fileName = getConfiguration().getObjectName();
String bucketName = getConfiguration().getBucketName();
- Queue<Exchange> exchanges = new LinkedList<>();
+ Queue<Exchange> exchanges;
if (fileName != null) {
LOG.trace("Getting object in bucket [{}] with file name [{}]...",
bucketName, fileName);
@@ -101,7 +106,7 @@ public class GoogleCloudStorageConsumer extends
ScheduledBatchPollingConsumer {
protected Queue<Exchange> createExchanges(Blob blob, String key) {
Queue<Exchange> answer = new LinkedList<>();
- Exchange exchange = getEndpoint().createExchange(blob, key);
+ Exchange exchange = createExchange(blob, key);
answer.add(exchange);
return answer;
}
@@ -115,7 +120,7 @@ public class GoogleCloudStorageConsumer extends
ScheduledBatchPollingConsumer {
try {
for (Blob blob : blobList) {
if (includeObject(blob)) {
- Exchange exchange = getEndpoint().createExchange(blob,
blob.getBlobId().getName());
+ Exchange exchange = createExchange(blob,
blob.getBlobId().getName());
answer.add(exchange);
}
}
@@ -248,4 +253,57 @@ public class GoogleCloudStorageConsumer extends
ScheduledBatchPollingConsumer {
public GoogleCloudStorageEndpoint getEndpoint() {
return (GoogleCloudStorageEndpoint) super.getEndpoint();
}
+
+ public Exchange createExchange(Blob blob, String key) {
+ return createExchange(getEndpoint().getExchangePattern(), blob, key);
+ }
+
+ public Exchange createExchange(ExchangePattern pattern, Blob blob, String
key) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Getting object with key [{}] from bucket [{}]...", key,
getConfiguration().getBucketName());
+ LOG.trace("Got object [{}]", blob);
+ }
+
+ Exchange exchange = createExchange(true);
+ exchange.setPattern(pattern);
+ Message message = exchange.getIn();
+
+ if (getConfiguration().isIncludeBody()) {
+ try {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ blob.downloadTo(baos);
+ message.setBody(baos.toByteArray());
+ } catch (Exception e) {
+ throw new RuntimeCamelException(e);
+ }
+ } else {
+ message.setBody(blob);
+ }
+
+ message.setHeader(GoogleCloudStorageConstants.OBJECT_NAME, key);
+ message.setHeader(GoogleCloudStorageConstants.BUCKET_NAME,
getConfiguration().getBucketName());
+ //OTHER METADATA
+ message.setHeader(GoogleCloudStorageConstants.CACHE_CONTROL,
blob.getCacheControl());
+
message.setHeader(GoogleCloudStorageConstants.METADATA_COMPONENT_COUNT,
blob.getComponentCount());
+ message.setHeader(GoogleCloudStorageConstants.CONTENT_DISPOSITION,
blob.getContentDisposition());
+ message.setHeader(GoogleCloudStorageConstants.CONTENT_ENCODING,
blob.getContentEncoding());
+
message.setHeader(GoogleCloudStorageConstants.METADATA_CONTENT_LANGUAGE,
blob.getContentLanguage());
+ message.setHeader(GoogleCloudStorageConstants.CONTENT_TYPE,
blob.getContentType());
+ message.setHeader(GoogleCloudStorageConstants.METADATA_CUSTOM_TIME,
blob.getCustomTime());
+ message.setHeader(GoogleCloudStorageConstants.METADATA_CRC32C_HEX,
blob.getCrc32cToHexString());
+ message.setHeader(GoogleCloudStorageConstants.METADATA_ETAG,
blob.getEtag());
+ message.setHeader(GoogleCloudStorageConstants.METADATA_GENERATION,
blob.getGeneration());
+ message.setHeader(GoogleCloudStorageConstants.METADATA_BLOB_ID,
blob.getBlobId());
+ message.setHeader(GoogleCloudStorageConstants.METADATA_KMS_KEY_NAME,
blob.getKmsKeyName());
+ message.setHeader(GoogleCloudStorageConstants.CONTENT_MD5,
blob.getMd5ToHexString());
+ message.setHeader(GoogleCloudStorageConstants.METADATA_MEDIA_LINK,
blob.getMediaLink());
+ message.setHeader(GoogleCloudStorageConstants.METADATA_METAGENERATION,
blob.getMetageneration());
+ message.setHeader(GoogleCloudStorageConstants.CONTENT_LENGTH,
blob.getSize());
+ message.setHeader(GoogleCloudStorageConstants.METADATA_STORAGE_CLASS,
blob.getStorageClass());
+ message.setHeader(GoogleCloudStorageConstants.METADATA_CREATE_TIME,
blob.getCreateTime());
+ message.setHeader(GoogleCloudStorageConstants.METADATA_LAST_UPDATE,
new Date(blob.getUpdateTime()));
+
+ return exchange;
+ }
+
}
diff --git a/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageEndpoint.java b/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageEndpoint.java
index f3fe59a..79da387 100644
--- a/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageEndpoint.java
+++ b/components/camel-google-storage/src/main/java/org/apache/camel/component/google/storage/GoogleCloudStorageEndpoint.java
@@ -16,10 +16,6 @@
*/
package org.apache.camel.component.google.storage;
-import java.io.ByteArrayOutputStream;
-import java.util.Date;
-
-import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.BucketInfo.Builder;
@@ -27,12 +23,8 @@ import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageClass;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
-import org.apache.camel.ExchangePattern;
-import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.RuntimeCamelException;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.support.ScheduledPollEndpoint;
@@ -123,55 +115,4 @@ public class GoogleCloudStorageEndpoint extends
ScheduledPollEndpoint {
return storageClient;
}
- public Exchange createExchange(Blob blob, String key) {
- return createExchange(getExchangePattern(), blob, key);
- }
-
- public Exchange createExchange(ExchangePattern pattern, Blob blob, String
key) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Getting object with key [{}] from bucket [{}]...", key,
getConfiguration().getBucketName());
- LOG.trace("Got object [{}]", blob);
- }
-
- Exchange exchange = super.createExchange(pattern);
- Message message = exchange.getIn();
-
- if (configuration.isIncludeBody()) {
- try {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- blob.downloadTo(baos);
- message.setBody(baos.toByteArray());
- } catch (Exception e) {
- throw new RuntimeCamelException(e);
- }
- } else {
- message.setBody(blob);
- }
-
- message.setHeader(GoogleCloudStorageConstants.OBJECT_NAME, key);
- message.setHeader(GoogleCloudStorageConstants.BUCKET_NAME,
getConfiguration().getBucketName());
- //OTHER METADATA
- message.setHeader(GoogleCloudStorageConstants.CACHE_CONTROL,
blob.getCacheControl());
-
message.setHeader(GoogleCloudStorageConstants.METADATA_COMPONENT_COUNT,
blob.getComponentCount());
- message.setHeader(GoogleCloudStorageConstants.CONTENT_DISPOSITION,
blob.getContentDisposition());
- message.setHeader(GoogleCloudStorageConstants.CONTENT_ENCODING,
blob.getContentEncoding());
-
message.setHeader(GoogleCloudStorageConstants.METADATA_CONTENT_LANGUAGE,
blob.getContentLanguage());
- message.setHeader(GoogleCloudStorageConstants.CONTENT_TYPE,
blob.getContentType());
- message.setHeader(GoogleCloudStorageConstants.METADATA_CUSTOM_TIME,
blob.getCustomTime());
- message.setHeader(GoogleCloudStorageConstants.METADATA_CRC32C_HEX,
blob.getCrc32cToHexString());
- message.setHeader(GoogleCloudStorageConstants.METADATA_ETAG,
blob.getEtag());
- message.setHeader(GoogleCloudStorageConstants.METADATA_GENERATION,
blob.getGeneration());
- message.setHeader(GoogleCloudStorageConstants.METADATA_BLOB_ID,
blob.getBlobId());
- message.setHeader(GoogleCloudStorageConstants.METADATA_KMS_KEY_NAME,
blob.getKmsKeyName());
- message.setHeader(GoogleCloudStorageConstants.CONTENT_MD5,
blob.getMd5ToHexString());
- message.setHeader(GoogleCloudStorageConstants.METADATA_MEDIA_LINK,
blob.getMediaLink());
- message.setHeader(GoogleCloudStorageConstants.METADATA_METAGENERATION,
blob.getMetageneration());
- message.setHeader(GoogleCloudStorageConstants.CONTENT_LENGTH,
blob.getSize());
- message.setHeader(GoogleCloudStorageConstants.METADATA_STORAGE_CLASS,
blob.getStorageClass());
- message.setHeader(GoogleCloudStorageConstants.METADATA_CREATE_TIME,
blob.getCreateTime());
- message.setHeader(GoogleCloudStorageConstants.METADATA_LAST_UPDATE,
new Date(blob.getUpdateTime()));
-
- return exchange;
- }
-
}
diff --git
a/components/camel-gora/src/main/java/org/apache/camel/component/gora/GoraConsumer.java
b/components/camel-gora/src/main/java/org/apache/camel/component/gora/GoraConsumer.java
index 8bd0fc4..876c109 100644
---
a/components/camel-gora/src/main/java/org/apache/camel/component/gora/GoraConsumer.java
+++
b/components/camel-gora/src/main/java/org/apache/camel/component/gora/GoraConsumer.java
@@ -76,7 +76,7 @@ public class GoraConsumer extends ScheduledPollConsumer {
@Override
protected int poll() throws Exception {
- final Exchange exchange = this.getEndpoint().createExchange();
+ final Exchange exchange = createExchange(true);
// compute time (approx) since last update
if (firstRun) {
@@ -88,13 +88,7 @@ public class GoraConsumer extends ScheduledPollConsumer {
//proceed with query
final Result result = query.execute();
- try {
- getProcessor().process(exchange);
- } finally {
- if (exchange.getException() != null) {
- getExceptionHandler().handleException("Error processing
exchange", exchange, exchange.getException());
- }
- }
+ getProcessor().process(exchange);
return Long.valueOf(result.getOffset()).intValue();
}
diff --git
a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java
b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java
index b5d8bcc..9b53f57 100644
---
a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java
+++
b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/CamelEventHandler.java
@@ -31,14 +31,16 @@ import org.slf4j.LoggerFactory;
public class CamelEventHandler {
protected final Logger log =
LoggerFactory.getLogger(CamelEventHandler.class);
- protected final GuavaEventBusEndpoint eventBusEndpoint;
+ protected final GuavaEventBusConsumer consumer;
+ protected final GuavaEventBusEndpoint endpoint;
protected final AsyncProcessor processor;
- public CamelEventHandler(GuavaEventBusEndpoint eventBusEndpoint, Processor
processor) {
- ObjectHelper.notNull(eventBusEndpoint, "eventBusEndpoint");
+ public CamelEventHandler(GuavaEventBusConsumer consumer,
GuavaEventBusEndpoint endpoint, Processor processor) {
+ ObjectHelper.notNull(endpoint, "eventBusEndpoint");
ObjectHelper.notNull(processor, "processor");
- this.eventBusEndpoint = eventBusEndpoint;
+ this.consumer = consumer;
+ this.endpoint = endpoint;
this.processor = AsyncProcessorConverterHelper.convert(processor);
}
@@ -49,7 +51,7 @@ public class CamelEventHandler {
*/
public void doEventReceived(Object event) {
log.trace("Received event: {}", event);
- final Exchange exchange = eventBusEndpoint.createExchange(event);
+ final Exchange exchange = createExchange(event);
log.debug("Processing event: {}", event);
// use async processor to support async routing engine
processor.process(exchange, new AsyncCallback() {
@@ -60,4 +62,10 @@ public class CamelEventHandler {
});
}
+ public Exchange createExchange(Object event) {
+ Exchange exchange = consumer.createExchange(true);
+ exchange.getIn().setBody(event);
+ return exchange;
+ }
+
}
diff --git
a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/FilteringCamelEventHandler.java
b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/FilteringCamelEventHandler.java
index 7b01d3b..c950d3f 100644
---
a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/FilteringCamelEventHandler.java
+++
b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/FilteringCamelEventHandler.java
@@ -27,8 +27,9 @@ public class FilteringCamelEventHandler extends
CamelEventHandler {
private final Class<?> eventClass;
- public FilteringCamelEventHandler(GuavaEventBusEndpoint eventBusEndpoint,
Processor processor, Class<?> eventClass) {
- super(eventBusEndpoint, processor);
+ public FilteringCamelEventHandler(GuavaEventBusConsumer consumer,
GuavaEventBusEndpoint endpoint, Processor processor,
+ Class<?> eventClass) {
+ super(consumer, endpoint, processor);
this.eventClass = eventClass;
}
diff --git
a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumer.java
b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumer.java
index 1740117..bdf9500 100644
---
a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumer.java
+++
b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusConsumer.java
@@ -50,7 +50,7 @@ public class GuavaEventBusConsumer extends DefaultConsumer {
if (listenerInterface != null) {
this.eventHandler = createListenerInterfaceProxy(endpoint,
processor, listenerInterface);
} else {
- this.eventHandler = new FilteringCamelEventHandler(endpoint,
processor, eventClass);
+ this.eventHandler = new FilteringCamelEventHandler(this, endpoint,
processor, eventClass);
}
}
@@ -72,7 +72,7 @@ public class GuavaEventBusConsumer extends DefaultConsumer {
GuavaEventBusEndpoint endpoint, Processor processor, Class<?>
listenerInterface) {
ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
return Proxy.newProxyInstance(classLoader, new Class[] {
listenerInterface },
- new ListenerInterfaceHandler(endpoint, processor));
+ new ListenerInterfaceHandler(this, endpoint, processor));
}
private static final class ListenerInterfaceHandler implements
InvocationHandler {
@@ -81,8 +81,8 @@ public class GuavaEventBusConsumer extends DefaultConsumer {
private final CamelEventHandler delegateHandler;
- private ListenerInterfaceHandler(GuavaEventBusEndpoint endpoint,
Processor processor) {
- this.delegateHandler = new CamelEventHandler(endpoint, processor);
+ private ListenerInterfaceHandler(GuavaEventBusConsumer consumer,
GuavaEventBusEndpoint endpoint, Processor processor) {
+ this.delegateHandler = new CamelEventHandler(consumer, endpoint,
processor);
}
@Override
diff --git
a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java
b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java
index d246eb6..83181b8 100644
---
a/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java
+++
b/components/camel-guava-eventbus/src/main/java/org/apache/camel/component/guava/eventbus/GuavaEventBusEndpoint.java
@@ -20,7 +20,6 @@ import com.google.common.eventbus.EventBus;
import org.apache.camel.Category;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
-import org.apache.camel.Exchange;
import org.apache.camel.MultipleConsumersSupport;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
@@ -69,12 +68,6 @@ public class GuavaEventBusEndpoint extends DefaultEndpoint
implements MultipleCo
return true;
}
- public Exchange createExchange(Object event) {
- Exchange exchange = createExchange();
- exchange.getIn().setBody(event);
- return exchange;
- }
-
public String getEventBusRef() {
return eventBusRef;
}