Repository: incubator-unomi Updated Branches: refs/heads/master 139e1b751 -> a897ab843
http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a897ab84/extensions/salesforce-connector/services/src/main/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImpl.java ---------------------------------------------------------------------- diff --git a/extensions/salesforce-connector/services/src/main/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImpl.java b/extensions/salesforce-connector/services/src/main/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImpl.java new file mode 100644 index 0000000..7ef6bea --- /dev/null +++ b/extensions/salesforce-connector/services/src/main/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImpl.java @@ -0,0 +1,815 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.unomi.sfdc.services.internal; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.util.ISO8601DateFormat; +import org.apache.http.Header; +import org.apache.http.HttpException; +import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.entity.UrlEncodedFormEntity; +import org.apache.http.client.methods.*; +import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.message.BasicNameValuePair; +import org.apache.http.util.EntityUtils; +import org.apache.unomi.api.Profile; +import org.apache.unomi.persistence.spi.PersistenceService; +import org.apache.unomi.sfdc.services.SFDCConfiguration; +import org.apache.unomi.sfdc.services.SFDCService; +import org.apache.unomi.sfdc.services.SFDCSession; +import org.cometd.bayeux.Channel; +import org.cometd.bayeux.Message; +import org.cometd.bayeux.client.ClientSessionChannel; +import org.cometd.client.BayeuxClient; +import org.cometd.client.transport.ClientTransport; +import org.cometd.client.transport.LongPollingTransport; +import org.eclipse.jetty.client.ContentExchange; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.util.ajax.JSON; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLEncoder; +import java.text.DateFormat; +import java.text.ParseException; +import java.util.*; + +/** + * Implementation of the Salesforce connector interface + */ +public class SFDCServiceImpl implements SFDCService { + private static final Logger logger = LoggerFactory.getLogger(SFDCServiceImpl.class.getName()); + + private static final String REST_ENDPOINT_URI = "/services/data/v38.0"; + private 
static final String STREAMING_ENDPOINT_URI = "/cometd/38.0"; + + private static final int CONNECTION_TIMEOUT = 20 * 1000; // milliseconds + private static final int READ_TIMEOUT = 120 * 1000; // milliseconds + + private SFDCConfiguration sfdcConfiguration; + private SFDCConfiguration defaultSFDCConfiguration; + + private Set<String> sfdcLeadMandatoryFields = new TreeSet<>(); + private Set<String> sfdcLeadUpdateableFields = new TreeSet<>(); + + private SFDCSession sfdcSession; + private DateFormat iso8601DateFormat = new ISO8601DateFormat(); + + private PersistenceService persistenceService; + + public void setPersistenceService(PersistenceService persistenceService) { + this.persistenceService = persistenceService; + } + + public void setDefaultSFDCConfiguration(SFDCConfiguration defaultSFDCConfiguration) { + this.defaultSFDCConfiguration = defaultSFDCConfiguration; + } + + public SFDCSession getSFDCSession() { + return sfdcSession; + } + + @Override + public SFDCConfiguration loadConfiguration() { + if (persistenceService == null) { + return null; + } + SFDCConfiguration sfdcConfiguration = persistenceService.load("sfdcConfiguration", SFDCConfiguration.class); + return sfdcConfiguration; + } + + @Override + public boolean saveConfiguration(SFDCConfiguration sfdcConfiguration) { + if (persistenceService == null) { + return false; + } + boolean result = persistenceService.save(sfdcConfiguration); + if (result) { + this.sfdcConfiguration = sfdcConfiguration; + try { + if (login(sfdcConfiguration)) { + return true; + } + } catch (HttpException e) { + logger.warn("Error trying to login with new configuration {}", sfdcConfiguration, e); + result = false; + } catch (IOException e) { + logger.warn("Error trying to login with new configuration {}", sfdcConfiguration, e); + result = false; + } + } else { + logger.error("Error trying to save new Salesforce connection configuration !"); + } + return result; + } + + public void start() { + try { + iso8601DateFormat = new 
ISO8601DateFormat(); + + SFDCConfiguration sfdcConfiguration = loadConfiguration(); + if (sfdcConfiguration != null) { + this.sfdcConfiguration = sfdcConfiguration; + } else { + this.sfdcConfiguration = defaultSFDCConfiguration; + } + + if (this.sfdcConfiguration.isComplete()) { + boolean loginSuccessful = login(this.sfdcConfiguration); + if (!loginSuccessful) { + throw new Exception("Login failed"); + } + sfdcLeadMandatoryFields = getLeadMandatoryFields(); + // setupPushTopics(SFDCSession.getEndPoint(), SFDCSession.getSessionId()); + logger.info("Salesforce connector initialized successfully."); + } else { + logger.warn("Salesforce connector is not yet configured."); + } + } catch (HttpException | IOException e) { + logger.error("Failed to init SFDCService properly", e); + } catch (Exception e) { + logger.error("Failed to init SFDCService properly", e); + } + } + + public void stop() { + } + + public Set<String> getRecentLeadIds() { + if (!isConnected()) { + return null; + } + Set<String> recentLeadIds = new LinkedHashSet<>(); + + String baseUrl = sfdcSession.getEndPoint() + REST_ENDPOINT_URI + "/sobjects/Lead"; + HttpGet getRecentLeads = new HttpGet(baseUrl); + + try { + Object responseObject = handleRequest(getRecentLeads); + if (responseObject == null) { + logger.warn("Couldn't retrieve recent leads"); + return null; + } + Map<String, Object> queryResponse = (Map<String, Object>) responseObject; + if (queryResponse.containsKey("recentItems")) { + logger.debug("Response received from Salesforce: {}", queryResponse); + Object[] recentItems = (Object[]) queryResponse.get("recentItems"); + for (Object recentItem : recentItems) { + Map<String, String> recentItemMap = (Map<String, String>) recentItem; + recentLeadIds.add(recentItemMap.get("Id")); + } + } + + } catch (IOException e) { + logger.error("Error getting recent leads", e); + } catch (HttpException e) { + logger.error("Error getting recent leads", e); + } + + return recentLeadIds; + } + + public 
Map<String,Object> getSObject(String sobjectName, String objectId) { + if (!isConnected()) { + return null; + } + Map<String, Object> sobjectMap = new LinkedHashMap<>(); + + String baseUrl = sfdcSession.getEndPoint() + REST_ENDPOINT_URI + "/sobjects/" + sobjectName +"/" + objectId; + HttpGet getSObject = new HttpGet(baseUrl); + + try { + Object responseObject = handleRequest(getSObject); + if (responseObject == null) { + logger.warn("Couldn't retrieve sobject {} with id {}", sobjectName, objectId); + return null; + } + Map<String, Object> queryResponse = (Map<String, Object>) responseObject; + if (queryResponse != null) { + logger.debug("Response received from Salesforce: {}", queryResponse); + sobjectMap = new LinkedHashMap<>(queryResponse); + } + + } catch (IOException e) { + logger.error("Error getting sobject {} with id {}", sobjectName, objectId, e); + } catch (HttpException e) { + logger.error("Error getting sobject {} with id {}", sobjectName, objectId, e); + } + return sobjectMap; + } + + public Map<String,Object> getSObjectDescribe(String sobjectName) { + Map<String, Object> sobjectDescribe = new LinkedHashMap<>(); + if (!isConnected()) { + return null; + } + + String baseUrl = sfdcSession.getEndPoint() + REST_ENDPOINT_URI + "/sobjects/" + sobjectName +"/describe"; + HttpGet getSObjectDescribe = new HttpGet(baseUrl); + + try { + Object responseObject = handleRequest(getSObjectDescribe); + if (responseObject == null) { + logger.warn("Couldn't retrieve sobject {} describe", sobjectName); + return null; + } + Map<String, Object> queryResponse = (Map<String, Object>) responseObject; + if (queryResponse != null) { + logger.debug("Response received from Salesforce: {}", queryResponse); + sobjectDescribe = new LinkedHashMap<>(queryResponse); + } + + } catch (IOException e) { + logger.error("Error getting sobject {}", sobjectName, e); + } catch (HttpException e) { + logger.error("Error getting sobject {}", sobjectName, e); + } + return sobjectDescribe; + } + + 
public Map<String, Object> getLead(String leadId) { + return getSObject("Lead", leadId); + } + + public Set<String> getLeadMandatoryFields() { + Set<String> mandatoryFields = new TreeSet<>(); + if (!isConnected()) { + return null; + } + Map<String,Object> leadDescribe = getSObjectDescribe("Lead"); + Object[] fields = (Object[]) leadDescribe.get("fields"); + Set<String> updateableFields = new TreeSet<>(); + Set<String> compoundFieldNames = new TreeSet<>(); + for (Object field : fields) { + Map<String,Object> fieldDescribe = (Map<String,Object>) field; + String fieldName = (String) fieldDescribe.get("name"); + String compoundFieldName = (String) fieldDescribe.get("compoundFieldName"); + if (compoundFieldName != null) { + compoundFieldNames.add(compoundFieldName); + } + String fieldType = (String) fieldDescribe.get("type"); + Boolean fieldUpdateable = (Boolean) fieldDescribe.get("updateable"); + Boolean fieldCreateable = (Boolean) fieldDescribe.get("createable"); + Boolean fieldDefaultedOnCreate = (Boolean) fieldDescribe.get("defaultedOnCreate"); + Boolean fieldNillable = (Boolean) fieldDescribe.get("nillable"); + if (fieldUpdateable) { + updateableFields.add(fieldName); + } + if (!fieldNillable && !fieldDefaultedOnCreate) { + mandatoryFields.add(fieldName); + } + } + mandatoryFields.removeAll(compoundFieldNames); + updateableFields.removeAll(compoundFieldNames); + sfdcLeadUpdateableFields = updateableFields; + return mandatoryFields; + } + + public boolean deleteLead(String leadId) { + if (!isConnected()) { + return false; + } + String baseUrl = sfdcSession.getEndPoint() + REST_ENDPOINT_URI + "/sobjects/Lead/" + leadId; + HttpDelete deleteLead = new HttpDelete(baseUrl); + try { + Object responseObject = handleRequest(deleteLead); + } catch (IOException e) { + logger.error("Error deleting lead {}", leadId, e); + } catch (HttpException e) { + logger.error("Error deleting lead {}", leadId, e); + } + return true; + } + + public Set<String> 
findLeadIdsByIdentifierValue(String identifierFieldValue) { + Set<String> results = new LinkedHashSet<String>(); + if (!isConnected()) { + return results; + } + Object response = query("SELECT Id FROM Lead WHERE " + sfdcConfiguration.getSfdcIdentifierField() + "='" + identifierFieldValue + "'"); + if (response == null) { + return results; + } + Map<String, Object> result = (Map<String, Object>) response; + Long totalSize = (Long) result.get("totalSize"); + Boolean done = (Boolean) result.get("done"); + Object[] recordObjects = (Object[]) result.get("records"); + if (totalSize == null || totalSize < 1) { + return results; + } + for (Object recordObject : recordObjects) { + Map<String, Object> record = (Map<String, Object>) recordObject; + if (record.containsKey("Id")) { + results.add((String) record.get("Id")); + } + } + return results; + } + + @Override + public String createOrUpdateLead(Profile profile) { + if (!isConnected()) { + return null; + } + // first we must check if an existing lead exists for the profile. + String unomiIdentifierValue = (String) profile.getProperty(sfdcConfiguration.getUnomiIdentifierField()); + logger.info("Checking if we have a lead for identifier value {}...", unomiIdentifierValue); + Set<String> foundExistingSfdcLeadIds = findLeadIdsByIdentifierValue(unomiIdentifierValue); + + Map<String, Object> sfdcLeadFields = new HashMap<>(); + Map<String, Object> existingSfdcLeadFields = new HashMap<>(); + Date sfdcLastModified = null; + + if (foundExistingSfdcLeadIds.size() > 1) { + // we found multiple leads matching the identifier value ! 
+ logger.warn("Found multiple matching leads for identifier value {}, will use first matching one !", unomiIdentifierValue); + } + + if (foundExistingSfdcLeadIds.size() > 0) { + logger.info("Found an existing lead, attempting to update it..."); + // we found an existing lead we must update it + existingSfdcLeadFields = getLead(foundExistingSfdcLeadIds.iterator().next()); + if (existingSfdcLeadFields.get("LastModifiedDate") != null) { + try { + sfdcLastModified = iso8601DateFormat.parse((String) existingSfdcLeadFields.get("LastModifiedDate")); + } catch (ParseException e) { + logger.error("Error parsing date {}", existingSfdcLeadFields.get("LastModifiedDate"), e); + } + } + } else { + logger.info("No existing lead found."); + } + + for (String profilePropertyKey : profile.getProperties().keySet()) { + String sfdcFieldName = sfdcConfiguration.getUnomiToSfdcFieldMappings().get(profilePropertyKey); + if (sfdcFieldName == null) { + // we skip unmapped fields + continue; + } + Object unomiPropertyValue = profile.getProperties().get(profilePropertyKey); + if (existingSfdcLeadFields.get(sfdcFieldName) == null) { + // we only set the field if it didn't have a value. 
+ logger.info("Setting SFDC field {} value to {}", sfdcFieldName, unomiPropertyValue); + sfdcLeadFields.put(sfdcFieldName, unomiPropertyValue); + } else { + // current strategy : Unomi field value wins if different from Salesforce value + // @todo we should probably improve this by tracking last modification dates on profile/lead properties + Object sfdcLeadFieldValue = existingSfdcLeadFields.get(sfdcFieldName); + if (!unomiPropertyValue.equals(sfdcLeadFieldValue)) { + logger.info("Overwriting SFDC field {} value to {}", sfdcFieldName, unomiPropertyValue); + sfdcLeadFields.put(sfdcFieldName, unomiPropertyValue); + } + } + } + + if (sfdcLeadFields.size() == 0) { + logger.info("No SFDC field value to send, will not send anything to Salesforce."); + if (foundExistingSfdcLeadIds.size() == 0) { + return null; + } else { + return foundExistingSfdcLeadIds.iterator().next(); + } + } + + if (existingSfdcLeadFields.size() == 0) { + // if we are creating a lead, let's make sure we have all the mandatory fields before sending the request + boolean missingMandatoryFields = false; + for (String leadMandatoryFieldName : sfdcLeadMandatoryFields) { + if (sfdcLeadFields.get(leadMandatoryFieldName) == null) { + logger.warn("Missing mandatory field {}, aborting sending to Salesforce", leadMandatoryFieldName); + missingMandatoryFields = true; + } + } + if (missingMandatoryFields) { + return null; + } + } + + String baseUrl = sfdcSession.getEndPoint() + REST_ENDPOINT_URI + "/sobjects/Lead"; + HttpEntityEnclosingRequestBase request = new HttpPost(baseUrl); + if (foundExistingSfdcLeadIds.size() > 0) { + baseUrl = sfdcSession.getEndPoint() + REST_ENDPOINT_URI + "/sobjects/Lead/" + foundExistingSfdcLeadIds.iterator().next(); + sfdcLeadFields.remove("Id"); + request = new HttpPatch(baseUrl); + } + + try { + ObjectMapper objectMapper = new ObjectMapper(); + StringEntity requestEntity = new StringEntity( + objectMapper.writeValueAsString(sfdcLeadFields), + ContentType.APPLICATION_JSON); + 
request.setEntity(requestEntity); + Object responseObject = handleRequest(request); + if (responseObject == null) { + return null; + } + if (responseObject instanceof Map) { + Map<String, Object> responseData = (Map<String, Object>) responseObject; + if (responseData.get("id") != null) { + String sfdcId = (String) responseData.get("id"); + logger.info("Lead successfully created/updated in Salesforce. sfdcId={}", sfdcId); + return sfdcId; + } + } + logger.info("Response received from Salesforce: {}", responseObject); + } catch (IOException e) { + logger.error("Error creating or updating lead for profile {}", profile, e); + } catch (HttpException e) { + logger.error("Error creating or updating lead for profile {}", profile, e); + } + + if (foundExistingSfdcLeadIds.size() == 0) { + return null; + } else { + return foundExistingSfdcLeadIds.iterator().next(); + } + } + + @Override + public boolean updateProfileFromLead(Profile profile) { + if (!isConnected()) { + return false; + } + String unomiIdentifierValue = (String) profile.getProperty(sfdcConfiguration.getUnomiIdentifierField()); + Set<String> foundSfdcLeadIds = findLeadIdsByIdentifierValue(unomiIdentifierValue); + if (foundSfdcLeadIds.size() == 0) { + logger.info("No lead found in Salesforce corresponding to profile {}", profile); + // we didn't find a corresponding lead in salesforce. 
+ return false; + } else if (foundSfdcLeadIds.size() > 1) { + logger.warn("Found multiple leads in Salesforce for identifier value {}, will use first one.", foundSfdcLeadIds); + } else { + logger.info("Found corresponding lead with identifier value {}", unomiIdentifierValue); + } + Map<String,Object> sfdcLead = getLead(foundSfdcLeadIds.iterator().next()); + if (sfdcLead == null) { + logger.error("Error retrieving lead {} from Salesforce", foundSfdcLeadIds ); + return false; + } + boolean profileUpdated = false; + for (Map.Entry<String,String> sfdcToUnomiFieldMappingEntry : sfdcConfiguration.getSfdcToUnomiFieldMappings().entrySet()) { + String sfdcFieldName = sfdcToUnomiFieldMappingEntry.getKey(); + String unomiFieldName = sfdcToUnomiFieldMappingEntry.getValue(); + if (sfdcLead.get(sfdcFieldName) != null) { + Object sfdcFieldValue = sfdcLead.get(sfdcFieldName); + if (sfdcFieldValue != null && !sfdcFieldValue.equals(profile.getProperty(unomiFieldName))) { + profile.setProperty(unomiFieldName, sfdcFieldValue); + profileUpdated = true; + } + } + } + logger.info("Updated profile {} from Salesforce lead {}", profile, sfdcLead); + return profileUpdated; + } + + @Override + public Map<String,Object> query(String query) { + if (!isConnected()) { + return null; + } + // first we must check if an existing lead exists for the profile. 
+ + String baseUrl = null; + try { + baseUrl = sfdcSession.getEndPoint() + REST_ENDPOINT_URI + "/query?q=" + URLEncoder.encode(query, "UTF-8"); + HttpGet get = new HttpGet(baseUrl); + + Object responseObject = handleRequest(get); + if (responseObject == null) { + return null; + } + if (responseObject != null && responseObject instanceof Map) { + return (Map<String,Object>) responseObject; + } + return null; + } catch (UnsupportedEncodingException e) { + logger.error("Error executing query {}", query, e); + return null; + } catch (ClientProtocolException e) { + logger.error("Error executing query {}", query, e); + return null; + } catch (IOException e) { + logger.error("Error executing query {}", query, e); + return null; + } catch (HttpException e) { + logger.error("Error executing query {}", query, e); + return null; + } + } + + @Override + public Map<String, Object> getLimits() { + if (!isConnected()) { + return null; + } + String baseUrl = null; + try { + baseUrl = sfdcSession.getEndPoint() + REST_ENDPOINT_URI + "/limits"; + HttpGet get = new HttpGet(baseUrl); + + Object responseObject = handleRequest(get); + if (responseObject == null) { + return null; + } + + if (responseObject instanceof Map) { + return (Map<String,Object>) responseObject; + } + return null; + } catch (UnsupportedEncodingException e) { + logger.error("Error retrieving Salesforce API Limits", e); + return null; + } catch (ClientProtocolException e) { + logger.error("Error retrieving Salesforce API Limits", e); + return null; + } catch (IOException e) { + logger.error("Error retrieving Salesforce API Limits", e); + return null; + } catch (HttpException e) { + logger.error("Error retrieving Salesforce API Limits", e); + return null; + } + } + + private BayeuxClient makeClient() throws Exception { + HttpClient httpClient = new HttpClient(); + httpClient.setConnectTimeout(CONNECTION_TIMEOUT); + httpClient.setTimeout(READ_TIMEOUT); + httpClient.start(); + + if (sfdcSession == null) { + 
logger.error("Invalid session !"); + return null; + } + logger.info("Login successful!\nServer URL: " + sfdcSession.getEndPoint() + + "\nSession ID=" + sfdcSession.getSessionId()); + + Map<String, Object> options = new HashMap<String, Object>(); + options.put(ClientTransport.TIMEOUT_OPTION, READ_TIMEOUT); + LongPollingTransport transport = new LongPollingTransport( + options, httpClient) { + + @Override + protected void customize(ContentExchange exchange) { + super.customize(exchange); + exchange.addRequestHeader("Authorization", "OAuth " + sfdcSession.getSessionId()); + } + }; + + BayeuxClient client = new BayeuxClient(getSalesforceStreamingEndpoint( + sfdcSession.getEndPoint()), transport); + return client; + } + + public void setupPushListener(String channelName, ClientSessionChannel.MessageListener messageListener) throws Exception { + if (!isConnected()) { + return; + } + final BayeuxClient client = makeClient(); + if (client == null) { + throw new Exception("Login failed !"); + } + client.getChannel(Channel.META_HANDSHAKE).addListener + (new ClientSessionChannel.MessageListener() { + @Override + public void onMessage(ClientSessionChannel channel, Message message) { + + logger.debug("[CHANNEL:META_HANDSHAKE]: " + message); + + boolean success = message.isSuccessful(); + if (!success) { + String error = (String) message.get("error"); + if (error != null) { + logger.error("Error during HANDSHAKE: " + error); + } + + Exception exception = (Exception) message.get("exception"); + if (exception != null) { + logger.error("Exception during HANDSHAKE: ", exception); + } + } + } + + }); + + client.getChannel(Channel.META_CONNECT).addListener( + new ClientSessionChannel.MessageListener() { + public void onMessage(ClientSessionChannel channel, Message message) { + + logger.debug("[CHANNEL:META_CONNECT]: " + message); + + boolean success = message.isSuccessful(); + if (!success) { + String error = (String) message.get("error"); + if (error != null) { + logger.error("Error 
during CONNECT: " + error); + } + } + } + + }); + + client.getChannel(Channel.META_SUBSCRIBE).addListener( + new ClientSessionChannel.MessageListener() { + + public void onMessage(ClientSessionChannel channel, Message message) { + + logger.debug("[CHANNEL:META_SUBSCRIBE]: " + message); + boolean success = message.isSuccessful(); + if (!success) { + String error = (String) message.get("error"); + if (error != null) { + logger.error("Error during SUBSCRIBE: " + error); + } + } + } + }); + + client.handshake(); + logger.debug("Waiting for handshake"); + + boolean handshaken = client.waitFor(10 * 1000, BayeuxClient.State.CONNECTED); + if (!handshaken) { + logger.error("Failed to handshake: " + client); + } + + logger.info("Subscribing for channel: " + channelName); + + client.getChannel(channelName).subscribe(messageListener); + + } + + private String getSalesforceStreamingEndpoint(String endpoint) throws MalformedURLException { + return new URL(endpoint + STREAMING_ENDPOINT_URI).toExternalForm(); + } + + private void setupPushTopics(String host, String sessionId) throws HttpException, IOException { + + String baseUrl = host + REST_ENDPOINT_URI + "/query?q=" + URLEncoder.encode("SELECT Id from PushTopic WHERE name = 'LeadUpdates'", "UTF-8"); + HttpGet get = new HttpGet(baseUrl); + + Map<String, String> queryResponse = (Map<String, String>) handleRequest(get); + + if (queryResponse != null && queryResponse.containsKey("count")) { + logger.info("Push topics setup successfully"); + } + } + + public boolean login(SFDCConfiguration sfdcConfiguration) + throws HttpException, IOException { + String baseUrl = sfdcConfiguration.getSfdcLoginEndpoint() + "/services/oauth2/token"; + HttpPost oauthPost = new HttpPost(baseUrl); + List<BasicNameValuePair> parametersBody = new ArrayList<>(); + parametersBody.add(new BasicNameValuePair("grant_type", "password")); + parametersBody.add(new BasicNameValuePair("username", sfdcConfiguration.getSfdcUserUsername())); + parametersBody.add(new 
BasicNameValuePair("password", sfdcConfiguration.getSfdcUserPassword() + sfdcConfiguration.getSfdcUserSecurityToken())); + parametersBody.add(new BasicNameValuePair("client_id", sfdcConfiguration.getSfdcConsumerKey())); + parametersBody.add(new BasicNameValuePair("client_secret", sfdcConfiguration.getSfdcConsumerSecret())); + oauthPost.setEntity(new UrlEncodedFormEntity(parametersBody, "UTF-8")); + + Map<String, String> oauthLoginResponse = (Map<String,String>) handleRequest(oauthPost, 0, false); + if (oauthLoginResponse == null) { + return false; + } + + sfdcSession = new SFDCSession( + oauthLoginResponse.get("access_token"), + oauthLoginResponse.get("instance_url"), + oauthLoginResponse.get("signature"), + oauthLoginResponse.get("id"), + oauthLoginResponse.get("token_type"), + oauthLoginResponse.get("issued_at"), + sfdcConfiguration.getSfdcSessionTimeout()); + return true; + } + + public void logout() { + sfdcSession = null; + } + + private SFDCSession getValidSession() { + if (isSessionValid()) { + return sfdcSession; + } + boolean loginSuccessful = false; + try { + loginSuccessful = login(sfdcConfiguration); + if (loginSuccessful && sfdcSession != null) { + return sfdcSession; + } + } catch (HttpException e) { + logger.error("Error logging in", e); + return null; + } catch (IOException e) { + logger.error("Error logging in", e); + return null; + } + return null; + } + + private boolean isSessionValid() { + if (sfdcSession == null) { + return false; + } + if (sfdcSession.isExpired()) { + return false; + } + return true; + } + + private Object handleRequest(HttpUriRequest request) throws IOException, HttpException { + return handleRequest(request, 1, true); + } + + private Object handleRequest(HttpUriRequest request, int retryCount, boolean addAuthorizationHeader) throws IOException, HttpException { + CloseableHttpClient client = HttpClientBuilder.create().build(); + if (addAuthorizationHeader) { + SFDCSession sfdcSession = getValidSession(); + if (sfdcSession == 
null) { + logger.error("Couldn't get a valid session !"); + return null; + } + if (request.containsHeader("Authorization")) { + logger.debug("Replacing existing authorization header with an updated one."); + Header[] authorizationHeaders = request.getHeaders("Authorization"); + for (Header authorizationHeader : authorizationHeaders) { + request.removeHeader(authorizationHeader); + } + } + request.addHeader("Authorization", "Bearer " + sfdcSession.getSessionId()); + } + + CloseableHttpResponse response = client.execute(request); + if (response.getStatusLine().getStatusCode() >= 400) { + if ((response.getStatusLine().getStatusCode() == 401 || response.getStatusLine().getStatusCode() == 403) && retryCount > 0) { + // probably the session has expired, let's try to login again + logger.warn("Unauthorized request, attempting to login again..."); + boolean loginSuccessful = login(sfdcConfiguration); + if (!loginSuccessful) { + logger.error("Login failed, cannot execute request {}", request); + return null; + } + logger.warn("Retrying request {} once again...", request); + return handleRequest(request, 0, true); + } else { + logger.error("Error executing request {}: {}-{}", request, response.getStatusLine().getStatusCode(), response.getStatusLine().getStatusCode()); + if (response.getEntity() != null) { + logger.error("Entity={}", EntityUtils.toString(response.getEntity())); + } + } + return null; + } + if (response.getEntity() == null) { + return null; + } + return JSON.parse(EntityUtils.toString(response.getEntity())); + } + + public boolean isConfigured() { + if (!sfdcConfiguration.isComplete()) { + logger.warn("Connection to Salesforce is not properly configured !"); + return false; + } + return true; + } + + public boolean isConnected() { + if (!isConfigured()) { + return false; + } + if (sfdcSession == null) { + logger.warn("Not connected to SalesForce, operation will not execute."); + return false; + } else { + if (sfdcSession.isExpired()) { + 
logger.warn("Connection to Salesforce has expired, will reconnect on next request"); + return true; + } + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a897ab84/extensions/salesforce-connector/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml ---------------------------------------------------------------------- diff --git a/extensions/salesforce-connector/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml b/extensions/salesforce-connector/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml new file mode 100644 index 0000000..1b0b0ce --- /dev/null +++ b/extensions/salesforce-connector/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml @@ -0,0 +1,65 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<blueprint xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0" + xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" + xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd"> + + <!-- Default values for the org.apache.unomi.sfdc persistent id. Credentials and both field-mapping entries default to empty strings here. --> <cm:property-placeholder persistent-id="org.apache.unomi.sfdc" + update-strategy="reload"> + <cm:default-properties> + <cm:property name="sfdc.login.endpoint" value="https://login.salesforce.com"/> + <cm:property name="sfdc.user.username" value=""/> + <cm:property name="sfdc.user.password" value=""/> + <cm:property name="sfdc.user.securityToken" value=""/> + <cm:property name="sfdc.consumer.key" value=""/> + <cm:property name="sfdc.consumer.secret" value=""/> + <cm:property name="sfdc.channel" value="/topic/LeadUpdates"/> + <cm:property name="sfdc.fields.mappings.identifier" value=""/> + <cm:property name="sfdc.fields.mappings" value=""/> + <cm:property name="sfdc.session.timeout" value="900000"/> + </cm:default-properties> + </cm:property-placeholder> + + <!-- Persistence service reference, injected into sfdcServiceImpl below. --> <reference id="persistenceService" + interface="org.apache.unomi.persistence.spi.PersistenceService"/> + + <!-- Configuration bean with a fixed itemId; every other property is filled from the sfdc.* placeholder values. --> <bean id="defaultSFDCConfiguration" class="org.apache.unomi.sfdc.services.SFDCConfiguration"> + <property name="itemId" value="sfdcConfiguration"/> + <property name="sfdcLoginEndpoint" value="${sfdc.login.endpoint}"/> + <property name="sfdcUserUsername" value="${sfdc.user.username}"/> + <property name="sfdcUserPassword" value="${sfdc.user.password}"/> + <property name="sfdcUserSecurityToken" value="${sfdc.user.securityToken}"/> + <property name="sfdcConsumerKey" value="${sfdc.consumer.key}"/> + <property name="sfdcConsumerSecret" value="${sfdc.consumer.secret}"/> + <property name="sfdcChannel" value="${sfdc.channel}"/> + <property name="sfdcFieldMappings" value="${sfdc.fields.mappings}"/> + <property name="sfdcFieldMappingsIdentifier" value="${sfdc.fields.mappings.identifier}"/> + 
<property name="sfdcSessionTimeout" value="${sfdc.session.timeout}" /> + </bean> + + <!-- Service implementation: start is declared as init-method and stop as destroy-method; it receives the configuration bean and the persistence service reference. --> <bean id="sfdcServiceImpl" class="org.apache.unomi.sfdc.services.internal.SFDCServiceImpl" init-method="start" + destroy-method="stop"> + <property name="defaultSFDCConfiguration" ref="defaultSFDCConfiguration" /> + <property name="persistenceService" ref="persistenceService" /> + </bean> + + <!-- Exports the sfdcServiceImpl bean as a service, using auto-export="interfaces". --> <service id="sfdcService" ref="sfdcServiceImpl" auto-export="interfaces"/> + +</blueprint> http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a897ab84/extensions/salesforce-connector/services/src/main/resources/org.apache.unomi.sfdc.cfg ---------------------------------------------------------------------- diff --git a/extensions/salesforce-connector/services/src/main/resources/org.apache.unomi.sfdc.cfg b/extensions/salesforce-connector/services/src/main/resources/org.apache.unomi.sfdc.cfg new file mode 100644 index 0000000..939bddb --- /dev/null +++ b/extensions/salesforce-connector/services/src/main/resources/org.apache.unomi.sfdc.cfg @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +sfdc.login.endpoint=https://login.salesforce.com +sfdc.user.username= +sfdc.user.password= +sfdc.user.securityToken= +sfdc.consumer.key= +sfdc.consumer.secret= +sfdc.channel=/topic/LeadUpdates +sfdc.fields.mappings=email=Email,firstName=FirstName,lastName=LastName,company=Company,phoneNumber=Phone,jobTitle=Title,city=City,zipCode=PostalCode,address=Street,sfdcStatus=Status,sfdcRating=Rating +sfdc.fields.mappings.identifier=email=Email +sfdc.session.timeout=900000 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a897ab84/extensions/salesforce-connector/services/src/test/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImplTest.java ---------------------------------------------------------------------- diff --git a/extensions/salesforce-connector/services/src/test/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImplTest.java b/extensions/salesforce-connector/services/src/test/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImplTest.java new file mode 100644 index 0000000..2a566a4 --- /dev/null +++ b/extensions/salesforce-connector/services/src/test/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImplTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.unomi.sfdc.services.internal; + +import org.apache.http.HttpException; +import org.apache.unomi.api.Profile; +import org.apache.unomi.sfdc.services.SFDCConfiguration; +import org.cometd.bayeux.Message; +import org.cometd.bayeux.client.ClientSessionChannel; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; + +import static org.junit.Assert.*; + +/** + * A unit test class for testing the Salesforce Service implementation + */ +public class SFDCServiceImplTest { + + private static SFDCServiceImpl sfdcServiceImpl; + private static boolean canRunTests = false; + private static SFDCConfiguration sfdcConfiguration; + + @BeforeClass + public static void setUp() throws IOException { + sfdcServiceImpl = new SFDCServiceImpl(); + // we must now configure it. 
+ InputStream configPropertiesStream = SFDCServiceImplTest.class.getClassLoader().getResourceAsStream("org.apache.unomi.sfdc.cfg"); + Properties properties = new Properties(); + properties.load(configPropertiesStream); + sfdcConfiguration = new SFDCConfiguration(); + sfdcConfiguration.setSfdcLoginEndpoint(properties.getProperty("sfdc.login.endpoint")); + sfdcConfiguration.setSfdcUserUsername(properties.getProperty("sfdc.user.username")); + sfdcConfiguration.setSfdcUserPassword(properties.getProperty("sfdc.user.password")); + sfdcConfiguration.setSfdcUserSecurityToken(properties.getProperty("sfdc.user.securityToken")); + sfdcConfiguration.setSfdcConsumerKey(properties.getProperty("sfdc.consumer.key")); + sfdcConfiguration.setSfdcConsumerSecret(properties.getProperty("sfdc.consumer.secret")); + sfdcConfiguration.setSfdcChannel(properties.getProperty("sfdc.channel")); + sfdcConfiguration.setSfdcFieldMappings(properties.getProperty("sfdc.fields.mappings")); + sfdcConfiguration.setSfdcFieldMappingsIdentifier(properties.getProperty("sfdc.fields.mappings.identifier")); + if (System.getProperty("sfdcProperties") != null) { + Properties testProperties = new Properties(); + InputStream testPropertiesInputStream = new FileInputStream(System.getProperty("sfdcProperties")); + testProperties.load(testPropertiesInputStream); + sfdcConfiguration.setSfdcLoginEndpoint(testProperties.getProperty("sfdc.login.endpoint")); + sfdcConfiguration.setSfdcUserUsername(testProperties.getProperty("sfdc.user.username")); + sfdcConfiguration.setSfdcUserPassword(testProperties.getProperty("sfdc.user.password")); + sfdcConfiguration.setSfdcUserSecurityToken(testProperties.getProperty("sfdc.user.securityToken")); + sfdcConfiguration.setSfdcConsumerKey(testProperties.getProperty("sfdc.consumer.key")); + sfdcConfiguration.setSfdcConsumerSecret(testProperties.getProperty("sfdc.consumer.secret")); + canRunTests = true; + sfdcServiceImpl.setDefaultSFDCConfiguration(sfdcConfiguration); + 
sfdcServiceImpl.start(); + } else { + System.out.println("CANNOT RUN TESTS, PLEASE PROVIDE A PROPERTIES FILE WITH SALESFORCE CREDENTIALS AND REFERENCING IT USING -DsfdcProperties=FILEPATH !!!!!!"); + } + } + + @AfterClass + public static void shutdown() { + if (canRunTests) { + sfdcServiceImpl.stop(); + sfdcServiceImpl = null; + } + } + + private boolean checkCanRunTests() { + if (!canRunTests) { + System.out.println("CANNOT RUN TESTS, PLEASE PROVIDE A PROPERTIES FILE WITH SALESFORCE CREDENTIALS AND REFERENCING IT USING -DsfdcProperties=FILEPATH !!!!!!"); + } + return canRunTests; + } + + @Test + public void testGetLeads() { + if (!checkCanRunTests()) return; + Set<String> recentLeadIds = sfdcServiceImpl.getRecentLeadIds(); + if (recentLeadIds == null || recentLeadIds.size() == 0) { + return; + } + for (String recentLeadId : recentLeadIds) { + Map<String,Object> leadFields = sfdcServiceImpl.getLead(recentLeadId); + if (leadFields.containsKey(sfdcConfiguration.getSfdcIdentifierField())) { + String leadIdentifierFieldValue = (String) leadFields.get(sfdcConfiguration.getSfdcIdentifierField()); + if (leadIdentifierFieldValue == null) { + System.out.println("Skipping lead with null identifier field value for field: " + sfdcConfiguration.getSfdcIdentifierField()); + continue; + } + Set<String> foundLeadIds = sfdcServiceImpl.findLeadIdsByIdentifierValue(leadIdentifierFieldValue); + assertTrue("Should find a single lead for identifier value " + leadIdentifierFieldValue, foundLeadIds.size() == 1); + assertEquals("Expected Id to be the same", foundLeadIds.iterator().next(), leadFields.get("Id")); + } + } + } + + @Test + public void testGetLimits() { + if (!checkCanRunTests()) return; + Map<String,Object> limits = sfdcServiceImpl.getLimits(); + assertNotNull("Limits object is null, an error occurred !", limits); + } + + @Test + public void testCreateOrUpdateAndSyncLead() { + if (!checkCanRunTests()) return; + Profile profile = new Profile(); + 
profile.setItemId(UUID.randomUUID().toString()); + profile.setProperty("email", "te...@jahia.com"); + profile.setProperty("firstName", "Serge"); + String leadId = sfdcServiceImpl.createOrUpdateLead(profile); + assertNull("The lead creation should fail since we are missing mandatory fields.", leadId); + profile.setProperty("lastName", "Huber"); + profile.setProperty("company", "Jahia Solutions Group"); + profile.setProperty("phoneNumber", "+41223613424"); + profile.setProperty("jobTitle", "CTO"); + leadId = sfdcServiceImpl.createOrUpdateLead(profile); + // now let's try to update it. + profile.setProperty("company", "Jahia Solutions Group SA"); + sfdcServiceImpl.createOrUpdateLead(profile); + boolean profileUpdated = sfdcServiceImpl.updateProfileFromLead(profile); + assertTrue("Profile should have been updated since we are reading status field", profileUpdated); + profile.setProperty("company", "Another value"); + profileUpdated = sfdcServiceImpl.updateProfileFromLead(profile); + assertTrue("Profile should have been updated since data is not equal", profileUpdated); + if (leadId != null) { + sfdcServiceImpl.deleteLead(leadId); + } + } + + @Test + public void testStreaming() throws Exception { + if (!checkCanRunTests()) return; + System.out.println("Running streaming client example...."); + + sfdcServiceImpl.setupPushListener(sfdcConfiguration.getSfdcChannel(), new ClientSessionChannel.MessageListener() { + @Override + public void onMessage(ClientSessionChannel clientSessionChannel, Message message) { + System.out.println("Received message for channel" + sfdcConfiguration.getSfdcChannel() + ":"+ message); + } + }); + + System.out.println("Waiting 10 seconds for streamed data from your organization ..."); + int i=0; + while (i < 10) { + Thread.sleep(1000); + i++; + } + + } + + @Test + public void testFailedLogin() throws IOException, HttpException { + if (!checkCanRunTests()) return; + InputStream configPropertiesStream = 
SFDCServiceImplTest.class.getClassLoader().getResourceAsStream("org.apache.unomi.sfdc.cfg"); + Properties properties = new Properties(); + properties.load(configPropertiesStream); + String loginEndpoint = properties.getProperty("sfdc.login.endpoint"); + Properties testProperties = new Properties(); + if (System.getProperty("sfdcProperties") != null) { + sfdcServiceImpl.logout(); + InputStream testPropertiesInputStream = new FileInputStream(System.getProperty("sfdcProperties")); + testProperties.load(testPropertiesInputStream); + if (testProperties.getProperty("sfdc.login.endpoint") != null) { + loginEndpoint = testProperties.getProperty("sfdc.login.endpoint"); + } + String userUserName = testProperties.getProperty("sfdc.user.username"); + String userPassword = testProperties.getProperty("sfdc.user.password"); + String userSecurityToken = testProperties.getProperty("sfdc.user.securityToken"); + String consumerKey = testProperties.getProperty("sfdc.consumer.key"); + String consumerSecret = testProperties.getProperty("sfdc.consumer.secret"); + SFDCConfiguration sfdcConfiguration = new SFDCConfiguration(); + sfdcConfiguration.setSfdcLoginEndpoint(loginEndpoint); + sfdcConfiguration.setSfdcUserUsername(userUserName); + sfdcConfiguration.setSfdcUserPassword(userPassword + "wrongpassword"); + sfdcConfiguration.setSfdcUserSecurityToken(userSecurityToken); + sfdcConfiguration.setSfdcConsumerKey(consumerKey); + sfdcConfiguration.setSfdcConsumerSecret(consumerSecret); + boolean loginSuccessful = sfdcServiceImpl.login(sfdcConfiguration); + assertNull("Session should not be valid since we used a wrong password !", sfdcServiceImpl.getSFDCSession()); + + // we login properly for other tests to execute properly. + sfdcConfiguration.setSfdcUserPassword(userPassword); + loginSuccessful = sfdcServiceImpl.login(sfdcConfiguration); + assertTrue("Login with proper credentials should have worked !", loginSuccessful); + } + } +}