Repository: camel
Updated Branches:
  refs/heads/master 47fa4edd4 -> f507f4eed


CAMEL-10849: Salesforce: subscription channel ...

...created per component

This changes the way configuration options, specifically
`initialReplayIdMap` and `defaultReplayId`, are configured on the CometD
client.

The Bayeux CometD client is left at the Component level; this is to
limit the number of streaming clients connected to Salesforce, as there
are limits on those.

The SubscriptionHelper was previously tied to the topic it subscribed to
on creation; this change removes that constraint. This allows
per-endpoint customization of the client (subscriptions) on subscription
from consumers.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f507f4ee
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f507f4ee
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f507f4ee

Branch: refs/heads/master
Commit: f507f4eed27a54dbfb30f019e7bbd3f8b4483920
Parents: 47fa4ed
Author: Zoran Regvart <zregv...@apache.org>
Authored: Wed Feb 22 13:03:51 2017 +0100
Committer: Zoran Regvart <zregv...@apache.org>
Committed: Wed Feb 22 13:12:38 2017 +0100

----------------------------------------------------------------------
 .../salesforce/SalesforceComponent.java         |  4 +-
 .../salesforce/SalesforceEndpoint.java          |  5 +-
 .../salesforce/SalesforceEndpointConfig.java    |  3 +-
 .../streaming/CometDReplayExtension.java        | 15 ++-
 .../internal/streaming/SubscriptionHelper.java  | 85 ++++++++++-------
 .../streaming/SubscriptionHelperTest.java       | 98 ++++++++------------
 6 files changed, 108 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f507f4ee/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
index 2a5effd..e95668d 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceComponent.java
@@ -361,10 +361,10 @@ public class SalesforceComponent extends 
UriEndpointComponent implements Endpoin
         }
     }
 
-    public SubscriptionHelper getSubscriptionHelper(String topicName) throws 
Exception {
+    public SubscriptionHelper getSubscriptionHelper() throws Exception {
         if (subscriptionHelper == null) {
             // lazily create subscription helper
-            subscriptionHelper = new SubscriptionHelper(this, topicName);
+            subscriptionHelper = new SubscriptionHelper(this);
 
             // also start the helper to connect to Salesforce
             ServiceHelper.startService(subscriptionHelper);

http://git-wip-us.apache.org/repos/asf/camel/blob/f507f4ee/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
index cdcfe29..4b541c5 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpoint.java
@@ -20,6 +20,7 @@ import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.component.salesforce.internal.OperationName;
+import 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.SynchronousDelegateProducer;
 import org.apache.camel.spi.UriEndpoint;
@@ -74,8 +75,8 @@ public class SalesforceEndpoint extends DefaultEndpoint {
                     operationName.value()));
         }
 
-        final SalesforceConsumer consumer = new SalesforceConsumer(this, 
processor,
-            getComponent().getSubscriptionHelper(topicName));
+        final SubscriptionHelper subscriptionHelper = 
getComponent().getSubscriptionHelper();
+        final SalesforceConsumer consumer = new SalesforceConsumer(this, 
processor, subscriptionHelper);
         configureConsumer(consumer);
         return consumer;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/f507f4ee/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
index 5e9c255..d8b37a2 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/SalesforceEndpointConfig.java
@@ -20,6 +20,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -628,7 +629,7 @@ public class SalesforceEndpointConfig implements Cloneable {
     }
 
     public Map<String, Integer> getInitialReplayIdMap() {
-        return initialReplayIdMap;
+        return 
Optional.ofNullable(initialReplayIdMap).orElse(Collections.emptyMap());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/f507f4ee/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java
index 1f9f78a..e061dc4 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java
@@ -61,22 +61,21 @@ import 
org.cometd.bayeux.client.ClientSession.Extension.Adapter;
  * @author yzhao
  * @since 198 (Winter '16)
  */
-public class CometDReplayExtension<V> extends Adapter {
+public class CometDReplayExtension extends Adapter {
     private static final String EXTENSION_NAME = "replay";
-    private final ConcurrentMap<String, V> dataMap = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Integer> dataMap = new 
ConcurrentHashMap<>();
     private final AtomicBoolean supported = new AtomicBoolean();
 
-    public CometDReplayExtension(Map<String, V> dataMap) {
-        this.dataMap.putAll(dataMap);
+    public void addTopicReplayId(final String topicName, final int replayId) {
+        dataMap.put(topicName, replayId);
     }
 
     @Override
-    @SuppressWarnings("unchecked")
     public boolean rcv(ClientSession session, Message.Mutable message) {
-        Object data = message.get(EXTENSION_NAME);
-        if (this.supported.get() && data != null) {
+        Integer replayId = (Integer)message.get(EXTENSION_NAME);
+        if (this.supported.get() && replayId != null) {
             try {
-                dataMap.put(message.getChannel(), (V) data);
+                dataMap.put(message.getChannel(), replayId);
             } catch (ClassCastException e) {
                 return false;
             }

http://git-wip-us.apache.org/repos/asf/camel/blob/f507f4ee/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
index a467fdb..d27f440 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java
@@ -17,25 +17,29 @@
 package org.apache.camel.component.salesforce.internal.streaming;
 
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Stream;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 
 import org.apache.camel.CamelException;
+import org.apache.camel.EndpointConfiguration;
 import org.apache.camel.component.salesforce.SalesforceComponent;
 import org.apache.camel.component.salesforce.SalesforceConsumer;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
+import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
 import org.apache.camel.component.salesforce.SalesforceHttpClient;
 import org.apache.camel.component.salesforce.api.SalesforceException;
 import org.apache.camel.component.salesforce.internal.SalesforceSession;
 import org.apache.camel.support.ServiceSupport;
 import org.cometd.bayeux.Message;
-import org.cometd.bayeux.client.ClientSession;
 import org.cometd.bayeux.client.ClientSession.Extension;
 import org.cometd.bayeux.client.ClientSessionChannel;
 import org.cometd.client.BayeuxClient;
@@ -56,6 +60,8 @@ import static org.cometd.bayeux.Message.SUBSCRIPTION_FIELD;
 
 public class SubscriptionHelper extends ServiceSupport {
 
+    static final CometDReplayExtension REPLAY_EXTENSION = new 
CometDReplayExtension();
+
     private static final Logger LOG = 
LoggerFactory.getLogger(SubscriptionHelper.class);
 
     private static final int CONNECT_TIMEOUT = 110;
@@ -65,7 +71,7 @@ public class SubscriptionHelper extends ServiceSupport {
     private static final String EXCEPTION_FIELD = "exception";
     private static final int DISCONNECT_INTERVAL = 5000;
 
-    final BayeuxClient client;
+    BayeuxClient client;
 
     private final SalesforceComponent component;
     private final SalesforceSession session;
@@ -87,15 +93,12 @@ public class SubscriptionHelper extends ServiceSupport {
     private volatile boolean reconnecting;
     private final AtomicLong restartBackoff;
 
-    public SubscriptionHelper(SalesforceComponent component, String topicName) 
throws Exception {
+    public SubscriptionHelper(final SalesforceComponent component) throws 
SalesforceException {
         this.component = component;
         this.session = component.getSession();
 
         this.listenerMap = new ConcurrentHashMap<SalesforceConsumer, 
ClientSessionChannel.MessageListener>();
 
-        // create CometD client
-        this.client = createClient(component, topicName);
-
         restartBackoff = new AtomicLong(0);
         backoffIncrement = component.getConfig().getBackoffIncrement();
         maxBackoff = component.getConfig().getMaxBackoff();
@@ -104,6 +107,9 @@ public class SubscriptionHelper extends ServiceSupport {
     @Override
     protected void doStart() throws Exception {
 
+        // create CometD client
+        this.client = createClient(component);
+
         // reset all error conditions
         handshakeError = null;
         handshakeException = null;
@@ -315,9 +321,11 @@ public class SubscriptionHelper extends ServiceSupport {
         if (!disconnected) {
             LOG.warn("Could not disconnect client connected to: {} after: {} 
msec.", getEndpointUrl(component), timeout);
         }
+
+        client = null;
     }
 
-    static BayeuxClient createClient(final SalesforceComponent component, 
final String topicName) throws Exception {
+    static BayeuxClient createClient(final SalesforceComponent component) 
throws SalesforceException {
         // use default Jetty client from SalesforceComponent, its shared by 
all consumers
         final SalesforceHttpClient httpClient = 
component.getConfig().getHttpClient();
 
@@ -343,29 +351,10 @@ public class SubscriptionHelper extends ServiceSupport {
         };
 
         BayeuxClient client = new BayeuxClient(getEndpointUrl(component), 
transport);
-        Integer replayId = null;
-        String channelName = getChannelName(topicName);
-        Map<String, Integer> replayIdMap = 
component.getConfig().getInitialReplayIdMap();
-        if (replayIdMap != null) {
-            replayId = replayIdMap.getOrDefault(topicName, 
replayIdMap.get(channelName));
-        }
-        if (replayId == null) {
-            replayId = component.getConfig().getDefaultReplayId();
-        }
-        if (replayId != null) {
-            LOG.info("Sending replayId={} for channel {}", replayId, 
channelName);
-            List<Extension> extensions = client.getExtensions();
-            Extension ext = null;
-            for (Iterator<Extension> iter = extensions.iterator(); 
iter.hasNext(); ext = iter.next()) {
-                if (ext instanceof CometDReplayExtension) {
-                    iter.remove();
-                }
-            }
-            Map<String, Integer> dataMap = new HashMap<>();
-            dataMap.put(channelName, replayId);
-            ClientSession.Extension extension = new 
CometDReplayExtension<>(dataMap);
-            client.addExtension(extension);
-        }
+
+        // added eagerly to check for support during handshake
+        client.addExtension(REPLAY_EXTENSION);
+
         return client;
     }
 
@@ -373,6 +362,8 @@ public class SubscriptionHelper extends ServiceSupport {
         // create subscription for consumer
         final String channelName = getChannelName(topicName);
 
+        setupReplay((SalesforceEndpoint) consumer.getEndpoint());
+
         // channel message listener
         LOG.info("Subscribing to channel {}...", channelName);
         final ClientSessionChannel.MessageListener listener = new 
ClientSessionChannel.MessageListener() {
@@ -421,6 +412,38 @@ public class SubscriptionHelper extends ServiceSupport {
         clientChannel.subscribe(listener);
     }
 
+    void setupReplay(final SalesforceEndpoint endpoint) {
+        final String topicName = endpoint.getTopicName();
+
+        final Optional<Integer> replayId = determineReplayIdFor(endpoint, 
topicName);
+        if (replayId.isPresent()) {
+            final String channelName = getChannelName(topicName);
+
+            REPLAY_EXTENSION.addTopicReplayId(channelName, replayId.get());
+        }
+    }
+
+    static Optional<Integer> determineReplayIdFor(final SalesforceEndpoint 
endpoint, final String topicName) {
+        final String channelName = getChannelName(topicName);
+
+        final SalesforceComponent component = endpoint.getComponent();
+
+        final SalesforceEndpointConfig endpointConfiguration = 
endpoint.getConfiguration();
+        final Map<String, Integer> endpointInitialReplayIdMap = 
endpointConfiguration.getInitialReplayIdMap();
+        final Integer endpointReplayId = 
endpointInitialReplayIdMap.getOrDefault(topicName, 
endpointInitialReplayIdMap.get(channelName));
+        final Integer endpointDefaultReplayId = 
endpointConfiguration.getDefaultReplayId();
+
+        final SalesforceEndpointConfig componentConfiguration = 
component.getConfig();
+        final Map<String, Integer> componentInitialReplayIdMap = 
componentConfiguration.getInitialReplayIdMap();
+        final Integer componentReplayId = 
componentInitialReplayIdMap.getOrDefault(topicName, 
componentInitialReplayIdMap.get(channelName));
+        final Integer componentDefaultReplayId = 
componentConfiguration.getDefaultReplayId();
+
+        // the endpoint values have priority over component values, and the 
default values posteriority
+        // over give topic values
+        return Stream.of(endpointReplayId, componentReplayId, 
endpointDefaultReplayId, componentDefaultReplayId)
+            .filter(Objects::nonNull).findFirst();
+    }
+
     static String getChannelName(String topicName) {
         return "/topic/" + topicName;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/f507f4ee/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
index b5de54a..860e255 100644
--- 
a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
+++ 
b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelperTest.java
@@ -18,51 +18,21 @@ package 
org.apache.camel.component.salesforce.internal.streaming;
 
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
 import org.apache.camel.component.salesforce.SalesforceComponent;
+import org.apache.camel.component.salesforce.SalesforceEndpoint;
 import org.apache.camel.component.salesforce.SalesforceEndpointConfig;
-import org.apache.camel.component.salesforce.SalesforceHttpClient;
-import org.apache.camel.component.salesforce.internal.SalesforceSession;
-import org.cometd.bayeux.Channel;
-import org.cometd.bayeux.Message;
-import org.cometd.bayeux.client.ClientSession;
-import org.cometd.bayeux.client.ClientSession.Extension;
-import org.cometd.client.BayeuxClient;
-import org.cometd.common.HashMapMessage;
-import org.junit.Before;
 import org.junit.Test;
 
+import static 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.determineReplayIdFor;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class SubscriptionHelperTest {
 
-    static final ClientSession NOT_USED = null;
-
-    final SalesforceComponent component = mock(SalesforceComponent.class);
-
-    final SalesforceSession session = mock(SalesforceSession.class);
-
-    final SalesforceEndpointConfig config = 
mock(SalesforceEndpointConfig.class);
-
-    @Before
-    public void setupMocks() {
-        when(component.getSession()).thenReturn(session);
-
-        when(session.getInstanceUrl()).thenReturn("https://some.url");
-
-        when(component.getConfig()).thenReturn(config);
-        
when(component.getConfig().getApiVersion()).thenReturn(SalesforceEndpointConfig.DEFAULT_VERSION);
-
-        
when(config.getHttpClient()).thenReturn(mock(SalesforceHttpClient.class));
-
-    }
-
     @Test
     public void shouldSupportInitialConfigMapWithTwoKeySyntaxes() throws 
Exception {
         final Map<String, Integer> initialReplayIdMap = new HashMap<>();
@@ -70,45 +40,57 @@ public class SubscriptionHelperTest {
         initialReplayIdMap.put("/topic/my-topic-1", 20);
         initialReplayIdMap.put("/topic/my-topic-2", 30);
 
-        when(config.getDefaultReplayId()).thenReturn(14);
+        final SalesforceEndpointConfig config = new SalesforceEndpointConfig();
+        config.setDefaultReplayId(14);
+        config.setInitialReplayIdMap(initialReplayIdMap);
 
-        when(config.getInitialReplayIdMap()).thenReturn(initialReplayIdMap);
+        final SalesforceComponent component = mock(SalesforceComponent.class);
+        final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class);
 
-        assertEquals("Expecting replayId for `my-topic-1` to be 10, as short 
topic names have priority", (Object) 10,
-                
fetchReplayExtensionValue("my-topic-1").get("/topic/my-topic-1"));
+        when(endpoint.getComponent()).thenReturn(component);
+        when(endpoint.getConfiguration()).thenReturn(config);
+        when(component.getConfig()).thenReturn(new SalesforceEndpointConfig());
 
-        assertEquals("Expecting replayId for `my-topic-2` to be 30, the only 
one given", (Object) 30,
-                
fetchReplayExtensionValue("my-topic-2").get("/topic/my-topic-2"));
+        assertEquals("Expecting replayId for `my-topic-1` to be 10, as short 
topic names have priority",
+                     Optional.of(10), determineReplayIdFor(endpoint, 
"my-topic-1"));
 
-        assertEquals("Expecting replayId for `my-topic-3` to be 14, the 
default", (Object) 14,
-                
fetchReplayExtensionValue("my-topic-3").get("/topic/my-topic-3"));
-    }
+        assertEquals("Expecting replayId for `my-topic-2` to be 30, the only 
one given", Optional.of(30),
+                     determineReplayIdFor(endpoint, "my-topic-2"));
 
-    Map<String, Integer> fetchReplayExtensionValue(final String topicName) 
throws Exception {
-        final BayeuxClient client = SubscriptionHelper.createClient(component, 
topicName);
+        assertEquals("Expecting replayId for `my-topic-3` to be 14, the 
default", Optional.of(14),
+                     determineReplayIdFor(endpoint, "my-topic-3"));
+    }
 
-        final List<Extension> extensions = client.getExtensions();
+    @Test
+    public void precedenceShouldBeFollowed() {
+        final SalesforceEndpointConfig componentConfig = new 
SalesforceEndpointConfig();
+        componentConfig.setDefaultReplayId(1);
+        
componentConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-1", 
2));
+        
componentConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-2", 
3));
 
-        final Optional<Extension> extension = extensions.stream().filter(e -> 
e instanceof CometDReplayExtension)
-                .findFirst();
+        final SalesforceEndpointConfig endpointConfig = new 
SalesforceEndpointConfig();
+        endpointConfig.setDefaultReplayId(4);
+        
endpointConfig.setInitialReplayIdMap(Collections.singletonMap("my-topic-1", 5));
 
-        assertTrue("Client should be configured with CometDReplayExtension 
extension", extension.isPresent());
+        final SalesforceComponent component = mock(SalesforceComponent.class);
+        when(component.getConfig()).thenReturn(componentConfig);
 
-        final CometDReplayExtension cometDReplayExtension = 
(CometDReplayExtension) extension.get();
+        final SalesforceEndpoint endpoint = mock(SalesforceEndpoint.class);
+        when(endpoint.getComponent()).thenReturn(component);
+        when(endpoint.getConfiguration()).thenReturn(endpointConfig);
 
-        final Message.Mutable handshake = new HashMapMessage();
-        handshake.setChannel(Channel.META_HANDSHAKE);
-        handshake.put(Message.EXT_FIELD, Collections.singletonMap("replay", 
true));
+        assertEquals("Expecting replayId for `my-topic-1` to be 5, as endpoint 
configuration has priority",
+                     Optional.of(5), determineReplayIdFor(endpoint, 
"my-topic-1"));
 
-        cometDReplayExtension.rcvMeta(NOT_USED, handshake);
+        assertEquals("Expecting replayId for `my-topic-2` to be 3, as endpoint 
does not configure it",
+                     Optional.of(3), determineReplayIdFor(endpoint, 
"my-topic-2"));
 
-        final Message.Mutable subscription = new HashMapMessage();
-        subscription.setChannel(Channel.META_SUBSCRIBE);
-        cometDReplayExtension.sendMeta(NOT_USED, subscription);
+        assertEquals("Expecting replayId for `my-topic-3` to be 4, as it is 
endpoint's default",
+                     Optional.of(4), determineReplayIdFor(endpoint, 
"my-topic-3"));
 
-        @SuppressWarnings("unchecked")
-        final Map<String, Integer> replays = (Map<String, Integer>) 
subscription.getExt().get("replay");
+        endpointConfig.setDefaultReplayId(null);
 
-        return replays;
+        assertEquals("Expecting replayId for `my-topic-3` to be 1, as it is 
component's default when endpoint does not have a default",
+                     Optional.of(1), determineReplayIdFor(endpoint, 
"my-topic-3"));
     }
 }

Reply via email to