hakidehari opened a new issue #1339: URL: https://github.com/apache/camel-kafka-connector/issues/1339
Hi all, I have been trying to troubleshoot some issues I have been seeing with the Camel Kafka Salesforce Source connector to no avail. I have two different kafka topics which each have 2 connectors flowing information into them via Salesforce streaming API using the Salesforce source connector. There are a few weird things I am seeing. The connectors seem to be reconnecting to the Salesforce streams every minute or so. Here is an example of what I see in the logs `[2022-02-14 02:36:41,133] WARN [sf_account_change_connector|task-0] Connect failure: {failure={exception=org.cometd.common.TransportException: {httpCode=504}, message={clientId=y98udzrc3x45vv1jfrg9twg1f9e, channel=/meta/connect, id=30557, connectionType=long-polling}, httpCode=504, connectionType=long-polling}, channel=/meta/connect, id=30557, successful=false} (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:174) [2022-02-14 02:36:41,252] INFO [sf_account_change_connector|task-0] Restarting on unexpected disconnect from Salesforce... (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:262) [2022-02-14 02:36:41,525] INFO [sf_account_change_connector|task-0] Set Replay extension to replay from `-1` for channel `/data/AccountChangeEvent` (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:547) [2022-02-14 02:36:41,525] INFO [sf_account_change_connector|task-0] Subscribing to channel /data/AccountChangeEvent... (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:435) [2022-02-14 02:36:41,526] INFO [sf_account_change_connector|task-0] Successfully restarted! (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:312) [2022-02-14 02:36:41,569] INFO [sf_account_change_connector|task-0] Subscribed to channel /data/AccountChangeEvent (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:499) [2022-02-14 02:36:46,193] WARN [sf_order_p_event_connector|task-0] Connect failure: {failure={exception=org.cometd.common.TransportException: {httpCode=504}, message={clientId=139646d4ddo6w8r1v1xmp4rj3bli, channel=/meta/connect, id=30562, connectionType=long-polling}, httpCode=504, connectionType=long-polling}, channel=/meta/connect, id=30562, successful=false} (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:174) [2022-02-14 02:36:46,210] INFO [sf_account_change_connector|task-0|offsets] WorkerSourceTask{id=sf_account_change_connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487) [2022-02-14 02:36:46,336] INFO [sf_order_p_event_connector|task-0] Restarting on unexpected disconnect from Salesforce... (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:262) [2022-02-14 02:36:46,392] INFO [sf_order_p_event_connector|task-0|offsets] WorkerSourceTask{id=sf_order_p_event_connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:487) [2022-02-14 02:36:46,596] INFO [sf_order_p_event_connector|task-0] Set Replay extension to replay from `-1` for channel `/event/Order_Completed__e` (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:547) [2022-02-14 02:36:46,596] INFO [sf_order_p_event_connector|task-0] Subscribing to channel /event/Order_Completed__e... (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:435) ` Eventually, after a few hours or even a couple of days, some of the connectors fail to reconnect. Here, I see two different errors. I see either 401::Authentication invalid error or 403::Unknown Client error. Examples: `[2022-02-14 04:11:15,324] ERROR [sf_order_p_event_connector|task-0] Error restarting: Error during CONNECT: 403::Unknown client (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:307) org.apache.camel.CamelException: Error during CONNECT: 403::Unknown client at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.connect(SubscriptionHelper.java:230) at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.doStart(SubscriptionHelper.java:212) at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.performClientRestart(SubscriptionHelper.java:304) at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.access$1000(SubscriptionHelper.java:60) at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$4.run(SubscriptionHelper.java:249) at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:882) at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1036) at java.lang.Thread.run(Thread.java:748) ` and `[2022-02-09 19:58:39,911] ERROR [sf_contact_change_connector|task-0] Error restarting: Exception during HANDSHAKE: 401::Authentication invalid (org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper:307) org.apache.camel.CamelException: Exception during HANDSHAKE: 401::Authentication invalid at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.connect(SubscriptionHelper.java:223) at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.doStart(SubscriptionHelper.java:212) at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.performClientRestart(SubscriptionHelper.java:304) at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.access$1000(SubscriptionHelper.java:60) at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$4.run(SubscriptionHelper.java:249) at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:882) at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1036) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.camel.component.salesforce.api.SalesforceException: 401::Authentication invalid at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.getFailure(SubscriptionHelper.java:340) at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper.access$300(SubscriptionHelper.java:60) at org.apache.camel.component.salesforce.internal.streaming.SubscriptionHelper$1.onMessage(SubscriptionHelper.java:132) at org.cometd.common.AbstractClientSession$AbstractSessionChannel.notifyOnMessage(AbstractClientSession.java:583) at org.cometd.common.AbstractClientSession$AbstractSessionChannel.notifyMessageListeners(AbstractClientSession.java:568) at org.cometd.common.AbstractClientSession.notifyListeners(AbstractClientSession.java:308) at org.cometd.common.AbstractClientSession.lambda$receive$4(AbstractClientSession.java:269) at org.cometd.bayeux.Promise$2.succeed(Promise.java:103) at org.cometd.common.AsyncFoldLeft$AbstractLoop.run(AsyncFoldLeft.java:199) at org.cometd.common.AsyncFoldLeft.run(AsyncFoldLeft.java:93) at org.cometd.common.AbstractClientSession.extendIncoming(AbstractClientSession.java:103) at org.cometd.common.AbstractClientSession.receive(AbstractClientSession.java:263) at org.cometd.client.BayeuxClient.failHandshake(BayeuxClient.java:721) at org.cometd.client.BayeuxClient.processHandshake(BayeuxClient.java:707) ` This is not specific to one connector but can happen on any of them. Here is an example config of my Salesforce Source connector which I am using. I am using the REFRESH_TOKEN authentication method and have set the refresh token to expire if it is not used for 365 days within Salesforce. The authentication is good, as is evident by it working fine when I recreate the connectors. `{ "name": "sf_account_change_connector", "config": { "camel.source.endpoint.rawPayload": true, "key.converter": "org.apache.kafka.connect.storage.StringConverter", "key.converter.schemas.enable": false, "value.converter.schemas.enable":false, "value.converter":"org.apache.kafka.connect.converters.ByteArrayConverter", "camel.source.marshal":"json-jackson", "connector.class":"org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSourceConnector", "camel.component.salesforce.loginUrl":"<redacted>", "camel.component.salesforce.instanceUrl":"<redacted>", "topics": "CISL_SF_ASSET_INFO_INPUT", "errors.tolerance":"all", "errors.retry.timeout":"-1", "camel.source.path.topicName":"/data/AccountChangeEvent", "camel.source.endpoint.replayId":"-1", "camel.component.salesforce.authenticationType":"REFRESH_TOKEN", "camel.component.salesforce.clientId":"<redacted>", "camel.component.salesforce.clientSecret":"<redacted>", "camel.component.salesforce.refreshToken":"redacted", "camel.component.salesforce.userName":"<redacted>" } } ` All the connectors have similar configs to this one, only changes are the streaming API path and topic names. So my questions are this: Why is it reconnecting to the streams every so often? Is this a setting I can change in the connector config itself or is this something SF specific? Also, why are there occasional 403 and 401 errors arising, even though my authentication is perfectly valid? The connectors seem to stop working when one of these errors happen. Thanks in advance! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org