Repository: camel Updated Branches: refs/heads/master 47fa4edd4 -> f507f4eed
CAMEL-10849: Salesforce: subscription channel ... ...created per component This changes the way configuration, specifically `initialReplayIdMap` and `defaultReplayId` are configured on the CometD client. The Bayeux CometD client is left at the Component level, this is to limit the number of streaming clients connected to Salesforce as there are limits on those. The SubscriptionHelper was previously tied to the topic it subscribed to on creation, removes that constraint. This allows per-endpoint customization of the client (subscriptions) on subscription from consumers. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f507f4ee Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f507f4ee Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f507f4ee Branch: refs/heads/master Commit: f507f4eed27a54dbfb30f019e7bbd3f8b4483920 Parents: 47fa4ed Author: Zoran Regvart <zregv...@apache.org> Authored: Wed Feb 22 13:03:51 2017 +0100 Committer: Zoran Regvart <zregv...@apache.org> Committed: Wed Feb 22 13:12:38 2017 +0100 ---------------------------------------------------------------------- .../salesforce/SalesforceComponent.java | 4 +- .../salesforce/SalesforceEndpoint.java | 5 +- .../salesforce/SalesforceEndpointConfig.java | 3 +- .../streaming/CometDReplayExtension.java | 15 ++- .../internal/streaming/SubscriptionHelper.java | 85 ++++++++++------- .../streaming/SubscriptionHelperTest.java | 98 ++++++++------------ 6 files changed, 108 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f507f4ee/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java index 2a5effd..e95668d 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java @@ -361,10 +361,10 @@ public class SalesforceComponent extends UriEndpointComponent implements Endpoin } } - public SubscriptionHelper getSubscriptionHelper(String topicName) throws Exception { + public SubscriptionHelper getSubscriptionHelper() throws Exception { if (subscriptionHelper == null) { // lazily create subscription helper - subscriptionHelper = new SubscriptionHelper(this, topicName); + subscriptionHelper = new SubscriptionHelper(this); // also start the helper to connect to Salesforce ServiceHelper.startService(subscriptionHelper); http://git-wip-us.apache.org/repos/asf/camel/blob/f507f4ee/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java index cdcfe29..4b541c5 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java @@ -20,6 +20,7 @@ import org.apache.camel.Consumer; import org.apache.camel.Processor; import org.apache.camel.Producer; import org.apache.camel.component.salesforce.internal.OperationName; +import org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.SynchronousDelegateProducer; import org.apache.camel.spi.UriEndpoint; @@ -74,8 +75,8 @@ public class SalesforceEndpoint extends DefaultEndpoint { operationName.value())); } - final SalesforceConsumer consumer = new SalesforceConsumer(this, processor, - getComponent().getSubscriptionHelper(topicName)); + final SubscriptionHelper subscriptionHelper = getComponent().getSubscriptionHelper(); + final SalesforceConsumer consumer = new SalesforceConsumer(this, processor, subscriptionHelper); configureConsumer(consumer); return consumer; } http://git-wip-us.apache.org/repos/asf/camel/blob/f507f4ee/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java index 5e9c255..d8b37a2 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java @@ -20,6 +20,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import com.fasterxml.jackson.databind.ObjectMapper; @@ -628,7 +629,7 @@ public class SalesforceEndpointConfig implements Cloneable { } public Map<String, Integer> getInitialReplayIdMap() { - return initialReplayIdMap; + return Optional.ofNullable(initialReplayIdMap).orElse(Collections.emptyMap()); } /** http://git-wip-us.apache.org/repos/asf/camel/blob/f507f4ee/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java index 1f9f78a..e061dc4 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java @@ -61,22 +61,21 @@ import org.cometd.bayeux.client.ClientSession.Extension.Adapter; * @author yzhao * @since 198 (Winter '16) */ -public class CometDReplayExtension<V> extends Adapter { +public class CometDReplayExtension extends Adapter { private static final String EXTENSION_NAME = "replay"; - private final ConcurrentMap<String, V> dataMap = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, Integer> dataMap = new ConcurrentHashMap<>(); private final AtomicBoolean supported = new AtomicBoolean(); - public CometDReplayExtension(Map<String, V> dataMap) { - this.dataMap.putAll(dataMap); + public void addTopicReplayId(final String topicName, final int replayId) { + dataMap.put(topicName, replayId); } @Override - @SuppressWarnings("unchecked") public boolean rcv(ClientSession session, Message.Mutable message) { - Object data = message.get(EXTENSION_NAME); - if (this.supported.get() && data != null) { + Integer replayId = (Integer)message.get(EXTENSION_NAME); + if (this.supported.get() && replayId != null) { try { - dataMap.put(message.getChannel(), (V) data); + dataMap.put(message.getChannel(), replayId); } catch (ClassCastException e) { return false; } http://git-wip-us.apache.org/repos/asf/camel/blob/f507f4ee/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index a467fdb..d27f440 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -17,25 +17,29 @@ package org.apache.camel.component.salesforce.internal.streaming; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import org.apache.camel.CamelException; +import org.apache.camel.EndpointConfiguration; import org.apache.camel.component.salesforce.SalesforceComponent; import org.apache.camel.component.salesforce.SalesforceConsumer; +import org.apache.camel.component.salesforce.SalesforceEndpoint; +import org.apache.camel.component.salesforce.SalesforceEndpointConfig; import org.apache.camel.component.salesforce.SalesforceHttpClient; import org.apache.camel.component.salesforce.api.SalesforceException; import org.apache.camel.component.salesforce.internal.SalesforceSession; import org.apache.camel.support.ServiceSupport; import org.cometd.bayeux.Message; -import org.cometd.bayeux.client.ClientSession; import org.cometd.bayeux.client.ClientSession.Extension; import org.cometd.bayeux.client.ClientSessionChannel; import org.cometd.client.BayeuxClient; @@ -56,6 +60,8 @@ import static org.cometd.bayeux.Message.SUBSCRIPTION_FIELD; public class SubscriptionHelper extends ServiceSupport { + static final CometDReplayExtension REPLAY_EXTENSION = new CometDReplayExtension(); + private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class); private static final int CONNECT_TIMEOUT = 110; @@ -65,7 +71,7 @@ public class SubscriptionHelper extends ServiceSupport { private static final String EXCEPTION_FIELD = "exception"; private static final int DISCONNECT_INTERVAL = 5000; - final BayeuxClient client; + BayeuxClient client; private final SalesforceComponent component; private final SalesforceSession session; @@ -87,15 +93,12 @@ public class SubscriptionHelper extends ServiceSupport { private volatile boolean reconnecting; private final AtomicLong restartBackoff; - public SubscriptionHelper(SalesforceComponent component, String topicName) throws Exception { + public SubscriptionHelper(final SalesforceComponent component) throws SalesforceException { this.component = component; this.session = component.getSession(); this.listenerMap = new ConcurrentHashMap<SalesforceConsumer, ClientSessionChannel.MessageListener>(); - // create CometD client - this.client = createClient(component, topicName); - restartBackoff = new AtomicLong(0); backoffIncrement = component.getConfig().getBackoffIncrement(); maxBackoff = component.getConfig().getMaxBackoff(); @@ -104,6 +107,9 @@ public class SubscriptionHelper extends ServiceSupport { @Override protected void doStart() throws Exception { + // create CometD client + this.client = createClient(component); + // reset all error conditions handshakeError = null; handshakeException = null; @@ -315,9 +321,11 @@ public class SubscriptionHelper extends ServiceSupport { if (!disconnected) { LOG.warn("Could not disconnect client connected to: {} after: {} msec.", getEndpointUrl(component), timeout); } + + client = null; } - static BayeuxClient createClient(final SalesforceComponent component, final String topicName) throws Exception { + static BayeuxClient createClient(final SalesforceComponent component) throws SalesforceException { // use default Jetty client from SalesforceComponent, its shared by all consumers final SalesforceHttpClient httpClient = component.getConfig().getHttpClient(); @@ -343,29 +351,10 @@ public class SubscriptionHelper extends ServiceSupport { }; BayeuxClient client = new BayeuxClient(getEndpointUrl(component), transport); - Integer replayId = null; - String channelName = getChannelName(topicName); - Map<String, Integer> replayIdMap = component.getConfig().getInitialReplayIdMap(); - if (replayIdMap != null) { - replayId = replayIdMap.getOrDefault(topicName, replayIdMap.get(channelName)); - } - if (replayId == null) { - replayId = component.getConfig().getDefaultReplayId(); - } - if (replayId != null) { - LOG.info("Sending replayId={} for channel {}", replayId, channelName); - List<Extension> extensions = client.getExtensions(); - Extension ext = null; - for (Iterator<Extension> iter = extensions.iterator(); iter.hasNext(); ext = iter.next()) { - if (ext instanceof CometDReplayExtension) { - iter.remove(); - } - } - Map<String, Integer> dataMap = new HashMap<>(); - dataMap.put(channelName, replayId); - ClientSession.Extension extension = new CometDReplayExtension<>(dataMap); - client.addExtension(extension); - } + + // added eagerly to check for support during handshake + client.addExtension(REPLAY_EXTENSION); + return client; } @@ -373,6 +362,8 @@ public class SubscriptionHelper extends ServiceSupport { // create subscription for consumer final String channelName = getChannelName(topicName); + setupReplay((SalesforceEndpoint) consumer.getEndpoint()); + // channel message listener LOG.info("Subscribing to channel {}...", channelName); final ClientSessionChannel.MessageListener listener = new ClientSessionChannel.MessageListener() { @@ -421,6 +412,38 @@ public class SubscriptionHelper extends ServiceSupport { clientChannel.subscribe(listener); } + void setupReplay(final SalesforceEndpoint endpoint) { + final String topicName = endpoint.getTopicName(); + + final Optional<Integer> replayId = determineReplayIdFor(endpoint, topicName); + if (replayId.isPresent()) { + final String channelName = getChannelName(topicName); + + REPLAY_EXTENSION.addTopicReplayId(channelName, replayId.get()); + } + } + + static Optional<Integer> determineReplayIdFor(final SalesforceEndpoint endpoint, final String topicName) { + final String channelName = getChannelName(topicName); + + final SalesforceComponent component = endpoint.getComponent(); + + final SalesforceEndpointConfig endpointConfiguration = endpoint.getConfiguration(); + final Map<String, Integer> endpointInitialReplayIdMap = endpointConfiguration.getInitialReplayIdMap(); + final Integer endpointReplayId = endpointInitialReplayIdMap.getOrDefault(topicName, endpointInitialReplayIdMap.get(channelName)); + final Integer endpointDefaultReplayId = endpointConfiguration.getDefaultReplayId(); + + final SalesforceEndpointConfig componentConfiguration = component.getConfig(); + final Map<String, Integer> componentInitialReplayIdMap = componentConfiguration.getInitialReplayIdMap(); + final Integer componentReplayId = componentInitialReplayIdMap.getOrDefault(topicName, componentInitialReplayIdMap.get(channelName)); + final Integer componentDefaultReplayId = componentConfiguration.getDefaultReplayId(); + + // the endpoint values have priority over component values, and the default values posteriority + // over give topic values + return Stream.of(endpointReplayId, componentReplayId, endpointDefaultReplayId, componentDefaultReplayId) + .filter(Objects::nonNull).findFirst(); + } + static String getChannelName(String topicName) { return "/topic/" + topicName; } http://git-wip-us.apache.org/repos/asf/camel/blob/f507f4ee/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java ---------------------------------------------------------------------- diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java index b5de54a..860e255 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java @@ -18,51 +18,21 @@ package org.apache.camel.component.salesforce.internal.streaming; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; import org.apache.camel.component.salesforce.SalesforceComponent; +import org.apache.camel.component.salesforce.SalesforceEndpoint; import org.apache.camel.component.salesforce.SalesforceEndpointConfig; -import org.apache.camel.component.salesforce.SalesforceHttpClient; -import org.apache.camel.component.salesforce.internal.SalesforceSession; -import org.cometd.bayeux.Channel; -import org.cometd.bayeux.Message; -import org.cometd.bayeux.client.ClientSession; -import org.cometd.bayeux.client.ClientSession.Extension; -import org.cometd.client.BayeuxClient; -import org.cometd.common.HashMapMessage; -import org.junit.Before; import org.junit.Test; +import static org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.determineReplayIdFor; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class SubscriptionHelperTest { - static final ClientSession NOT_USED = null; - - final SalesforceComponent component = mock(SalesforceComponent.class); - - final SalesforceSession session = mock(SalesforceSession.class); - - final SalesforceEndpointConfig config = mock(SalesforceEndpointConfig.class); - - @Before - public void setupMocks() { - when(component.getSession()).thenReturn(session); - - when(session.getInstanceUrl()).thenReturn("https://some.url"); - - when(component.getConfig()).thenReturn(config); - when(component.getConfig().getApiVersion()).thenReturn(SalesforceEndpointConfig.DEFAULT_VERSION); - - when(config.getHttpClient()).thenReturn(mock(SalesforceHttpClient.class)); - - } - @Test public void shouldSupportInitialConfigMapWithTwoKeySyntaxes() throws Exception { final Map<String, Integer> initialReplayIdMap = new HashMap<>(); @@ -70,45 +40,57 @@ public class SubscriptionHelperTest { initialReplayIdMap.put("/topic/my-topic-1", 20); initialReplayIdMap.put("/topic/my-topic-2", 30); - when(config.getDefaultReplayId()).thenReturn(14); + final SalesforceEndpointConfig config = new SalesforceEndpointConfig(); + config.setDefaultReplayId(14); + config.setInitialReplayIdMap(initialReplayIdMap); - when(config.getInitialReplayIdMap()).thenReturn(initialReplayIdMap); + final SalesforceComponent component = mock(SalesforceComponent.class); + final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class); - assertEquals("Expecting replayId for `my-topic-1` to be 10, as short topic names have priority", (Object) 10, - fetchReplayExtensionValue("my-topic-1").get("/topic/my-topic-1")); + when(endpoint.getComponent()).thenReturn(component); + when(endpoint.getConfiguration()).thenReturn(config); + when(component.getConfig()).thenReturn(new SalesforceEndpointConfig()); - assertEquals("Expecting replayId for `my-topic-2` to be 30, the only one given", (Object) 30, - fetchReplayExtensionValue("my-topic-2").get("/topic/my-topic-2")); + assertEquals("Expecting replayId for `my-topic-1` to be 10, as short topic names have priority", + Optional.of(10), determineReplayIdFor(endpoint, "my-topic-1")); - assertEquals("Expecting replayId for `my-topic-3` to be 14, the default", (Object) 14, - fetchReplayExtensionValue("my-topic-3").get("/topic/my-topic-3")); - } + assertEquals("Expecting replayId for `my-topic-2` to be 30, the only one given", Optional.of(30), + determineReplayIdFor(endpoint, "my-topic-2")); - Map<String, Integer> fetchReplayExtensionValue(final String topicName) throws Exception { - final BayeuxClient client = SubscriptionHelper.createClient(component, topicName); + assertEquals("Expecting replayId for `my-topic-3` to be 14, the default", Optional.of(14), + determineReplayIdFor(endpoint, "my-topic-3")); + } - final List<Extension> extensions = client.getExtensions(); + @Test + public void precedenceShouldBeFollowed() { + final SalesforceEndpointConfig componentConfig = new SalesforceEndpointConfig(); + componentConfig.setDefaultReplayId(1); + componentConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-1", 2)); + componentConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-2", 3)); - final Optional<Extension> extension = extensions.stream().filter(e -> e instanceof CometDReplayExtension) - .findFirst(); + final SalesforceEndpointConfig endpointConfig = new SalesforceEndpointConfig(); + endpointConfig.setDefaultReplayId(4); + endpointConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-1", 5)); - assertTrue("Client should be configured with CometDReplayExtension extension", extension.isPresent()); + final SalesforceComponent component = mock(SalesforceComponent.class); + when(component.getConfig()).thenReturn(componentConfig); - final CometDReplayExtension cometDReplayExtension = (CometDReplayExtension) extension.get(); + final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class); + when(endpoint.getComponent()).thenReturn(component); + when(endpoint.getConfiguration()).thenReturn(endpointConfig); - final Message.Mutable handshake = new HashMapMessage(); - handshake.setChannel(Channel.META_HANDSHAKE); - handshake.put(Message.EXT_FIELD, Collections.singletonMap("replay", true)); + assertEquals("Expecting replayId for `my-topic-1` to be 5, as endpoint configuration has priority", + Optional.of(5), determineReplayIdFor(endpoint, "my-topic-1")); - cometDReplayExtension.rcvMeta(NOT_USED, handshake); + assertEquals("Expecting replayId for `my-topic-2` to be 3, as endpoint does not configure it", + Optional.of(3), determineReplayIdFor(endpoint, "my-topic-2")); - final Message.Mutable subscription = new HashMapMessage(); - subscription.setChannel(Channel.META_SUBSCRIBE); - cometDReplayExtension.sendMeta(NOT_USED, subscription); + assertEquals("Expecting replayId for `my-topic-3` to be 4, as it is endpoint's default", + Optional.of(4), determineReplayIdFor(endpoint, "my-topic-3")); - @SuppressWarnings("unchecked") - final Map<String, Integer> replays = (Map<String, Integer>) subscription.getExt().get("replay"); + endpointConfig.setDefaultReplayId(null); - return replays; + assertEquals("Expecting replayId for `my-topic-3` to be 1, as it is component's default when endpoint does not have a default", + Optional.of(1), determineReplayIdFor(endpoint, "my-topic-3")); } }