Repository: incubator-unomi
Updated Branches:
  refs/heads/master 139e1b751 -> a897ab843


http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a897ab84/extensions/salesforce-connector/services/src/main/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/extensions/salesforce-connector/services/src/main/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImpl.java
 
b/extensions/salesforce-connector/services/src/main/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImpl.java
new file mode 100644
index 0000000..7ef6bea
--- /dev/null
+++ 
b/extensions/salesforce-connector/services/src/main/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImpl.java
@@ -0,0 +1,815 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.unomi.sfdc.services.internal;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.util.ISO8601DateFormat;
+import org.apache.http.Header;
+import org.apache.http.HttpException;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.*;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+import org.apache.unomi.api.Profile;
+import org.apache.unomi.persistence.spi.PersistenceService;
+import org.apache.unomi.sfdc.services.SFDCConfiguration;
+import org.apache.unomi.sfdc.services.SFDCService;
+import org.apache.unomi.sfdc.services.SFDCSession;
+import org.cometd.bayeux.Channel;
+import org.cometd.bayeux.Message;
+import org.cometd.bayeux.client.ClientSessionChannel;
+import org.cometd.client.BayeuxClient;
+import org.cometd.client.transport.ClientTransport;
+import org.cometd.client.transport.LongPollingTransport;
+import org.eclipse.jetty.client.ContentExchange;
+import org.eclipse.jetty.client.HttpClient;
+import org.eclipse.jetty.util.ajax.JSON;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLEncoder;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.util.*;
+
+/**
+ * Implementation of the Salesforce connector interface
+ */
+public class SFDCServiceImpl implements SFDCService {
+    private static final Logger logger = 
LoggerFactory.getLogger(SFDCServiceImpl.class.getName());
+
+    private static final String REST_ENDPOINT_URI = "/services/data/v38.0"; // appended to the session end point to build REST URLs
+    private static final String STREAMING_ENDPOINT_URI = "/cometd/38.0"; // appended to the session end point by getSalesforceStreamingEndpoint
+
+    private static final int CONNECTION_TIMEOUT = 20 * 1000;  // milliseconds
+    private static final int READ_TIMEOUT = 120 * 1000; // milliseconds
+
+    private SFDCConfiguration sfdcConfiguration; // configuration in use; assigned by start() and saveConfiguration()
+    private SFDCConfiguration defaultSFDCConfiguration; // fallback used by start() when loadConfiguration() returns null
+
+    private Set<String> sfdcLeadMandatoryFields = new TreeSet<>(); // assigned by start() from getLeadMandatoryFields()
+    private Set<String> sfdcLeadUpdateableFields = new TreeSet<>(); // replaced as a side effect of getLeadMandatoryFields()
+
+    private SFDCSession sfdcSession; // created by login(), reset to null by logout()
+    private DateFormat iso8601DateFormat = new ISO8601DateFormat(); // re-created by start(); NOTE(review): DateFormat instances are usually not thread-safe - confirm usage
+
+    private PersistenceService persistenceService; // injected through setPersistenceService(); may be null
+
+    /**
+     * Injects the persistence service used to load and save the connector configuration.
+     *
+     * @param persistenceService the persistence service to use
+     */
+    public void setPersistenceService(final PersistenceService persistenceService) {
+        this.persistenceService = persistenceService;
+    }
+
+    /**
+     * Injects the configuration that {@link #start()} falls back to when no configuration could be loaded.
+     *
+     * @param defaultSFDCConfiguration the fallback configuration
+     */
+    public void setDefaultSFDCConfiguration(final SFDCConfiguration defaultSFDCConfiguration) {
+        this.defaultSFDCConfiguration = defaultSFDCConfiguration;
+    }
+
+    /**
+     * Returns the session created by the last successful {@link #login(SFDCConfiguration)}.
+     *
+     * @return the current session, or {@code null} if there is none (for example after {@link #logout()})
+     */
+    public SFDCSession getSFDCSession() {
+        return this.sfdcSession;
+    }
+
+    /**
+     * Loads the item stored under the id {@code "sfdcConfiguration"} from the persistence service.
+     *
+     * @return whatever the persistence service returns for that id, or {@code null} when no
+     * persistence service has been injected
+     */
+    @Override
+    public SFDCConfiguration loadConfiguration() {
+        return persistenceService == null
+                ? null
+                : persistenceService.load("sfdcConfiguration", SFDCConfiguration.class);
+    }
+
+    @Override
+    public boolean saveConfiguration(SFDCConfiguration sfdcConfiguration) {
+        if (persistenceService == null) {
+            return false;
+        }
+        boolean result = persistenceService.save(sfdcConfiguration);
+        if (result) {
+            this.sfdcConfiguration = sfdcConfiguration;
+            try {
+                if (login(sfdcConfiguration)) {
+                    return true;
+                }
+            } catch (HttpException e) {
+                logger.warn("Error trying to login with new configuration {}", 
sfdcConfiguration, e);
+                result = false;
+            } catch (IOException e) {
+                logger.warn("Error trying to login with new configuration {}", 
sfdcConfiguration, e);
+                result = false;
+            }
+        } else {
+            logger.error("Error trying to save new Salesforce connection 
configuration !");
+        }
+        return result;
+    }
+
+    public void start() {
+        try {
+            iso8601DateFormat = new ISO8601DateFormat();
+
+            SFDCConfiguration sfdcConfiguration = loadConfiguration();
+            if (sfdcConfiguration != null) {
+                this.sfdcConfiguration = sfdcConfiguration;
+            } else {
+                this.sfdcConfiguration = defaultSFDCConfiguration;
+            }
+
+            if (this.sfdcConfiguration.isComplete()) {
+                boolean loginSuccessful = login(this.sfdcConfiguration);
+                if (!loginSuccessful) {
+                    throw new Exception("Login failed");
+                }
+                sfdcLeadMandatoryFields = getLeadMandatoryFields();
+                // setupPushTopics(SFDCSession.getEndPoint(), 
SFDCSession.getSessionId());
+                logger.info("Salesforce connector initialized successfully.");
+            } else {
+                logger.warn("Salesforce connector is not yet configured.");
+            }
+        } catch (HttpException | IOException e) {
+            logger.error("Failed to init SFDCService properly", e);
+        } catch (Exception e) {
+            logger.error("Failed to init SFDCService properly", e);
+        }
+    }
+
+    /**
+     * Counterpart of {@link #start()}; currently does nothing.
+     */
+    public void stop() {
+        // intentionally empty
+    }
+
+    /**
+     * Requests the Lead sobject resource and collects the {@code Id} of every entry listed under
+     * {@code recentItems} in the response.
+     *
+     * @return the ids in response order (empty when the response has no {@code recentItems} or the
+     * request raised an error), or {@code null} when not connected or when no response was obtained
+     */
+    public Set<String> getRecentLeadIds() {
+        if (!isConnected()) {
+            return null;
+        }
+        final Set<String> leadIds = new LinkedHashSet<>();
+        final HttpGet request = new HttpGet(sfdcSession.getEndPoint() + REST_ENDPOINT_URI + "/sobjects/Lead");
+
+        try {
+            final Object rawResponse = handleRequest(request);
+            if (rawResponse == null) {
+                logger.warn("Couldn't retrieve recent leads");
+                return null;
+            }
+            final Map<String, Object> responseMap = (Map<String, Object>) rawResponse;
+            if (!responseMap.containsKey("recentItems")) {
+                return leadIds;
+            }
+            logger.debug("Response received from Salesforce: {}", responseMap);
+            for (Object item : (Object[]) responseMap.get("recentItems")) {
+                leadIds.add(((Map<String, String>) item).get("Id"));
+            }
+        } catch (IOException | HttpException e) {
+            logger.error("Error getting recent leads", e);
+        }
+
+        return leadIds;
+    }
+
+    public Map<String,Object> getSObject(String sobjectName, String objectId) {
+        if (!isConnected()) {
+            return null;
+        }
+        Map<String, Object> sobjectMap = new LinkedHashMap<>();
+
+        String baseUrl = sfdcSession.getEndPoint() + REST_ENDPOINT_URI + 
"/sobjects/" + sobjectName +"/" + objectId;
+        HttpGet getSObject = new HttpGet(baseUrl);
+
+        try {
+            Object responseObject = handleRequest(getSObject);
+            if (responseObject == null) {
+                logger.warn("Couldn't retrieve sobject {} with id {}", 
sobjectName, objectId);
+                return null;
+            }
+            Map<String, Object> queryResponse = (Map<String, Object>) 
responseObject;
+            if (queryResponse != null) {
+                logger.debug("Response received from Salesforce: {}", 
queryResponse);
+                sobjectMap = new LinkedHashMap<>(queryResponse);
+            }
+
+        } catch (IOException e) {
+            logger.error("Error getting sobject {} with id {}", sobjectName, 
objectId, e);
+        } catch (HttpException e) {
+            logger.error("Error getting sobject {} with id {}", sobjectName, 
objectId, e);
+        }
+        return sobjectMap;
+    }
+
+    /**
+     * Retrieves the {@code describe} resource of an sobject type through the REST API.
+     *
+     * @param sobjectName the sobject type name, used as a URL path segment
+     * @return a copy of the response map, an empty map when the request raised an error, or
+     * {@code null} when not connected or when no response was obtained
+     */
+    public Map<String,Object> getSObjectDescribe(String sobjectName) {
+        if (!isConnected()) {
+            return null;
+        }
+        final HttpGet request = new HttpGet(
+                sfdcSession.getEndPoint() + REST_ENDPOINT_URI + "/sobjects/" + sobjectName +"/describe");
+
+        try {
+            final Object rawResponse = handleRequest(request);
+            if (rawResponse == null) {
+                logger.warn("Couldn't retrieve sobject {} describe", sobjectName);
+                return null;
+            }
+            final Map<String, Object> responseMap = (Map<String, Object>) rawResponse;
+            logger.debug("Response received from Salesforce: {}", responseMap);
+            return new LinkedHashMap<>(responseMap);
+        } catch (IOException | HttpException e) {
+            logger.error("Error getting sobject {}", sobjectName, e);
+        }
+        // request failed with an exception: hand back an empty result
+        return new LinkedHashMap<>();
+    }
+
+    /**
+     * Convenience wrapper around {@link #getSObject(String, String)} for the {@code Lead} sobject.
+     *
+     * @param leadId the id of the lead to retrieve
+     * @return the result of {@code getSObject("Lead", leadId)}
+     */
+    public Map<String, Object> getLead(final String leadId) {
+        final String leadSObjectName = "Lead";
+        return getSObject(leadSObjectName, leadId);
+    }
+
+    /**
+     * Reads the Lead description and derives the names of the mandatory fields, i.e. the fields
+     * that are explicitly neither nillable nor defaulted on create. Compound field names are
+     * excluded. As a side effect the set of updateable Lead fields kept by this service is
+     * replaced whenever the description could be read.
+     *
+     * @return the sorted mandatory field names; an empty set when the Lead description could not
+     * be read; {@code null} when not connected
+     */
+    public Set<String> getLeadMandatoryFields() {
+        if (!isConnected()) {
+            return null;
+        }
+        Set<String> mandatoryFields = new TreeSet<>();
+
+        // The describe call returns null or an empty map on failure, so "fields" may be absent.
+        Map<String,Object> leadDescribe = getSObjectDescribe("Lead");
+        Object[] fields = leadDescribe != null ? (Object[]) leadDescribe.get("fields") : null;
+        if (fields == null) {
+            logger.warn("Couldn't retrieve the Lead field descriptions from Salesforce");
+            return mandatoryFields;
+        }
+
+        Set<String> updateableFields = new TreeSet<>();
+        Set<String> compoundFieldNames = new TreeSet<>();
+        for (Object field : fields) {
+            Map<String,Object> fieldDescribe = (Map<String,Object>) field;
+            String compoundFieldName = (String) fieldDescribe.get("compoundFieldName");
+            if (compoundFieldName != null) {
+                compoundFieldNames.add(compoundFieldName);
+            }
+            String fieldName = (String) fieldDescribe.get("name");
+            if (fieldName == null) {
+                // TreeSet rejects null elements
+                continue;
+            }
+            // Compare against Boolean constants so a missing attribute cannot cause an
+            // unboxing NullPointerException; a missing attribute is treated as "unknown".
+            if (Boolean.TRUE.equals(fieldDescribe.get("updateable"))) {
+                updateableFields.add(fieldName);
+            }
+            if (Boolean.FALSE.equals(fieldDescribe.get("nillable"))
+                    && Boolean.FALSE.equals(fieldDescribe.get("defaultedOnCreate"))) {
+                mandatoryFields.add(fieldName);
+            }
+        }
+        mandatoryFields.removeAll(compoundFieldNames);
+        updateableFields.removeAll(compoundFieldNames);
+        sfdcLeadUpdateableFields = updateableFields;
+        return mandatoryFields;
+    }
+
+    /**
+     * Sends a DELETE request for the given lead.
+     *
+     * @param leadId the id of the lead to delete
+     * @return {@code true} if the request completed without raising an error, {@code false} when
+     * not connected or when the request raised an error
+     */
+    public boolean deleteLead(String leadId) {
+        if (!isConnected()) {
+            return false;
+        }
+        String baseUrl = sfdcSession.getEndPoint() + REST_ENDPOINT_URI + "/sobjects/Lead/" + leadId;
+        HttpDelete deleteLead = new HttpDelete(baseUrl);
+        try {
+            // The response payload is not inspected; only errors are reported.
+            handleRequest(deleteLead);
+            return true;
+        } catch (IOException | HttpException e) {
+            logger.error("Error deleting lead {}", leadId, e);
+            return false;
+        }
+    }
+
+    /**
+     * Queries Salesforce for the ids of all leads whose configured identifier field equals the
+     * given value.
+     *
+     * @param identifierFieldValue the value to look for; {@code null} yields an empty result
+     * @return the matching lead ids in response order; empty when not connected, when the query
+     * failed or when nothing matched (never {@code null})
+     */
+    public Set<String> findLeadIdsByIdentifierValue(String identifierFieldValue) {
+        Set<String> results = new LinkedHashSet<String>();
+        if (!isConnected() || identifierFieldValue == null) {
+            return results;
+        }
+        // Escape backslashes and single quotes so the value cannot terminate the SOQL string
+        // literal and alter the query.
+        String escapedValue = identifierFieldValue.replace("\\", "\\\\").replace("'", "\\'");
+        Map<String, Object> result = query("SELECT Id FROM Lead WHERE "
+                + sfdcConfiguration.getSfdcIdentifierField() + "='" + escapedValue + "'");
+        if (result == null) {
+            return results;
+        }
+        Object totalSize = result.get("totalSize");
+        if (!(totalSize instanceof Number) || ((Number) totalSize).longValue() < 1) {
+            return results;
+        }
+        Object[] recordObjects = (Object[]) result.get("records");
+        if (recordObjects == null) {
+            return results;
+        }
+        for (Object recordObject : recordObjects) {
+            Map<String, Object> record = (Map<String, Object>) recordObject;
+            if (record.containsKey("Id")) {
+                results.add((String) record.get("Id"));
+            }
+        }
+        return results;
+    }
+
+    @Override
+    public String createOrUpdateLead(Profile profile) {
+        if (!isConnected()) {
+            return null;
+        }
+        // first we must check if an existing lead exists for the profile.
+        String unomiIdentifierValue = (String) 
profile.getProperty(sfdcConfiguration.getUnomiIdentifierField());
+        logger.info("Checking if we have a lead for identifier value {}...", 
unomiIdentifierValue);
+        Set<String> foundExistingSfdcLeadIds = 
findLeadIdsByIdentifierValue(unomiIdentifierValue);
+
+        Map<String, Object> sfdcLeadFields = new HashMap<>();
+        Map<String, Object> existingSfdcLeadFields = new HashMap<>();
+        Date sfdcLastModified = null;
+
+        if (foundExistingSfdcLeadIds.size() > 1) {
+            // we found multiple leads matching the identifier value !
+            logger.warn("Found multiple matching leads for identifier value 
{}, will use first matching one !", unomiIdentifierValue);
+        }
+
+        if (foundExistingSfdcLeadIds.size() > 0) {
+            logger.info("Found an existing lead, attempting to update it...");
+            // we found an existing lead we must update it
+            existingSfdcLeadFields = 
getLead(foundExistingSfdcLeadIds.iterator().next());
+            if (existingSfdcLeadFields.get("LastModifiedDate") != null) {
+                try {
+                    sfdcLastModified = iso8601DateFormat.parse((String) 
existingSfdcLeadFields.get("LastModifiedDate"));
+                } catch (ParseException e) {
+                    logger.error("Error parsing date {}", 
existingSfdcLeadFields.get("LastModifiedDate"), e);
+                }
+            }
+        } else {
+            logger.info("No existing lead found.");
+        }
+
+        for (String profilePropertyKey : profile.getProperties().keySet()) {
+            String sfdcFieldName = 
sfdcConfiguration.getUnomiToSfdcFieldMappings().get(profilePropertyKey);
+            if (sfdcFieldName == null) {
+                // we skip unmapped fields
+                continue;
+            }
+            Object unomiPropertyValue = 
profile.getProperties().get(profilePropertyKey);
+            if (existingSfdcLeadFields.get(sfdcFieldName) == null) {
+                // we only set the field if it didn't have a value.
+                logger.info("Setting SFDC field {} value to {}", 
sfdcFieldName, unomiPropertyValue);
+                sfdcLeadFields.put(sfdcFieldName, unomiPropertyValue);
+            } else {
+                // current strategy : Unomi field value wins if different from 
Salesforce value
+                // @todo we should probably improve this by tracking last 
modification dates on profile/lead properties
+                Object sfdcLeadFieldValue = 
existingSfdcLeadFields.get(sfdcFieldName);
+                if (!unomiPropertyValue.equals(sfdcLeadFieldValue)) {
+                    logger.info("Overwriting SFDC field {} value to {}", 
sfdcFieldName, unomiPropertyValue);
+                    sfdcLeadFields.put(sfdcFieldName, unomiPropertyValue);
+                }
+            }
+        }
+
+        if (sfdcLeadFields.size() == 0) {
+            logger.info("No SFDC field value to send, will not send anything 
to Salesforce.");
+            if (foundExistingSfdcLeadIds.size() == 0) {
+                return null;
+            } else {
+                return foundExistingSfdcLeadIds.iterator().next();
+            }
+        }
+
+        if (existingSfdcLeadFields.size() == 0) {
+            // if we are creating a lead, let's make sure we have all the 
mandatory fields before sending the request
+            boolean missingMandatoryFields = false;
+            for (String leadMandatoryFieldName : sfdcLeadMandatoryFields) {
+                if (sfdcLeadFields.get(leadMandatoryFieldName) == null) {
+                    logger.warn("Missing mandatory field {}, aborting sending 
to Salesforce", leadMandatoryFieldName);
+                    missingMandatoryFields = true;
+                }
+            }
+            if (missingMandatoryFields) {
+                return null;
+            }
+        }
+
+        String baseUrl = sfdcSession.getEndPoint() + REST_ENDPOINT_URI + 
"/sobjects/Lead";
+        HttpEntityEnclosingRequestBase request = new HttpPost(baseUrl);
+        if (foundExistingSfdcLeadIds.size() > 0) {
+            baseUrl = sfdcSession.getEndPoint() + REST_ENDPOINT_URI + 
"/sobjects/Lead/" + foundExistingSfdcLeadIds.iterator().next();
+            sfdcLeadFields.remove("Id");
+            request = new HttpPatch(baseUrl);
+        }
+
+        try {
+            ObjectMapper objectMapper = new ObjectMapper();
+            StringEntity requestEntity = new StringEntity(
+                    objectMapper.writeValueAsString(sfdcLeadFields),
+                    ContentType.APPLICATION_JSON);
+            request.setEntity(requestEntity);
+            Object responseObject = handleRequest(request);
+            if (responseObject == null) {
+                return null;
+            }
+            if (responseObject instanceof Map) {
+                Map<String, Object> responseData = (Map<String, Object>) 
responseObject;
+                if (responseData.get("id") != null) {
+                    String sfdcId = (String) responseData.get("id");
+                    logger.info("Lead successfully created/updated in 
Salesforce. sfdcId={}", sfdcId);
+                    return sfdcId;
+                }
+            }
+            logger.info("Response received from Salesforce: {}", 
responseObject);
+        } catch (IOException e) {
+            logger.error("Error creating or updating lead for profile {}", 
profile, e);
+        } catch (HttpException e) {
+            logger.error("Error creating or updating lead for profile {}", 
profile, e);
+        }
+
+        if (foundExistingSfdcLeadIds.size() == 0) {
+            return null;
+        } else {
+            return foundExistingSfdcLeadIds.iterator().next();
+        }
+    }
+
+    @Override
+    public boolean updateProfileFromLead(Profile profile) {
+        if (!isConnected()) {
+            return false;
+        }
+        String unomiIdentifierValue = (String) 
profile.getProperty(sfdcConfiguration.getUnomiIdentifierField());
+        Set<String> foundSfdcLeadIds = 
findLeadIdsByIdentifierValue(unomiIdentifierValue);
+        if (foundSfdcLeadIds.size() == 0) {
+            logger.info("No lead found in Salesforce corresponding to profile 
{}", profile);
+            // we didn't find a corresponding lead in salesforce.
+            return false;
+        } else if (foundSfdcLeadIds.size() > 1) {
+            logger.warn("Found multiple leads in Salesforce for identifier 
value {}, will use first one.", foundSfdcLeadIds);
+        } else {
+            logger.info("Found corresponding lead with identifier value {}", 
unomiIdentifierValue);
+        }
+        Map<String,Object> sfdcLead = 
getLead(foundSfdcLeadIds.iterator().next());
+        if (sfdcLead == null) {
+            logger.error("Error retrieving lead {} from Salesforce", 
foundSfdcLeadIds );
+            return false;
+        }
+        boolean profileUpdated = false;
+        for (Map.Entry<String,String> sfdcToUnomiFieldMappingEntry : 
sfdcConfiguration.getSfdcToUnomiFieldMappings().entrySet()) {
+            String sfdcFieldName = sfdcToUnomiFieldMappingEntry.getKey();
+            String unomiFieldName = sfdcToUnomiFieldMappingEntry.getValue();
+            if (sfdcLead.get(sfdcFieldName) != null) {
+                Object sfdcFieldValue = sfdcLead.get(sfdcFieldName);
+                if (sfdcFieldValue != null && 
!sfdcFieldValue.equals(profile.getProperty(unomiFieldName))) {
+                    profile.setProperty(unomiFieldName, sfdcFieldValue);
+                    profileUpdated = true;
+                }
+            }
+        }
+        logger.info("Updated profile {} from Salesforce lead {}", profile, 
sfdcLead);
+        return profileUpdated;
+    }
+
+    /**
+     * Executes a query through the REST {@code /query} resource; the query text is URL-encoded as
+     * UTF-8 before being sent.
+     *
+     * @param query the query text
+     * @return the response when it is a map; {@code null} when not connected, when no response was
+     * obtained, when the response is not a map or when the request raised an error
+     */
+    @Override
+    public Map<String,Object> query(String query) {
+        if (!isConnected()) {
+            return null;
+        }
+        try {
+            final HttpGet request = new HttpGet(
+                    sfdcSession.getEndPoint() + REST_ENDPOINT_URI + "/query?q=" + URLEncoder.encode(query, "UTF-8"));
+            final Object rawResponse = handleRequest(request);
+            // instanceof is false for null, so a missing response also yields null
+            return rawResponse instanceof Map ? (Map<String,Object>) rawResponse : null;
+        } catch (IOException | HttpException e) {
+            // UnsupportedEncodingException and ClientProtocolException are IOExceptions
+            logger.error("Error executing query {}", query, e);
+            return null;
+        }
+    }
+
+    /**
+     * Retrieves the REST {@code /limits} resource.
+     *
+     * @return the response when it is a map; {@code null} when not connected, when no response was
+     * obtained, when the response is not a map or when the request raised an error
+     */
+    @Override
+    public Map<String, Object> getLimits() {
+        if (!isConnected()) {
+            return null;
+        }
+        try {
+            final HttpGet request = new HttpGet(sfdcSession.getEndPoint() + REST_ENDPOINT_URI + "/limits");
+            final Object rawResponse = handleRequest(request);
+            // instanceof is false for null, so a missing response also yields null
+            return rawResponse instanceof Map ? (Map<String,Object>) rawResponse : null;
+        } catch (IOException | HttpException e) {
+            // UnsupportedEncodingException and ClientProtocolException are IOExceptions
+            logger.error("Error retrieving Salesforce API Limits", e);
+            return null;
+        }
+    }
+
+    private BayeuxClient makeClient() throws Exception {
+        HttpClient httpClient = new HttpClient();
+        httpClient.setConnectTimeout(CONNECTION_TIMEOUT);
+        httpClient.setTimeout(READ_TIMEOUT);
+        httpClient.start();
+
+        if (sfdcSession == null) {
+            logger.error("Invalid session !");
+            return null;
+        }
+        logger.info("Login successful!\nServer URL: " + 
sfdcSession.getEndPoint()
+                + "\nSession ID=" + sfdcSession.getSessionId());
+
+        Map<String, Object> options = new HashMap<String, Object>();
+        options.put(ClientTransport.TIMEOUT_OPTION, READ_TIMEOUT);
+        LongPollingTransport transport = new LongPollingTransport(
+                options, httpClient) {
+
+            @Override
+            protected void customize(ContentExchange exchange) {
+                super.customize(exchange);
+                exchange.addRequestHeader("Authorization", "OAuth " + 
sfdcSession.getSessionId());
+            }
+        };
+
+        BayeuxClient client = new BayeuxClient(getSalesforceStreamingEndpoint(
+                sfdcSession.getEndPoint()), transport);
+        return client;
+    }
+
+    public void setupPushListener(String channelName, 
ClientSessionChannel.MessageListener messageListener) throws Exception {
+        if (!isConnected()) {
+            return;
+        }
+        final BayeuxClient client = makeClient();
+        if (client == null) {
+            throw new Exception("Login failed !");
+        }
+        client.getChannel(Channel.META_HANDSHAKE).addListener
+                (new ClientSessionChannel.MessageListener() {
+                    @Override
+                    public void onMessage(ClientSessionChannel channel, 
Message message) {
+
+                        logger.debug("[CHANNEL:META_HANDSHAKE]: " + message);
+
+                        boolean success = message.isSuccessful();
+                        if (!success) {
+                            String error = (String) message.get("error");
+                            if (error != null) {
+                                logger.error("Error during HANDSHAKE: " + 
error);
+                            }
+
+                            Exception exception = (Exception) 
message.get("exception");
+                            if (exception != null) {
+                                logger.error("Exception during HANDSHAKE: ", 
exception);
+                            }
+                        }
+                    }
+
+                });
+
+        client.getChannel(Channel.META_CONNECT).addListener(
+                new ClientSessionChannel.MessageListener() {
+                    public void onMessage(ClientSessionChannel channel, 
Message message) {
+
+                        logger.debug("[CHANNEL:META_CONNECT]: " + message);
+
+                        boolean success = message.isSuccessful();
+                        if (!success) {
+                            String error = (String) message.get("error");
+                            if (error != null) {
+                                logger.error("Error during CONNECT: " + error);
+                            }
+                        }
+                    }
+
+                });
+
+        client.getChannel(Channel.META_SUBSCRIBE).addListener(
+                new ClientSessionChannel.MessageListener() {
+
+                    public void onMessage(ClientSessionChannel channel, 
Message message) {
+
+                        logger.debug("[CHANNEL:META_SUBSCRIBE]: " + message);
+                        boolean success = message.isSuccessful();
+                        if (!success) {
+                            String error = (String) message.get("error");
+                            if (error != null) {
+                                logger.error("Error during SUBSCRIBE: " + 
error);
+                            }
+                        }
+                    }
+                });
+
+        client.handshake();
+        logger.debug("Waiting for handshake");
+
+        boolean handshaken = client.waitFor(10 * 1000, 
BayeuxClient.State.CONNECTED);
+        if (!handshaken) {
+            logger.error("Failed to handshake: " + client);
+        }
+
+        logger.info("Subscribing for channel: " + channelName);
+
+        client.getChannel(channelName).subscribe(messageListener);
+
+    }
+
+    private String getSalesforceStreamingEndpoint(String endpoint) throws 
MalformedURLException {
+        return new URL(endpoint + STREAMING_ENDPOINT_URI).toExternalForm();
+    }
+
+    private void setupPushTopics(String host, String sessionId) throws 
HttpException, IOException {
+
+        String baseUrl = host + REST_ENDPOINT_URI + "/query?q=" + 
URLEncoder.encode("SELECT Id from PushTopic WHERE name = 'LeadUpdates'", 
"UTF-8");
+        HttpGet get = new HttpGet(baseUrl);
+
+        Map<String, String> queryResponse = (Map<String, String>) 
handleRequest(get);
+
+        if (queryResponse != null && queryResponse.containsKey("count")) {
+            logger.info("Push topics setup successfully");
+        }
+    }
+
+    /**
+     * Logs in with the OAuth password grant against the configured login end point and, on
+     * success, replaces the current session with one built from the token response.
+     *
+     * @param sfdcConfiguration the configuration providing end point, credentials and session timeout
+     * @return {@code true} if a session was created; {@code false} if no usable token response was
+     * obtained (the previous session, if any, is left untouched)
+     * @throws HttpException if the token request fails
+     * @throws IOException   if the token request fails
+     */
+    public boolean login(SFDCConfiguration sfdcConfiguration)
+            throws HttpException, IOException {
+        String baseUrl = sfdcConfiguration.getSfdcLoginEndpoint() + "/services/oauth2/token";
+        HttpPost oauthPost = new HttpPost(baseUrl);
+
+        // The security token is appended to the password; a missing part must contribute
+        // nothing instead of the literal text "null".
+        String password = sfdcConfiguration.getSfdcUserPassword();
+        String securityToken = sfdcConfiguration.getSfdcUserSecurityToken();
+        String passwordWithToken = (password != null ? password : "") + (securityToken != null ? securityToken : "");
+
+        List<BasicNameValuePair> parametersBody = new ArrayList<>();
+        parametersBody.add(new BasicNameValuePair("grant_type", "password"));
+        parametersBody.add(new BasicNameValuePair("username", sfdcConfiguration.getSfdcUserUsername()));
+        parametersBody.add(new BasicNameValuePair("password", passwordWithToken));
+        parametersBody.add(new BasicNameValuePair("client_id", sfdcConfiguration.getSfdcConsumerKey()));
+        parametersBody.add(new BasicNameValuePair("client_secret", sfdcConfiguration.getSfdcConsumerSecret()));
+        oauthPost.setEntity(new UrlEncodedFormEntity(parametersBody, "UTF-8"));
+
+        // No retry and no authorization header: this request is what obtains the token.
+        Object response = handleRequest(oauthPost, 0, false);
+        if (!(response instanceof Map)) {
+            return false;
+        }
+        Map<String, String> oauthLoginResponse = (Map<String,String>) response;
+        if (oauthLoginResponse.get("access_token") == null) {
+            // Without a token the session would be unusable, so do not report success.
+            logger.warn("Salesforce login response did not contain an access token");
+            return false;
+        }
+
+        sfdcSession = new SFDCSession(
+                oauthLoginResponse.get("access_token"),
+                oauthLoginResponse.get("instance_url"),
+                oauthLoginResponse.get("signature"),
+                oauthLoginResponse.get("id"),
+                oauthLoginResponse.get("token_type"),
+                oauthLoginResponse.get("issued_at"),
+                sfdcConfiguration.getSfdcSessionTimeout());
+        return true;
+    }
+
+    /**
+     * Forgets the current session; no request is sent to Salesforce.
+     */
+    public void logout() {
+        this.sfdcSession = null;
+    }
+
+    /**
+     * Returns the current session if it is still valid, otherwise tries to log in again with the
+     * active configuration.
+     *
+     * @return a valid session, or {@code null} when the login failed or raised an error
+     */
+    private SFDCSession getValidSession() {
+        if (isSessionValid()) {
+            return sfdcSession;
+        }
+        try {
+            // login() replaces sfdcSession on success
+            if (login(sfdcConfiguration) && sfdcSession != null) {
+                return sfdcSession;
+            }
+        } catch (HttpException | IOException e) {
+            logger.error("Error logging in", e);
+        }
+        return null;
+    }
+
+    /**
+     * @return {@code true} if a session exists and it does not report itself as expired
+     */
+    private boolean isSessionValid() {
+        return sfdcSession != null && !sfdcSession.isExpired();
+    }
+
+    private Object handleRequest(HttpUriRequest request) throws IOException, 
HttpException {
+        return handleRequest(request, 1, true);
+    }
+
+    /**
+     * Executes an HTTP request and parses the response body as JSON.
+     * <p>
+     * When {@code addAuthorizationHeader} is set, a valid session is obtained first and any existing
+     * {@code Authorization} header on the request is replaced with a bearer header for that session.
+     * A 401 or 403 response triggers a new login followed by one more attempt, as long as
+     * {@code retryCount} is greater than zero. Any other response with a status of 400 or above is
+     * logged together with its body.
+     * <p>
+     * The HTTP client and the response are always closed before this method returns or retries.
+     *
+     * @param request                the request to execute
+     * @param retryCount             a value greater than zero allows a re-login and retry on 401/403
+     * @param addAuthorizationHeader whether to add the bearer Authorization header to the request
+     * @return the parsed JSON body, or {@code null} if no valid session could be obtained, the
+     * response status was 400 or above, the re-login failed, or the response had no entity
+     * @throws IOException   if raised while executing the request or reading the response
+     * @throws HttpException if raised while logging in again
+     */
+    private Object handleRequest(HttpUriRequest request, int retryCount, boolean addAuthorizationHeader) throws IOException, HttpException {
+        if (addAuthorizationHeader) {
+            SFDCSession validSession = getValidSession();
+            if (validSession == null) {
+                logger.error("Couldn't get a valid session !");
+                return null;
+            }
+            if (request.containsHeader("Authorization")) {
+                logger.debug("Replacing existing authorization header with an updated one.");
+                for (Header authorizationHeader : request.getHeaders("Authorization")) {
+                    request.removeHeader(authorizationHeader);
+                }
+            }
+            request.addHeader("Authorization", "Bearer " + validSession.getSessionId());
+        }
+
+        // try-with-resources guarantees that both the client and the response are released on every path
+        try (CloseableHttpClient client = HttpClientBuilder.create().build();
+             CloseableHttpResponse response = client.execute(request)) {
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode < 400) {
+                if (response.getEntity() == null) {
+                    return null;
+                }
+                return JSON.parse(EntityUtils.toString(response.getEntity()));
+            }
+
+            boolean unauthorized = statusCode == 401 || statusCode == 403;
+            if (!unauthorized || retryCount <= 0) {
+                logger.error("Error executing request {}: {}-{}", request, statusCode, response.getStatusLine().getReasonPhrase());
+                if (response.getEntity() != null) {
+                    logger.error("Entity={}", EntityUtils.toString(response.getEntity()));
+                }
+                return null;
+            }
+            // fall through: the failed response is closed before we login again and retry
+        }
+
+        // probably the session has expired, let's try to login again
+        logger.warn("Unauthorized request, attempting to login again...");
+        boolean loginSuccessful = login(sfdcConfiguration);
+        if (!loginSuccessful) {
+            logger.error("Login failed, cannot execute request {}", request);
+            return null;
+        }
+        logger.warn("Retrying request {} once again...", request);
+        return handleRequest(request, 0, true);
+    }
+
+    /**
+     * Tells whether the Salesforce configuration reports itself as complete, logging a warning
+     * when it does not.
+     *
+     * @return {@code true} if the configuration is complete, {@code false} otherwise
+     */
+    public boolean isConfigured() {
+        if (sfdcConfiguration.isComplete()) {
+            return true;
+        }
+        logger.warn("Connection to Salesforce is not properly configured !");
+        return false;
+    }
+
+    /**
+     * Tells whether the service is configured and holds a session. An expired session still counts
+     * as connected; only a warning is logged in that case.
+     *
+     * @return {@code false} if the service is not configured or has no session, {@code true} otherwise
+     */
+    public boolean isConnected() {
+        if (!isConfigured()) {
+            return false;
+        }
+        if (sfdcSession == null) {
+            logger.warn("Not connected to SalesForce, operation will not execute.");
+            return false;
+        }
+        if (sfdcSession.isExpired()) {
+            logger.warn("Connection to Salesforce has expired, will reconnect on next request");
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a897ab84/extensions/salesforce-connector/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
----------------------------------------------------------------------
diff --git 
a/extensions/salesforce-connector/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
 
b/extensions/salesforce-connector/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
new file mode 100644
index 0000000..1b0b0ce
--- /dev/null
+++ 
b/extensions/salesforce-connector/services/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<blueprint xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+           xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.1.0"
+           xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
+           xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd">
+
+    <cm:property-placeholder persistent-id="org.apache.unomi.sfdc"
+                             update-strategy="reload">
+        <cm:default-properties>
+            <cm:property name="sfdc.login.endpoint" 
value="https://login.salesforce.com"/>
+            <cm:property name="sfdc.user.username" value=""/>
+            <cm:property name="sfdc.user.password" value=""/>
+            <cm:property name="sfdc.user.securityToken" value=""/>
+            <cm:property name="sfdc.consumer.key" value=""/>
+            <cm:property name="sfdc.consumer.secret" value=""/>
+            <cm:property name="sfdc.channel" value="/topic/LeadUpdates"/>
+            <cm:property name="sfdc.fields.mappings.identifier" value=""/>
+            <cm:property name="sfdc.fields.mappings" value=""/>
+            <cm:property name="sfdc.session.timeout" value="900000"/>
+        </cm:default-properties>
+    </cm:property-placeholder>
+
+    <reference id="persistenceService"
+               
interface="org.apache.unomi.persistence.spi.PersistenceService"/>
+
+    <bean id="defaultSFDCConfiguration" 
class="org.apache.unomi.sfdc.services.SFDCConfiguration">
+        <property name="itemId" value="sfdcConfiguration"/>
+        <property name="sfdcLoginEndpoint" value="${sfdc.login.endpoint}"/>
+        <property name="sfdcUserUsername" value="${sfdc.user.username}"/>
+        <property name="sfdcUserPassword" value="${sfdc.user.password}"/>
+        <property name="sfdcUserSecurityToken" 
value="${sfdc.user.securityToken}"/>
+        <property name="sfdcConsumerKey" value="${sfdc.consumer.key}"/>
+        <property name="sfdcConsumerSecret" value="${sfdc.consumer.secret}"/>
+        <property name="sfdcChannel" value="${sfdc.channel}"/>
+        <property name="sfdcFieldMappings" value="${sfdc.fields.mappings}"/>
+        <property name="sfdcFieldMappingsIdentifier" 
value="${sfdc.fields.mappings.identifier}"/>
+        <property name="sfdcSessionTimeout" value="${sfdc.session.timeout}" />
+    </bean>
+
+    <bean id="sfdcServiceImpl" 
class="org.apache.unomi.sfdc.services.internal.SFDCServiceImpl" 
init-method="start"
+          destroy-method="stop">
+        <property name="defaultSFDCConfiguration" 
ref="defaultSFDCConfiguration" />
+        <property name="persistenceService" ref="persistenceService" />
+    </bean>
+
+    <service id="sfdcService" ref="sfdcServiceImpl" auto-export="interfaces"/>
+
+</blueprint>

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a897ab84/extensions/salesforce-connector/services/src/main/resources/org.apache.unomi.sfdc.cfg
----------------------------------------------------------------------
diff --git 
a/extensions/salesforce-connector/services/src/main/resources/org.apache.unomi.sfdc.cfg
 
b/extensions/salesforce-connector/services/src/main/resources/org.apache.unomi.sfdc.cfg
new file mode 100644
index 0000000..939bddb
--- /dev/null
+++ 
b/extensions/salesforce-connector/services/src/main/resources/org.apache.unomi.sfdc.cfg
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+sfdc.login.endpoint=https://login.salesforce.com
+sfdc.user.username=
+sfdc.user.password=
+sfdc.user.securityToken=
+sfdc.consumer.key=
+sfdc.consumer.secret=
+sfdc.channel=/topic/LeadUpdates
+sfdc.fields.mappings=email=Email,firstName=FirstName,lastName=LastName,company=Company,phoneNumber=Phone,jobTitle=Title,city=City,zipCode=PostalCode,address=Street,sfdcStatus=Status,sfdcRating=Rating
+sfdc.fields.mappings.identifier=email=Email
+sfdc.session.timeout=900000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-unomi/blob/a897ab84/extensions/salesforce-connector/services/src/test/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImplTest.java
----------------------------------------------------------------------
diff --git 
a/extensions/salesforce-connector/services/src/test/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImplTest.java
 
b/extensions/salesforce-connector/services/src/test/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImplTest.java
new file mode 100644
index 0000000..2a566a4
--- /dev/null
+++ 
b/extensions/salesforce-connector/services/src/test/java/org/apache/unomi/sfdc/services/internal/SFDCServiceImplTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.unomi.sfdc.services.internal;
+
+import org.apache.http.HttpException;
+import org.apache.unomi.api.Profile;
+import org.apache.unomi.sfdc.services.SFDCConfiguration;
+import org.cometd.bayeux.Message;
+import org.cometd.bayeux.client.ClientSessionChannel;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+/**
+ * A unit test class for testing the Salesforce Service implementation.
+ * <p>
+ * The tests only do real work when a properties file containing Salesforce credentials is
+ * referenced with {@code -DsfdcProperties=FILEPATH}; otherwise each test prints a notice and
+ * returns immediately.
+ */
+public class SFDCServiceImplTest {
+
+    private static final String DEFAULT_CONFIG_RESOURCE = "org.apache.unomi.sfdc.cfg";
+    private static final String TEST_PROPERTIES_SYSTEM_PROPERTY = "sfdcProperties";
+    private static final String LOGIN_ENDPOINT_PROPERTY = "sfdc.login.endpoint";
+    private static final String CANNOT_RUN_TESTS_MESSAGE = "CANNOT RUN TESTS, PLEASE PROVIDE A PROPERTIES FILE WITH SALESFORCE CREDENTIALS AND REFERENCING IT USING -DsfdcProperties=FILEPATH !!!!!!";
+
+    private static SFDCServiceImpl sfdcServiceImpl;
+    private static boolean canRunTests = false;
+    private static SFDCConfiguration sfdcConfiguration;
+
+    /**
+     * Builds the configuration from the default configuration resource, overrides the connection
+     * settings with the ones from the file referenced by the {@code sfdcProperties} system property
+     * (when present) and starts the service.
+     *
+     * @throws IOException if one of the properties files cannot be found or read
+     */
+    @BeforeClass
+    public static void setUp() throws IOException {
+        sfdcServiceImpl = new SFDCServiceImpl();
+        // we must now configure it.
+        Properties properties = loadDefaultProperties();
+        sfdcConfiguration = new SFDCConfiguration();
+        sfdcConfiguration.setSfdcLoginEndpoint(properties.getProperty(LOGIN_ENDPOINT_PROPERTY));
+        sfdcConfiguration.setSfdcUserUsername(properties.getProperty("sfdc.user.username"));
+        sfdcConfiguration.setSfdcUserPassword(properties.getProperty("sfdc.user.password"));
+        sfdcConfiguration.setSfdcUserSecurityToken(properties.getProperty("sfdc.user.securityToken"));
+        sfdcConfiguration.setSfdcConsumerKey(properties.getProperty("sfdc.consumer.key"));
+        sfdcConfiguration.setSfdcConsumerSecret(properties.getProperty("sfdc.consumer.secret"));
+        sfdcConfiguration.setSfdcChannel(properties.getProperty("sfdc.channel"));
+        sfdcConfiguration.setSfdcFieldMappings(properties.getProperty("sfdc.fields.mappings"));
+        sfdcConfiguration.setSfdcFieldMappingsIdentifier(properties.getProperty("sfdc.fields.mappings.identifier"));
+
+        String testPropertiesPath = System.getProperty(TEST_PROPERTIES_SYSTEM_PROPERTY);
+        if (testPropertiesPath == null) {
+            System.out.println(CANNOT_RUN_TESTS_MESSAGE);
+            return;
+        }
+
+        Properties testProperties = loadPropertiesFile(testPropertiesPath);
+        // keep the default login endpoint when the test file does not override it, as testFailedLogin does
+        sfdcConfiguration.setSfdcLoginEndpoint(testProperties.getProperty(LOGIN_ENDPOINT_PROPERTY, properties.getProperty(LOGIN_ENDPOINT_PROPERTY)));
+        sfdcConfiguration.setSfdcUserUsername(testProperties.getProperty("sfdc.user.username"));
+        sfdcConfiguration.setSfdcUserPassword(testProperties.getProperty("sfdc.user.password"));
+        sfdcConfiguration.setSfdcUserSecurityToken(testProperties.getProperty("sfdc.user.securityToken"));
+        sfdcConfiguration.setSfdcConsumerKey(testProperties.getProperty("sfdc.consumer.key"));
+        sfdcConfiguration.setSfdcConsumerSecret(testProperties.getProperty("sfdc.consumer.secret"));
+        canRunTests = true;
+        sfdcServiceImpl.setDefaultSFDCConfiguration(sfdcConfiguration);
+        sfdcServiceImpl.start();
+    }
+
+    /**
+     * Stops the service if it was started by {@link #setUp()}.
+     */
+    @AfterClass
+    public static void shutdown() {
+        if (canRunTests) {
+            sfdcServiceImpl.stop();
+            sfdcServiceImpl = null;
+        }
+    }
+
+    /**
+     * Loads the default configuration shipped on the class path, closing the stream afterwards.
+     */
+    private static Properties loadDefaultProperties() throws IOException {
+        try (InputStream configPropertiesStream = SFDCServiceImplTest.class.getClassLoader().getResourceAsStream(DEFAULT_CONFIG_RESOURCE)) {
+            if (configPropertiesStream == null) {
+                throw new IOException("Couldn't find resource " + DEFAULT_CONFIG_RESOURCE + " on the class path");
+            }
+            Properties properties = new Properties();
+            properties.load(configPropertiesStream);
+            return properties;
+        }
+    }
+
+    /**
+     * Loads a properties file from the file system, closing the stream afterwards.
+     */
+    private static Properties loadPropertiesFile(String path) throws IOException {
+        try (InputStream propertiesInputStream = new FileInputStream(path)) {
+            Properties properties = new Properties();
+            properties.load(propertiesInputStream);
+            return properties;
+        }
+    }
+
+    private boolean checkCanRunTests() {
+        if (!canRunTests) {
+            System.out.println(CANNOT_RUN_TESTS_MESSAGE);
+        }
+        return canRunTests;
+    }
+
+    @Test
+    public void testGetLeads() {
+        if (!checkCanRunTests()) return;
+        Set<String> recentLeadIds = sfdcServiceImpl.getRecentLeadIds();
+        if (recentLeadIds == null || recentLeadIds.isEmpty()) {
+            return;
+        }
+        String identifierField = sfdcConfiguration.getSfdcIdentifierField();
+        for (String recentLeadId : recentLeadIds) {
+            Map<String,Object> leadFields = sfdcServiceImpl.getLead(recentLeadId);
+            if (!leadFields.containsKey(identifierField)) {
+                continue;
+            }
+            String leadIdentifierFieldValue = (String) leadFields.get(identifierField);
+            if (leadIdentifierFieldValue == null) {
+                System.out.println("Skipping lead with null identifier field value for field: " + identifierField);
+                continue;
+            }
+            Set<String> foundLeadIds = sfdcServiceImpl.findLeadIdsByIdentifierValue(leadIdentifierFieldValue);
+            // assertEquals reports the actual count on failure, unlike assertTrue(size() == 1)
+            assertEquals("Should find a single lead for identifier value " + leadIdentifierFieldValue, 1, foundLeadIds.size());
+            assertEquals("Expected Id to be the same", leadFields.get("Id"), foundLeadIds.iterator().next());
+        }
+    }
+
+    @Test
+    public void testGetLimits() {
+        if (!checkCanRunTests()) return;
+        Map<String,Object> limits = sfdcServiceImpl.getLimits();
+        assertNotNull("Limits object is null, an error occurred !", limits);
+    }
+
+    @Test
+    public void testCreateOrUpdateAndSyncLead() {
+        if (!checkCanRunTests()) return;
+        Profile profile = new Profile();
+        profile.setItemId(UUID.randomUUID().toString());
+        profile.setProperty("email", "te...@jahia.com");
+        profile.setProperty("firstName", "Serge");
+        String leadId = null;
+        try {
+            leadId = sfdcServiceImpl.createOrUpdateLead(profile);
+            assertNull("The lead creation should fail since we are missing mandatory fields.", leadId);
+            profile.setProperty("lastName", "Huber");
+            profile.setProperty("company", "Jahia Solutions Group");
+            profile.setProperty("phoneNumber", "+41223613424");
+            profile.setProperty("jobTitle", "CTO");
+            leadId = sfdcServiceImpl.createOrUpdateLead(profile);
+            // now let's try to update it.
+            profile.setProperty("company", "Jahia Solutions Group SA");
+            sfdcServiceImpl.createOrUpdateLead(profile);
+            boolean profileUpdated = sfdcServiceImpl.updateProfileFromLead(profile);
+            assertTrue("Profile should have been updated since we are reading status field", profileUpdated);
+            profile.setProperty("company", "Another value");
+            profileUpdated = sfdcServiceImpl.updateProfileFromLead(profile);
+            assertTrue("Profile should have been updated since data is not equal", profileUpdated);
+        } finally {
+            // always clean up, even when an assertion above failed, so no test lead is left behind
+            if (leadId != null) {
+                sfdcServiceImpl.deleteLead(leadId);
+            }
+        }
+    }
+
+    @Test
+    public void testStreaming() throws Exception {
+        if (!checkCanRunTests()) return;
+        System.out.println("Running streaming client example....");
+
+        sfdcServiceImpl.setupPushListener(sfdcConfiguration.getSfdcChannel(), new ClientSessionChannel.MessageListener() {
+            @Override
+            public void onMessage(ClientSessionChannel clientSessionChannel, Message message) {
+                System.out.println("Received message for channel" + sfdcConfiguration.getSfdcChannel() + ":"+ message);
+            }
+        });
+
+        System.out.println("Waiting 10 seconds for streamed data from your organization ...");
+        for (int i = 0; i < 10; i++) {
+            Thread.sleep(1000);
+        }
+
+    }
+
+    @Test
+    public void testFailedLogin() throws IOException, HttpException {
+        if (!checkCanRunTests()) return;
+        String testPropertiesPath = System.getProperty(TEST_PROPERTIES_SYSTEM_PROPERTY);
+        if (testPropertiesPath == null) {
+            return;
+        }
+        Properties properties = loadDefaultProperties();
+        sfdcServiceImpl.logout();
+        Properties testProperties = loadPropertiesFile(testPropertiesPath);
+        String loginEndpoint = testProperties.getProperty(LOGIN_ENDPOINT_PROPERTY, properties.getProperty(LOGIN_ENDPOINT_PROPERTY));
+        String userPassword = testProperties.getProperty("sfdc.user.password");
+
+        SFDCConfiguration loginConfiguration = new SFDCConfiguration();
+        loginConfiguration.setSfdcLoginEndpoint(loginEndpoint);
+        loginConfiguration.setSfdcUserUsername(testProperties.getProperty("sfdc.user.username"));
+        loginConfiguration.setSfdcUserPassword(userPassword + "wrongpassword");
+        loginConfiguration.setSfdcUserSecurityToken(testProperties.getProperty("sfdc.user.securityToken"));
+        loginConfiguration.setSfdcConsumerKey(testProperties.getProperty("sfdc.consumer.key"));
+        loginConfiguration.setSfdcConsumerSecret(testProperties.getProperty("sfdc.consumer.secret"));
+
+        boolean loginSuccessful;
+        try {
+            loginSuccessful = sfdcServiceImpl.login(loginConfiguration);
+            assertFalse("Login should have failed since we used a wrong password !", loginSuccessful);
+            assertNull("Session should not be valid since we used a wrong password !", sfdcServiceImpl.getSFDCSession());
+        } finally {
+            // we login properly for other tests to execute properly.
+            loginConfiguration.setSfdcUserPassword(userPassword);
+            loginSuccessful = sfdcServiceImpl.login(loginConfiguration);
+        }
+        assertTrue("Login with proper credentials should have worked !", loginSuccessful);
+    }
+}

Reply via email to