This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch camel-2.25.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.25.x by this push: new e692131 CAMEL-15233: Fix Replay Extension to correctly get replayId from message (#3975) e692131 is described below commit e6921313a25084f23dd6562b1480f724b9cf981a Author: edgarc-ciandt <edg...@ciandt.com> AuthorDate: Tue Jul 7 02:51:01 2020 -0300 CAMEL-15233: Fix Replay Extension to correctly get replayId from message (#3975) --- .../internal/client/DefaultBulkApiClient.java | 5 +- ...tDReplayExtension.java => ReplayExtension.java} | 86 +++++++------- .../internal/streaming/SubscriptionHelper.java | 2 +- .../internal/streaming/ReplayExtensionTest.java | 127 +++++++++++++++++++++ 4 files changed, 170 insertions(+), 50 deletions(-) diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java index bf14908..6a83d62 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/client/DefaultBulkApiClient.java @@ -34,6 +34,9 @@ import javax.xml.parsers.SAXParserFactory; import javax.xml.transform.Source; import javax.xml.transform.sax.SAXSource; +import org.xml.sax.InputSource; +import org.xml.sax.SAXException; + import org.apache.camel.component.salesforce.SalesforceHttpClient; import org.apache.camel.component.salesforce.api.SalesforceException; import org.apache.camel.component.salesforce.api.dto.RestError; @@ -53,8 +56,6 @@ import org.eclipse.jetty.client.util.InputStreamContentProvider; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.util.StringUtil; -import org.xml.sax.InputSource; -import org.xml.sax.SAXException; public class DefaultBulkApiClient extends AbstractClientBase implements BulkApiClient { diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java similarity index 55% rename from components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java rename to components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java index f0156ef..96355e1 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/CometDReplayExtension.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtension.java @@ -14,40 +14,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -/** - * Copyright (c) 2016, Salesforce Developers +/* + * Copyright (c) 2016, salesforce.com, inc. * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are met: - * - * 1. Redistributions of source code must retain the above copyright notice, - * this list of conditions and the following disclaimer. - * 2. Redistributions in binary form must reproduce the above copyright - * notice, this list of conditions and the following disclaimer in the - * documentation and/or other materials provided with the distribution. - * 3. Neither the name of the copyright holder nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" - * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE - * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE - * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE - * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR - * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS - * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN - * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) - * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - **/ + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.TXT file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ package org.apache.camel.component.salesforce.internal.streaming; + import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import org.cometd.bayeux.Channel; import org.cometd.bayeux.Message; @@ -55,14 +36,16 @@ import org.cometd.bayeux.client.ClientSession; import org.cometd.bayeux.client.ClientSession.Extension.Adapter; /** - * CometDReplayExtension, typical usages are the following: - * {@code client.addExtension(new CometDReplayExtension<>(replayMap));} + * The Bayeux extension for replay * - * @author yzhao - * @since 198 (Winter '16) + * @author hal.hildebrand + * @since API v37.0 */ -public class CometDReplayExtension extends Adapter { +public class ReplayExtension extends Adapter { private static final String EXTENSION_NAME = "replay"; + private static final String EVENT_KEY = "event"; + private static final String REPLAY_ID_KEY = "replayId"; + private final ConcurrentMap<String, Long> dataMap = new ConcurrentHashMap<>(); private final AtomicBoolean supported = new AtomicBoolean(); @@ -72,20 +55,11 @@ public class CometDReplayExtension extends Adapter { @Override public boolean rcv(ClientSession session, Message.Mutable message) { - final Object value = message.get(EXTENSION_NAME); - - final Long replayId; - if (value instanceof Long) { - replayId = (Long)value; - } else if (value instanceof Number) { - replayId = ((Number)value).longValue(); - } else { - replayId = null; - } - + Long replayId = getReplayId(message); if (this.supported.get() && replayId != null) { try { - dataMap.put(message.getChannel(), replayId); + String channel = topicWithoutQueryString(message.getChannel()); + dataMap.put(channel, replayId); } catch (ClassCastException e) { return false; } @@ -101,7 +75,6 @@ public class CometDReplayExtension extends Adapter { this.supported.set(ext != null && Boolean.TRUE.equals(ext.get(EXTENSION_NAME))); break; default: - break; } return true; } @@ -118,8 +91,27 @@ public class CometDReplayExtension extends Adapter { } break; default: - break; } return true; } -} + + private static Long getReplayId(Message.Mutable message) { + Map<String, Object> data = message.getDataAsMap(); + @SuppressWarnings("unchecked") + Optional<Long> optional = resolve(() -> (Long)((Map<String, Object>)data.get(EVENT_KEY)).get(REPLAY_ID_KEY)); + return optional.orElse(null); + } + + private static <T> Optional<T> resolve(Supplier<T> resolver) { + try { + T result = resolver.get(); + return Optional.ofNullable(result); + } catch (NullPointerException e) { + return Optional.empty(); + } + } + + private static String topicWithoutQueryString(String fullTopic) { + return fullTopic.split("\\?")[0]; + } +} \ No newline at end of file diff --git a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java index c10aa92..ea1f6d0 100644 --- a/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java +++ b/components/camel-salesforce/camel-salesforce-component/src/main/java/org/apache/camel/component/salesforce/internal/streaming/SubscriptionHelper.java @@ -58,7 +58,7 @@ import static org.cometd.bayeux.Message.SUBSCRIPTION_FIELD; public class SubscriptionHelper extends ServiceSupport { - static final CometDReplayExtension REPLAY_EXTENSION = new CometDReplayExtension(); + static final ReplayExtension REPLAY_EXTENSION = new ReplayExtension(); private static final Logger LOG = LoggerFactory.getLogger(SubscriptionHelper.class); diff --git a/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java new file mode 100644 index 0000000..287234c --- /dev/null +++ b/components/camel-salesforce/camel-salesforce-component/src/test/java/org/apache/camel/component/salesforce/internal/streaming/ReplayExtensionTest.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.salesforce.internal.streaming; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; + +import org.cometd.bayeux.Channel; +import org.cometd.bayeux.Message; +import org.cometd.common.HashMapMessage; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ReplayExtensionTest { + + static Message.Mutable createPushTopicMessage(boolean addReplayId) { + final Message.Mutable pushTopicMessage = new HashMapMessage(); + pushTopicMessage.put("clientId", "lxdl9o32njygi1gj47kgfaga4k"); + + final Map<String, Object> data = new HashMap<>(); + pushTopicMessage.put("data", data); + + final Map<String, Object> event = new HashMap<>(); + data.put("event", event); + + event.put("createdDate", "2016-09-16T19:45:27.454Z"); + if (addReplayId) { + event.put("replayId", 1L); + } + event.put("type", "created"); + + final Map<String, Object> sobject = new HashMap<>(); + data.put("sobject", sobject); + + sobject.put("Phone", "(415) 555-1212"); + sobject.put("Id", "001D000000KneakIAB"); + sobject.put("Name", "Blackbeard"); + + pushTopicMessage.put("channel", "/topic/AccountUpdates"); + return pushTopicMessage; + } + + static Message.Mutable createHandshakeMessage(Boolean isReplaySupported) { + final Message.Mutable handshakeMessage = new HashMapMessage(); + HashMap<String, Object> ext = new HashMap<>(); + handshakeMessage.put("ext", ext); + handshakeMessage.put("channel", Channel.META_HANDSHAKE); + ext.put("replay", isReplaySupported); + + return handshakeMessage; + } + + @SuppressWarnings("unchecked") + static ConcurrentMap<String, Long> getDataMap(ReplayExtension replayExtension) + throws NoSuchFieldException, IllegalAccessException { + Field field = ReplayExtension.class.getDeclaredField("dataMap"); + field.setAccessible(true); + + return (ConcurrentMap<String, Long>) field.get(replayExtension); + } + + @Test + public void shouldKeepPreviousValueIfReplayIdNotInMessageWhenIsSupported() + throws NoSuchFieldException, IllegalAccessException { + final Message.Mutable pushTopicMessage = createPushTopicMessage(false); + + final ReplayExtension replayExtension = new ReplayExtension(); + replayExtension.rcvMeta(null, createHandshakeMessage(true)); + + replayExtension.addChannelReplayId(pushTopicMessage.getChannel(), 123L); + + replayExtension.rcv(null, pushTopicMessage); + + ConcurrentMap<String, Long> dataMap = getDataMap(replayExtension); + + assertEquals(Long.valueOf(123L), dataMap.get("/topic/AccountUpdates")); + } + + @Test + public void shouldUpdateReplayIdFromMessageWhenIsSupported() throws NoSuchFieldException, IllegalAccessException { + final Message.Mutable pushTopicMessage = createPushTopicMessage(true); + + final ReplayExtension replayExtension = new ReplayExtension(); + replayExtension.rcvMeta(null, createHandshakeMessage(true)); + + replayExtension.addChannelReplayId(pushTopicMessage.getChannel(), 123L); + + replayExtension.rcv(null, pushTopicMessage); + + ConcurrentMap<String, Long> dataMap = getDataMap(replayExtension); + + assertEquals(Long.valueOf(1L), dataMap.get("/topic/AccountUpdates")); + + } + + @Test + public void shouldNotUpdateReplayIdFromMessageWhenIsNotSupported() + throws NoSuchFieldException, IllegalAccessException { + final Message.Mutable pushTopicMessage = createPushTopicMessage(true); + + final ReplayExtension replayExtension = new ReplayExtension(); + replayExtension.rcvMeta(null, createHandshakeMessage(false)); + + replayExtension.rcv(null, pushTopicMessage); + + ConcurrentMap<String, Long> dataMap = getDataMap(replayExtension); + + assertEquals(0, dataMap.size()); + } +}