hakidehari opened a new issue #1339:
URL: https://github.com/apache/camel-kafka-connector/issues/1339


   Hi all,
   
   I have been trying to troubleshoot some issues with the Camel Kafka 
Salesforce Source connector, to no avail. I have two Kafka topics, each fed by 
two connectors that stream data from the Salesforce Streaming API via the 
Salesforce source connector. I am seeing a few odd things. The connectors seem 
to reconnect to the Salesforce streams every minute or so. Here is an example 
of what I see in the logs:
   
   `[2022-02-14 02:36:41,133] WARN [sf_account_change_connector|task-0] Connect 
failure: {failure={exception=org.cometd.common.TransportException: 
{httpCode=504}, message={clientId=y98udzrc3x45vv1jfrg9twg1f9e, 
channel=/meta/connect, id=30557, connectionType=long-polling}, httpCode=504, 
connectionType=long-polling}, channel=/meta/connect, id=30557, 
successful=false} 
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:174)
   [2022-02-14 02:36:41,252] INFO [sf_account_change_connector|task-0] 
Restarting on unexpected disconnect from Salesforce... 
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:262)
   [2022-02-14 02:36:41,525] INFO [sf_account_change_connector|task-0] Set 
Replay extension to replay from `-1` for channel `/data/AccountChangeEvent` 
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:547)
   [2022-02-14 02:36:41,525] INFO [sf_account_change_connector|task-0] 
Subscribing to channel /data/AccountChangeEvent... 
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:435)
   [2022-02-14 02:36:41,526] INFO [sf_account_change_connector|task-0] 
Successfully restarted! 
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:312)
   [2022-02-14 02:36:41,569] INFO [sf_account_change_connector|task-0] 
Subscribed to channel /data/AccountChangeEvent 
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:499)
   [2022-02-14 02:36:46,193] WARN [sf_order_p_event_connector|task-0] Connect 
failure: {failure={exception=org.cometd.common.TransportException: 
{httpCode=504}, message={clientId=139646d4ddo6w8r1v1xmp4rj3bli, 
channel=/meta/connect, id=30562, connectionType=long-polling}, httpCode=504, 
connectionType=long-polling}, channel=/meta/connect, id=30562, 
successful=false} 
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:174)
   [2022-02-14 02:36:46,210] INFO [sf_account_change_connector|task-0|offsets] 
WorkerSourceTask{id=sf_account_change_connector-0} flushing 0 outstanding 
messages for offset commit 
(org.apache.kafka.connect.runtime.WorkerSourceTask:487)
   [2022-02-14 02:36:46,336] INFO [sf_order_p_event_connector|task-0] 
Restarting on unexpected disconnect from Salesforce... 
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:262)
   [2022-02-14 02:36:46,392] INFO [sf_order_p_event_connector|task-0|offsets] 
WorkerSourceTask{id=sf_order_p_event_connector-0} flushing 0 outstanding 
messages for offset commit 
(org.apache.kafka.connect.runtime.WorkerSourceTask:487)
   [2022-02-14 02:36:46,596] INFO [sf_order_p_event_connector|task-0] Set 
Replay extension to replay from `-1` for channel `/event/Order_Completed__e` 
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:547)
   [2022-02-14 02:36:46,596] INFO [sf_order_p_event_connector|task-0] 
Subscribing to channel /event/Order_Completed__e... 
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:435)
   `
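
   To put a number on "every minute or so", a small script along these lines 
could tally the restarts per connector from the worker log. This is only a 
sketch: it assumes each log entry sits on a single line in the original log 
file (the excerpt above is wrapped by email), and the names `restart_times` and 
`restart_intervals` are made up for illustration.

```python
import re
from datetime import datetime

# Start of a Kafka Connect worker log entry in the format shown above, e.g.
# [2022-02-14 02:36:41,252] INFO [sf_account_change_connector|task-0] Restarting ...
ENTRY = re.compile(
    r"\[(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\]\s+"
    r"(?P<level>[A-Z]+)\s+\[(?P<connector>[^|\]]+)\|[^\]]*\]\s+(?P<msg>.*)"
)

# Message text logged by SubscriptionHelper on each reconnect (taken from the logs above).
RESTART_MARKER = "Restarting on unexpected disconnect from Salesforce"


def restart_times(log_text):
    """Return {connector name: [datetime, ...]} for every restart entry."""
    times = {}
    for line in log_text.splitlines():
        match = ENTRY.match(line.strip())
        if match and RESTART_MARKER in match.group("msg"):
            stamp = datetime.strptime(match.group("ts"), "%Y-%m-%d %H:%M:%S,%f")
            times.setdefault(match.group("connector"), []).append(stamp)
    return times


def restart_intervals(times):
    """Return seconds between consecutive restarts, per connector."""
    return {
        name: [(b - a).total_seconds() for a, b in zip(stamps, stamps[1:])]
        for name, stamps in times.items()
    }


# Two entries copied from the excerpt above, unwrapped onto single lines.
SAMPLE = (
    "[2022-02-14 02:36:41,252] INFO [sf_account_change_connector|task-0] "
    "Restarting on unexpected disconnect from Salesforce... "
    "(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:262)\n"
    "[2022-02-14 02:36:46,336] INFO [sf_order_p_event_connector|task-0] "
    "Restarting on unexpected disconnect from Salesforce... "
    "(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:262)\n"
)

if __name__ == "__main__":
    for name, stamps in restart_times(SAMPLE).items():
        print(name, len(stamps), "restart(s)")
```

   Run against a full log file instead of `SAMPLE`, the per-connector 
intervals would show whether the restarts follow a fixed period.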
   
   Eventually, after a few hours or even a couple of days, some of the 
connectors fail to reconnect. When that happens, I see one of two errors: 
either `401::Authentication invalid` or `403::Unknown client`. Examples:
   
   `[2022-02-14 04:11:15,324] ERROR [sf_order_p_event_connector|task-0] Error 
restarting: Error during CONNECT: 403::Unknown client 
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:307)
   org.apache.camel.CamelException: Error during CONNECT: 403::Unknown client
           at 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.connect(SubscriptionHelper.java:230)
           at 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.doStart(SubscriptionHelper.java:212)
           at 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.performClientRestart(SubscriptionHelper.java:304)
           at 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.access$1000(SubscriptionHelper.java:60)
           at 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$4.run(SubscriptionHelper.java:249)
           at 
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:882)
           at 
org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1036)
           at java.lang.Thread.run(Thread.java:748)
   `
   
   and
   
   `[2022-02-09 19:58:39,911] ERROR [sf_contact_change_connector|task-0] Error 
restarting: Exception during HANDSHAKE: 401::Authentication invalid 
(org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:307)
   org.apache.camel.CamelException: Exception during HANDSHAKE: 
401::Authentication invalid
           at 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.connect(SubscriptionHelper.java:223)
           at 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.doStart(SubscriptionHelper.java:212)
           at 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.performClientRestart(SubscriptionHelper.java:304)
           at 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.access$1000(SubscriptionHelper.java:60)
           at 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$4.run(SubscriptionHelper.java:249)
           at 
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:882)
           at 
org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1036)
           at java.lang.Thread.run(Thread.java:748)
   Caused by: org.apache.camel.component.salesforce.api.SalesforceException: 
401::Authentication invalid
           at 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.getFailure(SubscriptionHelper.java:340)
           at 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.access$300(SubscriptionHelper.java:60)
           at 
org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$1.onMessage(SubscriptionHelper.java:132)
           at 
org.cometd.common.AbstractClientSession$AbstractSessionChannel.notifyOnMessage(AbstractClientSession.java:583)
           at 
org.cometd.common.AbstractClientSession$AbstractSessionChannel.notifyMessageListeners(AbstractClientSession.java:568)
           at 
org.cometd.common.AbstractClientSession.notifyListeners(AbstractClientSession.java:308)
           at 
org.cometd.common.AbstractClientSession.lambda$receive$4(AbstractClientSession.java:269)
           at org.cometd.bayeux.Promise$2.succeed(Promise.java:103)
           at 
org.cometd.common.AsyncFoldLeft$AbstractLoop.run(AsyncFoldLeft.java:199)
           at org.cometd.common.AsyncFoldLeft.run(AsyncFoldLeft.java:93)
           at 
org.cometd.common.AbstractClientSession.extendIncoming(AbstractClientSession.java:103)
           at 
org.cometd.common.AbstractClientSession.receive(AbstractClientSession.java:263)
           at 
org.cometd.client.BayeuxClient.failHandshake(BayeuxClient.java:721)
           at 
org.cometd.client.BayeuxClient.processHandshake(BayeuxClient.java:707)
   `
   
   This is not specific to one connector; it can happen on any of them. Here 
is an example config for one of my Salesforce Source connectors. I am using the 
REFRESH_TOKEN authentication method and have set the refresh token in 
Salesforce to expire only if it goes unused for 365 days. The credentials 
themselves are good, as evidenced by everything working fine when I recreate 
the connectors.
   
   `{
    "name": "sf_account_change_connector",
    "config": {
           "camel.source.endpoint.rawPayload": true,
           "key.converter": "org.apache.kafka.connect.storage.StringConverter",
           "key.converter.schemas.enable": false,
           "value.converter.schemas.enable":false,
           "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter",
           "camel.source.marshal":"json-jackson",
           "connector.class":"org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSourceConnector",
           "camel.component.salesforce.loginUrl":"<redacted>",
           "camel.component.salesforce.instanceUrl":"<redacted>",
           "topics": "CISL_SF_ASSET_INFO_INPUT",
           "errors.tolerance":"all",
           "errors.retry.timeout":"-1",
           "camel.source.path.topicName":"/data/AccountChangeEvent",
           "camel.source.endpoint.replayId":"-1",
           "camel.component.salesforce.authenticationType":"REFRESH_TOKEN",
           "camel.component.salesforce.clientId":"<redacted>",
           "camel.component.salesforce.clientSecret":"<redacted>",
           "camel.component.salesforce.refreshToken":"redacted",
           "camel.component.salesforce.userName":"<redacted>"
    }
   }
   `
   
   All the connectors have configs similar to this one; the only differences 
are the streaming API path and the topic names. So my questions are these:
   
   Why do the connectors reconnect to the streams every so often? Is this 
something I can change in the connector config itself, or is it 
Salesforce-specific? Also, why do 403 and 401 errors occasionally arise even 
though my authentication is perfectly valid? The connectors seem to stop 
working when one of these errors happens.
   
   Thanks in advance!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

