http://git-wip-us.apache.org/repos/asf/usergrid/blob/c915a316/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
deleted file mode 100644
index 6c1ffc0..0000000
--- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java
+++ /dev/null
@@ -1,1352 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.usergrid.management.importer;
-
-import com.google.common.base.Preconditions;
-import org.apache.usergrid.batch.JobExecution;
-import org.apache.usergrid.batch.service.SchedulerService;
-import org.apache.usergrid.corepersistence.util.CpNamingUtils;
-import org.apache.usergrid.management.ManagementService;
-import org.apache.usergrid.persistence.*;
-import org.apache.usergrid.persistence.entities.FailedImportEntity;
-import org.apache.usergrid.persistence.entities.FileImport;
-import org.apache.usergrid.persistence.entities.Import;
-import org.apache.usergrid.persistence.entities.JobData;
-import org.apache.usergrid.persistence.exceptions.EntityNotFoundException;
-import org.apache.usergrid.persistence.Query;
-import org.apache.usergrid.persistence.Query.Level;
-import org.apache.usergrid.utils.InflectionUtils;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonParser;
-import org.codehaus.jackson.JsonToken;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import rx.Observable;
-import rx.Subscriber;
-import rx.functions.Action1;
-import rx.functions.Func2;
-import rx.schedulers.Schedulers;
-
-import javax.annotation.Nullable;
-import javax.annotation.PostConstruct;
-import java.io.File;
-import java.util.*;
-
-
-public class ImportServiceImpl implements ImportService {
-
-    // Job-data key under which the UUID of the Import entity is stored.
-    public static final String IMPORT_ID = "importId";
-    // Scheduler job name used when the top-level import job is created.
-    public static final String IMPORT_JOB_NAME = "importJob";
-    // Job-data key under which the UUID of the FileImport entity is stored.
-    public static final String FILE_IMPORT_ID = "fileImportId";
-    // Scheduler job name used when a per-file import job is created.
-    public static final String FILE_IMPORT_JOB_NAME = "fileImportJob";
-    // NOTE(review): not referenced in the visible part of this class; presumably a progress/heartbeat interval - confirm.
-    public static final int HEARTBEAT_COUNT = 50;
-
-    // Connection name linking an application-info entity to its Import entities.
-    public static final String APP_IMPORT_CONNECTION = "imports";
-    // Connection name linking an Import entity to its FileImport entities.
-    public static final String IMPORT_FILE_INCLUDES_CONNECTION = "files";
-
-    private static final Logger logger = 
LoggerFactory.getLogger(ImportServiceImpl.class);
-
-    // Used as the query limit when the file imports of one import job are searched.
-    int MAX_FILE_IMPORTS = 1000; // max number of file import jobs / import job
-
-    // Source of entity managers; every method here asks it for the management application's manager.
-    protected EntityManagerFactory emf;
-
-    // Scheduler used to create the import job and the per-file import jobs.
-    private SchedulerService sch;
-
-    // Only exposed through its getter/setter in the visible part of this class.
-    private ManagementService managementService;
-
-    // Factory for the JSON parsers created by getJsonParserForFile().
-    private JsonFactory jsonFactory = new JsonFactory();
-
-
-    /**
-     * Post-construction hook. Nothing needs to be initialised at the moment.
-     */
-    @PostConstruct
-    public void init() {
-        // intentionally empty
-    }
-
-
-    /**
-     * Creates an Import entity in the management application, schedules the main import job
-     * for it and links the entity to the target application.
-     *
-     * @param application UUID of the application the import belongs to; must not be null
-     * @param config      configuration of the job to be scheduled; must not be null
-     * @return the persisted Import entity (state SCHEDULED), or null when the management
-     *         entity manager cannot be obtained or the Import entity cannot be created
-     * @throws Exception if the application info entity cannot be found, or if scheduling the
-     *                   job, updating the entity or creating the connection fails
-     */
-    @Override
-    public Import schedule(final UUID application, Map<String, Object> config) throws Exception {
-
-        Preconditions.checkNotNull(config, "import information cannot be null");
-        Preconditions.checkNotNull(application, "application cannot be null");
-
-        final EntityManager rootEM;
-        try {
-            rootEM = emf.getEntityManager(emf.getManagementAppId());
-        } catch (Exception e) {
-            logger.error("application doesn't exist within the current context", e);
-            return null;
-        }
-
-        // Look the application up before anything is persisted or scheduled: an unknown
-        // application id must not leave behind a scheduled job that is linked to nothing.
-        final EntityRef appInfo = getApplicationInfoEntity(rootEM, application);
-
-        // create the import entity to store all metadata about the import job
-        Import importEntity = new Import();
-        importEntity.setState(Import.State.CREATED);
-        try {
-            importEntity = rootEM.create(importEntity);
-        } catch (Exception e) {
-            logger.error("Import entity creation failed", e);
-            return null;
-        }
-
-        // set data to be transferred to importInfo
-        JobData jobData = new JobData();
-        jobData.setProperty("importInfo", config);
-        jobData.setProperty(IMPORT_ID, importEntity.getUuid());
-
-        long soonestPossible = System.currentTimeMillis() + 250; //sch grace period
-
-        // schedule import job
-        sch.createJob(IMPORT_JOB_NAME, soonestPossible, jobData);
-
-        // update state for import job to scheduled
-        importEntity.setState(Import.State.SCHEDULED);
-        rootEM.update(importEntity);
-
-        //now link it to the application
-        rootEM.createConnection(appInfo, APP_IMPORT_CONNECTION, importEntity);
-
-        return importEntity;
-    }
-
-
-    /**
-     * Searches the Import entities reachable from an application through the
-     * {@link #APP_IMPORT_CONNECTION} connection.
-     *
-     * @param applicationId UUID of the application; must not be null
-     * @param ql            optional query string
-     * @param cursor        optional paging cursor
-     * @return the search results
-     * @throws RuntimeException wrapping any failure during lookup or search
-     */
-    @Override
-    public Results getImports(final UUID applicationId, @Nullable final String ql, @Nullable final String cursor) {
-        Preconditions.checkNotNull(applicationId, "applicationId must be specified");
-
-        try {
-            final EntityManager managementEm = emf.getEntityManager(emf.getManagementAppId());
-            final Entity application = getApplicationInfoEntity(managementEm, applicationId);
-
-            final Query importQuery = Query.fromQLNullSafe(ql);
-            importQuery.setCursor(cursor);
-
-            // restrict the search to Import entities
-            final String importType = Schema.getDefaultSchema().getEntityType(Import.class);
-            importQuery.setEntityType(importType);
-
-            return managementEm.searchCollection(application, APP_IMPORT_CONNECTION, importQuery);
-        } catch (Exception failure) {
-            throw new RuntimeException("Unable to get import entity", failure);
-        }
-    }
-
-
-    /**
-     * Loads a single Import entity and verifies that it is connected to the given application.
-     *
-     * @param applicationId UUID of the application; must not be null
-     * @param importId      UUID of the import; must not be null
-     * @return the Import entity, or null when it cannot be loaded or is not connected to the
-     *         application through {@link #APP_IMPORT_CONNECTION}
-     * @throws RuntimeException wrapping any failure during lookup
-     */
-    @Override
-    public Import getImport(final UUID applicationId, final UUID importId) {
-        Preconditions.checkNotNull(applicationId, "applicationId must be specified");
-        Preconditions.checkNotNull(importId, "importId must be specified");
-
-        try {
-            final EntityManager rootEm = emf.getEntityManager(emf.getManagementAppId());
-
-            final Entity appInfo = getApplicationInfoEntity(rootEm, applicationId);
-            final Import importEntity = rootEm.get(importId, Import.class);
-
-            // nothing was loaded for this id: report "not found" instead of handing null
-            // to the membership check below
-            if (importEntity == null) {
-                return null;
-            }
-
-            // check if it's on the path
-            if (!rootEm.isConnectionMember(appInfo, APP_IMPORT_CONNECTION, importEntity)) {
-                return null;
-            }
-
-            return importEntity;
-        } catch (Exception e) {
-            throw new RuntimeException("Unable to get import entity", e);
-        }
-
-    }
-
-
-    /**
-     * Loads the application-info entity for an application from the given entity manager.
-     *
-     * @param rootEm        entity manager to load from
-     * @param applicationId UUID of the application
-     * @return the application-info entity, never null
-     * @throws EntityNotFoundException if no entity is found for the id
-     * @throws Exception               if the lookup itself fails
-     */
-    private Entity getApplicationInfoEntity(final EntityManager rootEm, final UUID applicationId) throws Exception {
-        final Entity entity = rootEm.get(new SimpleEntityRef(CpNamingUtils.APPLICATION_INFO, applicationId));
-
-        if (entity == null) {
-            throw new EntityNotFoundException("Could not find application with id " + applicationId);
-        }
-
-        return entity;
-    }
-
-    /**
-     * Searches the FileImport entities connected to an import through the
-     * {@link #IMPORT_FILE_INCLUDES_CONNECTION} connection.
-     *
-     * @param applicationId UUID of the application; must not be null
-     * @param importId      UUID of the import; must not be null
-     * @param ql            optional query string
-     * @param cursor        optional paging cursor
-     * @return the search results
-     * @throws RuntimeException wrapping any failure, including the import not being found
-     */
-    @Override
-    public Results getFileImports(final UUID applicationId, final UUID importId,
-                                  @Nullable final String ql, @Nullable final String cursor) {
-
-        Preconditions.checkNotNull(applicationId, "applicationId must be specified");
-        Preconditions.checkNotNull(importId, "importId must be specified");
-
-        try {
-            final EntityManager rootEm = emf.getEntityManager(emf.getManagementAppId());
-
-            final Import importEntity = getImport(applicationId, importId);
-
-            // getImport() returns null for an unknown or foreign import; fail with a
-            // clear message instead of passing null into the search (same as getFileImport)
-            if (importEntity == null) {
-                throw new EntityNotFoundException("Import not found with id " + importId);
-            }
-
-            Query query = Query.fromQLNullSafe(ql);
-            query.setCursor(cursor);
-            query.setConnectionType(IMPORT_FILE_INCLUDES_CONNECTION);
-            query.setResultsLevel(Level.ALL_PROPERTIES);
-
-            //set our entity type
-            query.setEntityType(Schema.getDefaultSchema().getEntityType(FileImport.class));
-
-            return rootEm.searchTargetEntities(importEntity, query);
-        } catch (Exception e) {
-            throw new RuntimeException("Unable to get file imports", e);
-        }
-
-    }
-
-
-    /**
-     * Loads a single FileImport entity and verifies that it belongs to the given import.
-     *
-     * @param applicationId UUID of the application
-     * @param importId      UUID of the parent import
-     * @param fileImportId  UUID of the file import to load
-     * @return the FileImport entity, or null when it cannot be loaded or is not connected to
-     *         the import through {@link #IMPORT_FILE_INCLUDES_CONNECTION}
-     * @throws RuntimeException wrapping any failure, including the parent import not being found
-     */
-    @Override
-    public FileImport getFileImport(final UUID applicationId, final UUID importId, final UUID fileImportId) {
-        try {
-            final EntityManager rootEm = emf.getEntityManager(emf.getManagementAppId());
-
-            final Import importEntity = getImport(applicationId, importId);
-
-            if (importEntity == null) {
-                throw new EntityNotFoundException("Import not found with id " + importId);
-            }
-
-            // the file import has its own id; the parent import's id would load the wrong entity
-            final FileImport fileImport = rootEm.get(fileImportId, FileImport.class);
-
-            if (fileImport == null) {
-                return null;
-            }
-
-            // check if it's on the path; file imports are linked to their import with
-            // IMPORT_FILE_INCLUDES_CONNECTION (see createFileTask)
-            if (!rootEm.isConnectionMember(importEntity, IMPORT_FILE_INCLUDES_CONNECTION, fileImport)) {
-                return null;
-            }
-
-            return fileImport;
-        } catch (Exception e) {
-            throw new RuntimeException("Unable to load file import", e);
-        }
-    }
-
-
-    /**
-     * Searches the FailedImportEntity records connected to a file import through
-     * {@code FileImportTracker.ERRORS_CONNECTION_NAME}.
-     *
-     * @param applicationId UUID of the application; must not be null
-     * @param importId      UUID of the import; must not be null
-     * @param fileImportId  UUID of the file import; must not be null
-     * @param ql            optional query string
-     * @param cursor        optional paging cursor
-     * @return the search results
-     * @throws RuntimeException wrapping any failure, including the file import not being found
-     */
-    @Override
-    public Results getFailedImportEntities(final UUID applicationId, final UUID importId, final UUID fileImportId,
-                                           @Nullable final String ql, @Nullable final String cursor) {
-
-        Preconditions.checkNotNull(applicationId, "applicationId must be specified");
-        Preconditions.checkNotNull(importId, "importId must be specified");
-        Preconditions.checkNotNull(fileImportId, "fileImportId must be specified");
-
-        try {
-            final EntityManager rootEm = emf.getEntityManager(emf.getManagementAppId());
-
-            final FileImport importEntity = getFileImport(applicationId, importId, fileImportId);
-
-            // getFileImport() returns null when the file import is unknown or not on the
-            // path; do not pass null into the search
-            if (importEntity == null) {
-                throw new EntityNotFoundException("File import not found with id " + fileImportId);
-            }
-
-            Query query = Query.fromQLNullSafe(ql);
-            query.setCursor(cursor);
-            query.setConnectionType(FileImportTracker.ERRORS_CONNECTION_NAME);
-            query.setResultsLevel(Level.ALL_PROPERTIES);
-
-            //set our entity type
-            query.setEntityType(Schema.getDefaultSchema().getEntityType(FailedImportEntity.class));
-
-            return rootEm.searchTargetEntities(importEntity, query);
-        } catch (Exception e) {
-            throw new RuntimeException("Unable to get import entity", e);
-        }
-    }
-
-
-    /**
-     * Loads a single FailedImportEntity and verifies that it belongs to the given file import.
-     *
-     * @param applicationId  UUID of the application
-     * @param importId       UUID of the import
-     * @param fileImportId   UUID of the file import
-     * @param failedImportId UUID of the failed-import record to load
-     * @return the FailedImportEntity, or null when it cannot be loaded or is not connected to
-     *         the file import through {@code FileImportTracker.ERRORS_CONNECTION_NAME}
-     * @throws RuntimeException wrapping any failure, including the file import not being found
-     */
-    @Override
-    public FailedImportEntity getFailedImportEntity(final UUID applicationId, final UUID importId,
-                                                    final UUID fileImportId, final UUID failedImportId) {
-        try {
-            final EntityManager rootEm = emf.getEntityManager(emf.getManagementAppId());
-
-            final FileImport importEntity = getFileImport(applicationId, importId, fileImportId);
-
-            if (importEntity == null) {
-                throw new EntityNotFoundException("File import not found with id " + fileImportId);
-            }
-
-            // the failed-import record has its own id; the import's id would load the wrong entity
-            final FailedImportEntity fileImport = rootEm.get(failedImportId, FailedImportEntity.class);
-
-            if (fileImport == null) {
-                return null;
-            }
-
-            // check if it's on the path
-            if (!rootEm.isConnectionMember(importEntity, FileImportTracker.ERRORS_CONNECTION_NAME, fileImport)) {
-                return null;
-            }
-
-            return fileImport;
-        } catch (Exception e) {
-            throw new RuntimeException("Unable to load file import", e);
-        }
-    }
-
-
-    /**
-     * Creates the FileImport entity for one file, connects it to its parent import and
-     * prepares the job data for the file import job. The job itself is NOT scheduled here.
-     *
-     * @param config    import configuration; its "applicationId" entry is read and all
-     *                  entries are copied into the returned job data
-     * @param file      name of the file to be imported
-     * @param importRef reference to the parent Import entity
-     * @return the job data for the file import job, or null when the management entity
-     *         manager cannot be obtained or the connection to the import cannot be created
-     * @throws Exception if creating, loading or updating an entity fails
-     */
-    private JobData createFileTask(Map<String, Object> config, String file, EntityRef importRef) throws Exception {
-
-        if (logger.isTraceEnabled()) {
-            logger.trace("scheduleFile() for import {}:{} file {}",
-                importRef.getType(), importRef.getUuid(), file);
-        }
-
-        EntityManager rootEM;
-
-        try {
-            rootEM = emf.getEntityManager(emf.getManagementAppId());
-        } catch (Exception e) {
-            logger.error("application doesn't exist within the current context", e);
-            return null;
-        }
-
-        // create a FileImport entity to store metadata about the fileImport job
-        UUID applicationId = (UUID) config.get("applicationId");
-        FileImport fileImport = new FileImport(file, applicationId);
-        fileImport = rootEM.create(fileImport);
-
-        Import importEntity = rootEM.get(importRef, Import.class);
-
-        try {
-            // create a connection between the main import job and the sub FileImport Job
-            rootEM.createConnection(importEntity, IMPORT_FILE_INCLUDES_CONNECTION, fileImport);
-
-            if (logger.isTraceEnabled()) {
-                logger.trace("Created connection from {}:{} to {}:{}",
-                        importEntity.getType(), importEntity.getUuid(),
-                        fileImport.getType(), fileImport.getUuid()
-                    );
-            }
-
-        } catch (Exception e) {
-            // pass the exception itself so that message and stack trace are both logged
-            logger.error("Exception creating file task connection", e);
-            return null;
-        }
-
-        // mark the File Import Job as created
-        fileImport.setState(FileImport.State.CREATED);
-        rootEM.update(fileImport);
-
-        // set data to be transferred to the FileImport Job
-        JobData jobData = new JobData();
-        jobData.setProperty("File", file);
-        jobData.setProperty(FILE_IMPORT_ID, fileImport.getUuid());
-        jobData.addProperties(config);
-
-        // update state of the job to Scheduled
-        fileImport.setState(FileImport.State.SCHEDULED);
-        rootEM.update(fileImport);
-
-        return jobData;
-    }
-
-
-    /**
-     * Counts the FileImport entities currently visible as targets of the import's
-     * {@link #IMPORT_FILE_INCLUDES_CONNECTION} connection.
-     *
-     * @param importRoot the import whose file imports are counted
-     * @return number of entities returned by a single search limited to MAX_FILE_IMPORTS
-     * @throws RuntimeException wrapping any failure during the search
-     */
-    private int getConnectionCount(final Import importRoot) {
-
-        try {
-
-            EntityManager rootEM = emf.getEntityManager(emf.getManagementAppId());
-            Query query = Query.fromQL("select *");
-            query.setEntityType("file_import");
-            query.setConnectionType(IMPORT_FILE_INCLUDES_CONNECTION);
-            query.setLimit(MAX_FILE_IMPORTS);
-
-            // TODO: only one page is read, so the count is capped by the query limit
-            // (MAX_FILE_IMPORTS); imports with more files need paging here
-            Results entities = rootEM.searchTargetEntities(importRoot, query);
-            return entities.size();
-
-        } catch (Exception e) {
-            logger.error("application doesn't exist within the current context", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-
-    /**
-     * Hands one prepared file import job to the scheduler. The job is requested to start
-     * 250 ms from now.
-     *
-     * @param jobData job data prepared for the file import job
-     * @return whatever the scheduler returns for the created job
-     */
-    private JobData scheduleFileTasks(final JobData jobData) {
-
-        // scheduler grace period
-        final long gracePeriodMillis = 250;
-        final long startAt = System.currentTimeMillis() + gracePeriodMillis;
-
-        final JobData scheduledJob = sch.createJob(FILE_IMPORT_JOB_NAME, startAt, jobData);
-        return scheduledJob;
-    }
-
-
-    /**
-     * Reads the current state of an Import entity from the management application.
-     * This corresponds to the GET /import
-     *
-     * @param uuid UUID of the import; must not be null
-     * @return the state stored on the Import entity
-     * @throws EntityNotFoundException if no Import entity is found for the uuid
-     * @throws Exception               if the lookup itself fails
-     */
-    @Override
-    public Import.State getState(UUID uuid) throws Exception {
-
-        Preconditions.checkNotNull(uuid, "uuid cannot be null");
-
-        final EntityManager managementEm = emf.getEntityManager(emf.getManagementAppId());
-        final Import found = managementEm.get(uuid, Import.class);
-
-        if (found != null) {
-            return found.getState();
-        }
-
-        throw new EntityNotFoundException("Could not find entity with uuid " + uuid);
-    }
-
-
-    /**
-     * Reads the error message stored on an Import entity.
-     *
-     * @param uuid UUID of the import
-     * @return the stored error message; a fixed explanatory text when uuid is null or no
-     *         Import entity is found for it
-     * @throws Exception if the lookup itself fails
-     */
-    @Override
-    public String getErrorMessage(final UUID uuid) throws Exception {
-
-        if (uuid == null) {
-            logger.error("getErrorMessage(): UUID passed in cannot be null.");
-            return "UUID passed in cannot be null";
-        }
-
-        // look the import up in the management application
-        final EntityManager managementEm = emf.getEntityManager(emf.getManagementAppId());
-        final Import found = managementEm.get(uuid, Import.class);
-
-        if (found != null) {
-            return found.getErrorMessage();
-        }
-
-        logger.error("getErrorMessage(): no entity with that uuid was found");
-        return "No Such Element found";
-    }
-
-
-    /**
-     * Loads the Import entity whose UUID is stored in the job data under {@link #IMPORT_ID}.
-     *
-     * @param jobExecution the import job details
-     * @return the Import entity as returned by the management entity manager
-     * @throws Exception if the lookup fails
-     */
-    @Override
-    public Import getImportEntity(final JobExecution jobExecution) throws Exception {
-
-        final Object storedId = jobExecution.getJobData().getProperty(IMPORT_ID);
-        final UUID importId = (UUID) storedId;
-
-        return emf.getEntityManager(emf.getManagementAppId()).get(importId, Import.class);
-    }
-
-
-    /**
-     * Loads the FileImport entity whose UUID is stored in the job data under
-     * {@link #FILE_IMPORT_ID}.
-     *
-     * @param jobExecution the file import job details
-     * @return the FileImport entity as returned by the management entity manager
-     * @throws Exception if the lookup fails
-     */
-    @Override
-    public FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception {
-        final Object storedId = jobExecution.getJobData().getProperty(FILE_IMPORT_ID);
-        final UUID fileImportId = (UUID) storedId;
-
-        return emf.getEntityManager(emf.getManagementAppId()).get(fileImportId, FileImport.class);
-    }
-
-
-    /** @return the entity manager factory used by this service */
-    public EntityManagerFactory getEmf() {
-        return this.emf;
-    }
-
-
-    /** @param emf the entity manager factory to use */
-    public void setEmf(final EntityManagerFactory emf) {
-        this.emf = emf;
-    }
-
-
-    /** @return the management service held by this service */
-    public ManagementService getManagementService() {
-        return this.managementService;
-    }
-
-
-    /** @param managementService the management service to hold */
-    public void setManagementService(final ManagementService managementService) {
-        this.managementService = managementService;
-    }
-
-
-    /** @param sch the scheduler used to create import jobs */
-    public void setSch(final SchedulerService sch) {
-        this.sch = sch;
-    }
-
-
-    /**
-     * Runs the main import job: lists the ".json" files in the configured S3 bucket, creates
-     * a FileImport entity per file, waits until all file connections are visible and then
-     * schedules one file import job per file.
-     *
-     * Failures that can be attributed to the import (missing storage information, missing
-     * organization, a file task that cannot be created) are recorded on the Import entity,
-     * which is then set to FAILED.
-     *
-     * @param jobExecution the job created by the scheduler with all the required config data
-     * @throws UnsupportedOperationException if the configuration has no "applicationId"
-     * @throws RuntimeException              if the file connections do not all become visible
-     *                                       within the retry budget
-     * @throws Exception                     if an entity or scheduler operation fails
-     */
-    @Override
-    public void doImport(JobExecution jobExecution) throws Exception {
-
-        if (logger.isTraceEnabled()) {
-            logger.trace("doImport()");
-        }
-
-        @SuppressWarnings("unchecked")
-        Map<String, Object> config = (Map<String, Object>) jobExecution.getJobData().getProperty("importInfo");
-        if (config == null) {
-            logger.error("doImport(): Import Information passed through is null");
-            return;
-        }
-
-        // get Import Entity from the management app, update it to show that job has started
-
-        final EntityManager rootEM = emf.getEntityManager(emf.getManagementAppId());
-        UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID);
-        Import importEntity = rootEM.get(importId, Import.class);
-
-        importEntity.setState(Import.State.STARTED);
-        importEntity.setStarted(System.currentTimeMillis());
-        importEntity.setErrorMessage(" ");
-        rootEM.update(importEntity);
-        if (logger.isTraceEnabled()) {
-            logger.trace("doImport(): updated state");
-        }
-
-        // read the storage settings only after the entity is loaded, so that a missing
-        // section can be recorded on the import instead of surfacing as a NullPointerException
-
-        @SuppressWarnings("unchecked")
-        Map<String, Object> properties = (Map<String, Object>) config.get("properties");
-        @SuppressWarnings("unchecked")
-        Map<String, Object> storage_info = properties == null
-            ? null
-            : (Map<String, Object>) properties.get("storage_info");
-        if (storage_info == null) {
-            logger.error("doImport(): No storage information could be found");
-            importEntity.setErrorMessage("No storage information could be found");
-            importEntity.setState(Import.State.FAILED);
-            rootEM.update(importEntity);
-            return;
-        }
-
-        String bucketName = (String) storage_info.get("bucket_location");
-        String accessId = (String) storage_info.get("s3_access_id");
-        String secretKey = (String) storage_info.get("s3_key");
-
-        // if no S3 importer was passed in then create one
-
-        S3Import s3Import;
-        Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import");
-        try {
-            if (s3PlaceHolder != null) {
-                s3Import = (S3Import) s3PlaceHolder;
-            } else {
-                s3Import = new S3ImportImpl();
-            }
-        } catch (Exception e) {
-            logger.error("doImport(): Error creating S3Import", e);
-            importEntity.setErrorMessage(e.getMessage());
-            importEntity.setState(Import.State.FAILED);
-            rootEM.update(importEntity);
-            return;
-        }
-
-        // get list of all JSON files in S3 bucket
-
-        final List<String> bucketFiles;
-        try {
-
-            if (config.get("organizationId") == null) {
-                logger.error("doImport(): No organization could be found");
-                importEntity.setErrorMessage("No organization could be found");
-                importEntity.setState(Import.State.FAILED);
-                rootEM.update(importEntity);
-                return;
-            }
-
-            if (config.get("applicationId") == null) {
-                throw new UnsupportedOperationException("Import applications not supported");
-            }
-
-            bucketFiles = s3Import.getBucketFileNames(bucketName, ".json", accessId, secretKey);
-
-        } catch (OrganizationNotFoundException | ApplicationNotFoundException e) {
-            importEntity.setErrorMessage(e.getMessage());
-            importEntity.setState(Import.State.FAILED);
-            rootEM.update(importEntity);
-            return;
-        }
-
-
-        // schedule a FileImport job for each file found in the bucket
-
-        if (bucketFiles.isEmpty()) {
-            importEntity.setState(Import.State.FINISHED);
-            importEntity.setErrorMessage("No files found in the bucket: " + bucketName);
-            rootEM.update(importEntity);
-            return;
-        }
-
-        Map<String, Object> fileMetadata = new HashMap<>();
-        List<Map<String, Object>> value = new ArrayList<>();
-        final List<JobData> fileJobs = new ArrayList<>(bucketFiles.size());
-
-        // create the Entity Connection and set up metadata for each job
-
-        for (String bucketFile : bucketFiles) {
-            final JobData jobData = createFileTask(config, bucketFile, importEntity);
-
-            // createFileTask() returns null when it could not set the task up; a null entry
-            // can neither be counted nor scheduled, so fail the import right away
-            if (jobData == null) {
-                logger.error("doImport(): Unable to create file import task for file {}", bucketFile);
-                importEntity.setErrorMessage("Unable to create file import task for file " + bucketFile);
-                importEntity.setState(Import.State.FAILED);
-                rootEM.update(importEntity);
-                return;
-            }
-            fileJobs.add(jobData);
-        }
-
-        // wait until every file connection is visible to queries
-
-        final int maxRetries = 60;
-        boolean done = false;
-        for (int attempt = 0; attempt < maxRetries && !done; attempt++) {
-
-            final int count = getConnectionCount(importEntity);
-
-            if (count == fileJobs.size()) {
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Got ALL {} of {} expected connections", count, fileJobs.size());
-                }
-                done = true;
-            } else {
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Got {} of {} expected connections. Waiting...", count, fileJobs.size());
-                }
-                Thread.sleep(1000);
-            }
-        }
-        // decide on the outcome, not on the attempt counter: success on the very last
-        // attempt must not be reported as a timeout
-        if (!done) {
-            throw new RuntimeException("Max retries was reached");
-        }
-
-        // schedule each job
-
-        for (JobData jobData : fileJobs) {
-
-            final JobData scheduled = scheduleFileTasks(jobData);
-
-            Map<String, Object> fileJobID = new HashMap<>();
-            fileJobID.put("FileName", scheduled.getProperty("File"));
-            fileJobID.put("JobID", scheduled.getUuid());
-            value.add(fileJobID);
-        }
-
-        fileMetadata.put("files", value);
-        importEntity.addProperties(fileMetadata);
-        importEntity.setFileCount(fileJobs.size());
-        rootEM.update(importEntity);
-    }
-
-
-    /**
-     * Runs one file import job: marks the FileImport entity as STARTED, downloads the file
-     * from S3 and imports the entities and connections it contains into the target
-     * application. Fatal problems are reported through the FileImportTracker; after every
-     * fatal problem and after the import itself, checkIfComplete() is called so that the
-     * parent import can be closed.
-     *
-     * @param jobExecution the file import job; its job data supplies "properties", "File",
-     *                     "applicationId", the file import id and optionally "s3Import"
-     */
-    @Override
-    public void downloadAndImportFile(JobExecution jobExecution) {
-
-        // get values we need
-
-        @SuppressWarnings("unchecked")
-        Map<String, Object> properties = (Map<String, Object>) jobExecution.getJobData().getProperty("properties");
-        if (properties == null) {
-            logger.error("downloadAndImportFile(): Import Information passed through is null");
-            return;
-        }
-        @SuppressWarnings("unchecked")
-        Map<String, Object> storage_info = (Map<String, Object>) properties.get("storage_info");
-        if (storage_info == null) {
-            // same treatment as missing properties: nothing can be downloaded without it
-            logger.error("downloadAndImportFile(): Storage information passed through is null");
-            return;
-        }
-
-        String bucketName = (String) storage_info.get("bucket_location");
-        String accessId = (String) storage_info.get("s3_access_id");
-        String secretKey = (String) storage_info.get("s3_key");
-
-        EntityManager rootEM = emf.getEntityManager(emf.getManagementAppId());
-
-        // get the file import entity
-
-        FileImport fileImport;
-        try {
-            fileImport = getFileImportEntity(jobExecution);
-        } catch (Exception e) {
-            logger.error("Error updating fileImport to set state of file import", e);
-            return;
-        }
-
-        // tracker flushes every 100 entities
-        final FileImportTracker tracker = new FileImportTracker(emf, fileImport, 100);
-
-        String fileName = jobExecution.getJobData().getProperty("File").toString();
-        UUID targetAppId = (UUID) jobExecution.getJobData().getProperty("applicationId");
-
-        // is job already done?
-        if (FileImport.State.FAILED.equals(fileImport.getState())
-            || FileImport.State.FINISHED.equals(fileImport.getState())) {
-            return;
-        }
-
-        // update FileImport Entity to indicate that we have started
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("downloadAndImportFile() for file {} ", fileName);
-        }
-
-        try {
-            // NOTE(review): this first update writes the entity before its state changes and
-            // looks redundant - confirm before removing
-            rootEM.update(fileImport);
-            fileImport.setState(FileImport.State.STARTED);
-            rootEM.update(fileImport);
-
-            if (rootEM.get(targetAppId) == null) {
-                tracker.fatal("Application " + targetAppId + " does not exist");
-                // like every other fatal path: this may have been the last open file job
-                checkIfComplete(rootEM, fileImport);
-                return;
-            }
-
-        } catch (Exception e) {
-            logger.error("downloadAndImportFile(): Error starting file import", e);
-            tracker.fatal("Application " + targetAppId + " does not exist");
-            checkIfComplete(rootEM, fileImport);
-            return;
-        }
-
-        EntityManager targetEm = emf.getEntityManager(targetAppId);
-
-        // download file from S3, if no S3 importer was passed in then create one
-
-        File downloadedFile = null;
-        S3Import s3Import;
-        Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import");
-        try {
-            if (s3PlaceHolder != null) {
-                s3Import = (S3Import) s3PlaceHolder;
-            } else {
-                s3Import = new S3ImportImpl();
-            }
-        } catch (Exception e) {
-            tracker.fatal("Error connecting to S3: " + e.getMessage());
-            checkIfComplete(rootEM, fileImport);
-            return;
-        }
-
-        try {
-            downloadedFile = s3Import.copyFileFromBucket(
-                fileName, bucketName, accessId, secretKey);
-        } catch (Exception e) {
-            tracker.fatal("Error downloading file: " + e.getMessage());
-            checkIfComplete(rootEM, fileImport);
-            return;
-        }
-
-        // parse JSON data, create Entities and Connections from import data
-
-        try {
-            parseEntitiesAndConnectionsFromJson(
-                jobExecution, downloadedFile, targetEm, rootEM, fileImport, tracker);
-
-        } catch (Exception e) {
-            tracker.fatal(e.getMessage());
-        }
-
-        checkIfComplete(rootEM, fileImport);
-    }
-
-
-    /**
-     * Finds the Import entity a file import belongs to by following the
-     * {@link #IMPORT_FILE_INCLUDES_CONNECTION} connection backwards and taking the first source.
-     *
-     * @param rootEm     entity manager to search with
-     * @param fileImport the file import whose parent is wanted
-     * @return the first source entity, converted to its typed form
-     * @throws RuntimeException wrapping any failure, including no source entity being found
-     */
-    private Import getImportEntity(final EntityManager rootEm, final FileImport fileImport) {
-        try {
-            Results importJobResults =
-                rootEm.getSourceEntities(fileImport, IMPORT_FILE_INCLUDES_CONNECTION,
-                    null, Level.ALL_PROPERTIES);
-
-            List<Entity> importEntities = importJobResults.getEntities();
-            final Import importEntity = (Import) importEntities.get(0).toTypedEntity();
-            return importEntity;
-        } catch (Exception e) {
-            // keep the cause: without it the reason for the failure is lost
-            throw new RuntimeException("Unable to import entity", e);
-        }
-    }
-
-    /**
-     * Checks whether every file import of the parent import has reached a final state and,
-     * if so, closes the parent: FINISHED when no file import failed, FAILED otherwise.
-     * Returns without touching the parent as soon as a file import is found that is neither
-     * FAILED nor FINISHED, or when no file imports are found at all. An error while reading
-     * the file imports is counted as a failure, so the parent is then set to FAILED.
-     *
-     * @param rootEM     entity manager of the management application
-     * @param fileImport the file import whose job has just ended
-     */
-    private void checkIfComplete(final EntityManager rootEM, final FileImport fileImport) {
-        int failCount = 0;
-        int successCount = 0;
-
-        final Import importEntity = getImportEntity(rootEM, fileImport);
-
-        try {
-
-            // wait for query index to catch up
-
-            // TODO: better way to wait for indexes to catch up
-            try {
-                Thread.sleep(5000);
-            } catch (InterruptedException interrupted) {
-                // do not lose the interrupt: restore the flag and carry on with the check
-                Thread.currentThread().interrupt();
-            }
-
-            // get file import entities for this import job
-
-            Query query = new Query();
-            query.setEntityType(Schema.getDefaultSchema().getEntityType(FileImport.class));
-            query.setConnectionType(IMPORT_FILE_INCLUDES_CONNECTION);
-            query.setLimit(MAX_FILE_IMPORTS);
-
-            Results entities = rootEM.searchTargetEntities(importEntity, query);
-            PagingResultsIterator itr = new PagingResultsIterator(entities);
-
-            if (!itr.hasNext()) {
-                logger.warn("Found no FileImport entities for import {}, unable to check if complete",
-                        importEntity.getUuid());
-                return;
-            }
-
-            if (logger.isTraceEnabled()) {
-                logger.trace("Checking {} file import jobs to see if we are done for file {}",
-                    entities.size(), fileImport.getFileName());
-            }
-
-            // loop through entities, count different types of status
-
-            while (itr.hasNext()) {
-                FileImport fi = (FileImport) itr.next();
-
-                switch (fi.getState()) {
-                    case FAILED:     // failed, but we may not be complete so continue checking
-                        failCount++;
-                        break;
-                    case FINISHED:   // finished, we can continue checking
-                        successCount++;
-                        break;
-                    default:         // not something we recognize as complete, short circuit
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("not done yet, bail out... {}", fi.getState().toString());
-                        }
-                        return;
-                }
-            }
-        } catch (Exception e) {
-            failCount++;
-            if (importEntity != null) {
-                importEntity.setErrorMessage("Error determining status of file import jobs");
-            }
-            // this marks the whole import as failed, so it must be visible at error level
-            logger.error("Error determining status of file import jobs", e);
-        }
-
-        if (logger.isTraceEnabled()) {
-            logger.trace("successCount = {} failCount = {}", successCount, failCount);
-        }
-
-        if (importEntity != null) {
-            if (logger.isTraceEnabled()) {
-                logger.trace("FINISHED");
-            }
-
-            if (failCount == 0) {
-                importEntity.setState(Import.State.FINISHED);
-            } else {
-                // we had failures, set it to failed
-                importEntity.setState(Import.State.FAILED);
-            }
-
-            try {
-                rootEM.update(importEntity);
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Updated import entity {}:{} with state {}",
-                        importEntity.getType(), importEntity.getUuid(), importEntity.getState());
-                }
-            } catch (Exception e) {
-                logger.error("Error updating import entity", e);
-            }
-        }
-    }
-
-
-    /**
-     * Gets the JSON parser for given file
-     *
-     * @param collectionFile the file for which JSON parser is required
-     */
-    private JsonParser getJsonParserForFile(File collectionFile) throws 
Exception {
-        JsonParser jp = jsonFactory.createJsonParser(collectionFile);
-        jp.setCodec(new ObjectMapper());
-        return jp;
-    }
-
-
-    /**
-     * Imports the entity's connecting references (collections, connections 
and dictionaries)
-     *
-     * @param execution  The job jobExecution currently running
-     * @param file       The file to be imported
-     * @param em         Entity Manager for the application being imported
-     * @param rootEm     Entity manager for the root applicaition
-     * @param fileImport The file import entity
-     */
-    private void parseEntitiesAndConnectionsFromJson(
-        final JobExecution execution,
-        final File file,
-        final EntityManager em,
-        final EntityManager rootEm,
-        final FileImport fileImport,
-        final FileImportTracker tracker) throws Exception {
-
-
-        // tracker flushes every 100 entities
-        //final FileImportTracker tracker = new FileImportTracker( emf, 
fileImport, 100 );
-
-        // function to execute for each write event
-        final Action1<WriteEvent> doWork = new Action1<WriteEvent>() {
-            @Override
-            public void call(WriteEvent writeEvent) {
-                writeEvent.doWrite(em, fileImport, tracker);
-            }
-        };
-
-        // invokes the heartbeat every HEARTBEAT_COUNT operations
-        final Func2<Integer, WriteEvent, Integer> heartbeatReducer = new 
Func2<Integer, WriteEvent, Integer>() {
-            @Override
-            public Integer call(final Integer integer, final WriteEvent 
writeEvent) {
-                final int next = integer.intValue() + 1;
-                if (next % HEARTBEAT_COUNT == 0) {
-                    execution.heartbeat();
-                }
-                return next;
-            }
-        };
-
-
-        // FIRST PASS: import all entities in the file
-
-
-        // observable that parses JSON and emits write events
-        JsonParser jp = getJsonParserForFile(file);
-
-        // TODO: move JSON parser into observable creation so open/close 
happens within the stream
-        // entitiesOnly = true
-        final JsonEntityParserObservable jsonObservableEntities =
-            new JsonEntityParserObservable(jp, em, rootEm, fileImport, 
tracker, true);
-
-        final Observable<WriteEvent> entityEventObservable = 
Observable.create(jsonObservableEntities);
-
-        // only take while our stats tell us we should continue processing
-        // potentially skip the first n if this is a resume operation
-        final int entityNumSkip = (int) tracker.getTotalEntityCount();
-
-
-        entityEventObservable.takeWhile(writeEvent -> 
!tracker.shouldStopProcessingEntities()).skip(entityNumSkip)
-            .flatMap(writeEvent -> {
-                return Observable.just(writeEvent).doOnNext(doWork);
-            }, 10).reduce(0, heartbeatReducer).toBlocking().last();
-
-
-        jp.close();
-
-        if (FileImport.State.FAILED.equals(fileImport.getState())) {
-            if (logger.isTraceEnabled()) {
-                logger.trace("\n\nFailed to completely write entities, 
skipping second phase. File: {}\n",
-                    fileImport.getFileName());
-            }
-            return;
-        }
-        if (logger.isTraceEnabled()) {
-            logger.trace("\n\nWrote entities. File: {}\n", 
fileImport.getFileName());
-        }
-
-
-        // SECOND PASS: import all connections and dictionaries
-
-
-        // observable that parses JSON and emits write events
-        jp = getJsonParserForFile(file);
-
-        // TODO: move JSON parser into observable creation so open/close 
happens within the stream
-        // entitiesOnly = false
-        final JsonEntityParserObservable jsonObservableOther =
-            new JsonEntityParserObservable(jp, em, rootEm, fileImport, 
tracker, false);
-
-        final Observable<WriteEvent> otherEventObservable = 
Observable.create(jsonObservableOther);
-
-        // only take while our stats tell us we should continue processing
-        // potentially skip the first n if this is a resume operation
-        final int connectionNumSkip = (int) tracker.getTotalConnectionCount();
-
-        // with this code we get asynchronous behavior and 
testImportWithMultipleFiles will fail
-        final int connectionCount = otherEventObservable.takeWhile(
-            writeEvent -> 
!tracker.shouldStopProcessingConnections()).skip(connectionNumSkip).flatMap(entityWrapper
 -> {
-            return 
Observable.just(entityWrapper).doOnNext(doWork).subscribeOn(Schedulers.io());
-
-        }, 10).reduce(0, heartbeatReducer).toBlocking().last();
-
-        jp.close();
-
-        if (logger.isTraceEnabled()) {
-            logger.trace("\n\nparseEntitiesAndConnectionsFromJson(): Wrote 
others for file {}\n",
-                fileImport.getFileName());
-        }
-
-        if (FileImport.State.FAILED.equals(fileImport.getState())) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): 
failed to completely write entities\n");
-            }
-            return;
-        }
-
-        // flush the job statistics
-        tracker.complete();
-
-        if (FileImport.State.FAILED.equals(fileImport.getState())) {
-            if (logger.isDebugEnabled()) {
-                logger.debug("\n\nFailed to completely write connections and 
dictionaries. File: {}\n",
-                    fileImport.getFileName());
-            }
-            return;
-        }
-
-        if (logger.isTraceEnabled()) {
-            logger.trace("\n\nWrote connections and dictionaries. File: {}\n", 
fileImport.getFileName());
-        }
-    }
-
-
-    private interface WriteEvent {
-        public void doWrite(EntityManager em, FileImport fileImport, 
FileImportTracker tracker);
-    }
-
-
-    private final class EntityEvent implements WriteEvent {
-        UUID entityUuid;
-        String entityType;
-        Map<String, Object> properties;
-
-        EntityEvent(UUID entityUuid, String entityType, Map<String, Object> 
properties) {
-            this.entityUuid = entityUuid;
-            this.entityType = entityType;
-            this.properties = properties;
-        }
-
-
-        // Creates entities
-        @Override
-        public void doWrite(EntityManager em, FileImport fileImport, 
FileImportTracker tracker) {
-            try {
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Writing imported entity {}:{} into app {}",
-                        entityType, entityUuid, em.getApplication().getUuid());
-                }
-
-                em.create(entityUuid, entityType, properties);
-
-                tracker.entityWritten();
-
-            } catch (Exception e) {
-                logger.error("Error writing entity. From file:{}", 
fileImport.getFileName(), e);
-
-                tracker.entityFailed(e.getMessage() + " From file: " + 
fileImport.getFileName());
-            }
-        }
-    }
-
-
-    private final class ConnectionEvent implements WriteEvent {
-        EntityRef ownerEntityRef;
-        String connectionType;
-        EntityRef entityRef;
-
-        ConnectionEvent(EntityRef ownerEntityRef, String connectionType, 
EntityRef entryRef) {
-            this.ownerEntityRef = ownerEntityRef;
-            this.connectionType = connectionType;
-            this.entityRef = entryRef;
-        }
-
-        // creates connections between entities
-        @Override
-        public void doWrite(EntityManager em, FileImport fileImport, 
FileImportTracker tracker) {
-
-            try {
-                // TODO: do we need to ensure that all Entity events happen 
first?
-                // TODO: what happens if ConnectionEvents  happen before all 
entities are saved?
-
-                // Connections are specified as UUIDs with no type
-                if (entityRef.getType() == null) {
-                    entityRef = em.get(ownerEntityRef.getUuid());
-                }
-
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Creating connection from {}:{} to {}:{}",
-                            ownerEntityRef.getType(), ownerEntityRef.getUuid(),
-                            entityRef.getType(), entityRef.getUuid());
-                }
-
-                em.createConnection(ownerEntityRef, connectionType, entityRef);
-
-                tracker.connectionWritten();
-
-            } catch (Exception e) {
-                logger.error("Error writing connection. From file: {}", 
fileImport.getFileName(), e);
-
-                tracker.connectionFailed(e.getMessage() + " From file: " + 
fileImport.getFileName());
-            }
-        }
-    }
-
-
-    private final class DictionaryEvent implements WriteEvent {
-
-        EntityRef ownerEntityRef;
-        String dictionaryName;
-        Map<String, Object> dictionary;
-
-        DictionaryEvent(EntityRef ownerEntityRef, String dictionaryName, 
Map<String, Object> dictionary) {
-            this.ownerEntityRef = ownerEntityRef;
-            this.dictionaryName = dictionaryName;
-            this.dictionary = dictionary;
-        }
-
-        // adds map to the dictionary
-        @Override
-        public void doWrite(EntityManager em, FileImport fileImport, 
FileImportTracker stats) {
-            try {
-
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Adding map to {}:{} dictionary {}",
-                        ownerEntityRef.getType(), ownerEntityRef.getType(), 
dictionaryName);
-                }
-
-                em.addMapToDictionary(ownerEntityRef, dictionaryName, 
dictionary);
-
-            } catch (Exception e) {
-                logger.error("Error writing dictionary. From file: " + 
fileImport.getFileName(), e);
-
-                // TODO add statistics for dictionary writes and failures
-            }
-        }
-    }
-
-
-    private final class JsonEntityParserObservable implements 
Observable.OnSubscribe<WriteEvent> {
-        public static final String COLLECTION_OBJECT_NAME = "collections";
-        private final JsonParser jp;
-        EntityManager em;
-        EntityManager rootEm;
-        FileImport fileImport;
-        FileImportTracker tracker;
-        boolean entitiesOnly;
-
-
-        JsonEntityParserObservable(
-            JsonParser parser,
-            EntityManager em,
-            EntityManager rootEm,
-            FileImport fileImport,
-            FileImportTracker tracker,
-            boolean entitiesOnly) {
-
-            this.jp = parser;
-            this.em = em;
-            this.rootEm = rootEm;
-            this.fileImport = fileImport;
-            this.tracker = tracker;
-            this.entitiesOnly = entitiesOnly;
-        }
-
-
-        @Override
-        public void call(final Subscriber<? super WriteEvent> subscriber) {
-            process(subscriber);
-        }
-
-
-        private void process(final Subscriber<? super WriteEvent> subscriber) {
-
-            try {
-
-                // we ignore imported entity type information, entities get 
the type of the collection
-                Stack<JsonToken> objectStartStack = new Stack();
-                Stack<String> objectNameStack = new Stack();
-                EntityRef lastEntity = null;
-
-                String entityType = null;
-
-                while (true) {
-
-                    JsonToken token = jp.nextToken();
-
-                    // nothing left to do.
-                    if (token == null) {
-                        break;
-                    }
-
-                    String name = jp.getCurrentName();
-
-
-                    // start of an object with a field name
-
-                    if (token.equals(JsonToken.START_OBJECT)) {
-
-                        objectStartStack.push(token);
-
-                        // nothing to do
-                        if (name == null) {
-                            continue;
-                        }
-
-
-                        if ("Metadata".equals(name)) {
-
-                            Map<String, Object> entityMap = 
jp.readValueAs(HashMap.class);
-
-                            UUID uuid = null;
-                            if (entityMap.get("uuid") != null) {
-                                uuid = UUID.fromString((String) 
entityMap.get("uuid"));
-                                lastEntity = new SimpleEntityRef(entityType, 
uuid);
-                            }
-
-                            if (entitiesOnly) {
-                                //logger.debug("{}Got entity with uuid {}", 
indent, lastEntity);
-
-                                WriteEvent event = new EntityEvent(uuid, 
entityType, entityMap);
-                                processWriteEvent(subscriber, event);
-                            }
-                            objectStartStack.pop();
-                        } else if ("connections".equals(name)) {
-
-                            Map<String, Object> connectionMap = 
jp.readValueAs(HashMap.class);
-
-                            for (String type : connectionMap.keySet()) {
-                                List targets = (List) connectionMap.get(type);
-
-                                for (Object targetObject : targets) {
-                                    UUID target = UUID.fromString((String) 
targetObject);
-
-                                    if (!entitiesOnly) {
-                                        //logger.debug("{}Got connection {} to 
{}",
-                                        //new Object[]{indent, type, 
target.toString()});
-
-                                        EntityRef entryRef = new 
SimpleEntityRef(target);
-                                        WriteEvent event = new 
ConnectionEvent(lastEntity, type, entryRef);
-                                        processWriteEvent(subscriber, event);
-                                    }
-                                }
-                            }
-
-                            objectStartStack.pop();
-
-                        } else if ("dictionaries".equals(name)) {
-
-                            Map<String, Object> dictionariesMap = 
jp.readValueAs(HashMap.class);
-                            for (String dname : dictionariesMap.keySet()) {
-                                Map dmap = (Map) dictionariesMap.get(dname);
-
-                                if (!entitiesOnly) {
-                                    //logger.debug("{}Got dictionary {} size 
{}",
-                                    //new Object[] {indent, dname, dmap.size() 
});
-
-                                    WriteEvent event = new 
DictionaryEvent(lastEntity, dname, dmap);
-                                    processWriteEvent(subscriber, event);
-                                }
-                            }
-
-                            objectStartStack.pop();
-
-                        } else {
-                            // push onto object names we don't immediately 
understand.  Used for parent detection
-                            objectNameStack.push(name);
-                        }
-
-                    } else if (token.equals(JsonToken.START_ARRAY)) {
-                        if (objectNameStack.size() == 1
-                            && 
COLLECTION_OBJECT_NAME.equals(objectNameStack.peek())) {
-                            entityType = InflectionUtils.singularize(name);
-                        }
-
-                    } else if (token.equals(JsonToken.END_OBJECT)) {
-                        objectStartStack.pop();
-                    }
-                }
-
-                if (subscriber != null) {
-                    subscriber.onCompleted();
-                }
-
-                if (logger.isTraceEnabled()) {
-                    logger.trace("process(): done parsing JSON");
-                }
-
-            } catch (Exception e) {
-
-                tracker.fatal(e.getMessage());
-
-                if (subscriber != null) {
-
-                    // don't need to blow up here, we handled the problem
-                    // if we blow up we may prevent in-flight entities from 
being written
-                    // subscriber.onError(e);
-
-                    // but we are done reading entities
-                    subscriber.onCompleted();
-                }
-            }
-        }
-
-        private void processWriteEvent(final Subscriber<? super WriteEvent> 
subscriber, WriteEvent writeEvent) {
-
-            if (subscriber == null) {
-
-                // this logic makes it easy to remove Rx for debugging purposes
-                // no Rx, just do it
-                writeEvent.doWrite(em, fileImport, tracker);
-
-            } else {
-                subscriber.onNext(writeEvent);
-            }
-        }
-
-    }
-}
-
-
-/**
- * Custom Exception class for Organization Not Found
- */
-class OrganizationNotFoundException extends Exception {
-    OrganizationNotFoundException(String s) {
-        super(s);
-    }
-}
-
-
-/**
- * Custom Exception class for Application Not Found
- */
-class ApplicationNotFoundException extends Exception {
-    ApplicationNotFoundException(String s) {
-        super(s);
-    }
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c915a316/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
 
b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
deleted file mode 100644
index 7e09051..0000000
--- 
a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.usergrid.management.importer;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * Plug-able S3Import interface.
- */
-public interface S3Import {
-
-    List<String> getBucketFileNames(
-        String bucketName, String endsWith, String accessId, String secretKey 
) throws Exception;
-
-    public File copyFileFromBucket(
-        String blobFileName, String bucketName, String accessId, String 
secretKey ) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c915a316/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
 
b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
deleted file mode 100644
index 921de64..0000000
--- 
a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.usergrid.management.importer;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.inject.Module;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.RandomStringUtils;
-import org.jclouds.ContextBuilder;
-import org.jclouds.blobstore.BlobStore;
-import org.jclouds.blobstore.BlobStoreContext;
-import org.jclouds.blobstore.domain.Blob;
-import org.jclouds.blobstore.domain.MutableBlobMetadata;
-import org.jclouds.blobstore.domain.PageSet;
-import org.jclouds.blobstore.domain.StorageMetadata;
-import org.jclouds.blobstore.options.ListContainerOptions;
-import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule;
-import org.jclouds.logging.log4j.config.Log4JLoggingModule;
-import org.jclouds.netty.config.NettyPayloadModule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-import java.nio.file.Files;
-import java.util.*;
-
-
-public class S3ImportImpl implements S3Import {
-    private static final Logger logger = 
LoggerFactory.getLogger(S3ImportImpl.class);
-
-
-    public File copyFileFromBucket(
-        String blobFileName, String bucketName, String accessId, String 
secretKey ) throws Exception {
-
-        // setup to use JCloud BlobStore interface to AWS S3
-
-        Properties overrides = new Properties();
-        overrides.setProperty("s3" + ".identity", accessId);
-        overrides.setProperty("s3" + ".credential", secretKey);
-
-        final Iterable<? extends Module> MODULES = ImmutableSet.of(
-            new JavaUrlHttpCommandExecutorServiceModule(),
-            new Log4JLoggingModule(),
-            new NettyPayloadModule());
-
-        BlobStoreContext context = ContextBuilder.newBuilder("s3")
-            .credentials(accessId, secretKey)
-            .modules(MODULES)
-            .overrides(overrides)
-            .buildView(BlobStoreContext.class);
-        BlobStore blobStore = context.getBlobStore();
-
-        // get file from configured bucket, copy it to local temp file
-
-        Blob blob = blobStore.getBlob(bucketName, blobFileName);
-        if ( blob == null) {
-            throw new RuntimeException(
-                "Blob file name " + blobFileName + " not found in bucket " + 
bucketName );
-        }
-
-        FileOutputStream fop = null;
-        File tempFile;
-        try {
-            tempFile = File.createTempFile(bucketName, 
RandomStringUtils.randomAlphabetic(10));
-            tempFile.deleteOnExit();
-            fop = new FileOutputStream(tempFile);
-            InputStream is = blob.getPayload().openStream();
-            IOUtils.copyLarge(is, fop);
-            return tempFile;
-
-        } finally {
-            if ( fop != null ) {
-                fop.close();
-            }
-        }
-    }
-
-
-    @Override
-    public List<String> getBucketFileNames(
-        String bucketName, String endsWith, String accessId, String secretKey 
) {
-
-        // get setup to use JCloud BlobStore interface to AWS S3
-
-        Properties overrides = new Properties();
-        overrides.setProperty("s3" + ".identity", accessId);
-        overrides.setProperty("s3" + ".credential", secretKey);
-
-        final Iterable<? extends Module> MODULES = ImmutableSet.of(
-            new JavaUrlHttpCommandExecutorServiceModule(),
-            new Log4JLoggingModule(),
-            new NettyPayloadModule());
-
-        BlobStoreContext context = ContextBuilder.newBuilder("s3")
-            .credentials(accessId, secretKey)
-            .modules(MODULES)
-            .overrides(overrides)
-            .buildView(BlobStoreContext.class);
-        BlobStore blobStore = context.getBlobStore();
-
-        // gets all the files in the configured bucket recursively
-
-        PageSet<? extends StorageMetadata> pageSets =
-            blobStore.list(bucketName, new ListContainerOptions().recursive());
-
-        if (logger.isTraceEnabled()) {
-            logger.trace("   Found {} files in bucket {}", pageSets.size(), 
bucketName);
-        }
-
-        List<String> blobFileNames = new ArrayList<>();
-        for ( Object pageSet : pageSets ) {
-            String blobFileName = ((MutableBlobMetadata)pageSet).getName();
-            if ( blobFileName.endsWith( endsWith )) {
-                blobFileNames.add(blobFileName);
-            }
-        }
-
-        return blobFileNames;
-    }
-
-
-}
-

http://git-wip-us.apache.org/repos/asf/usergrid/blob/c915a316/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java
----------------------------------------------------------------------
diff --git 
a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java
 
b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java
deleted file mode 100644
index 2929d93..0000000
--- 
a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.usergrid.services.assets.data;
-
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.hash.Hashing;
-import com.google.common.io.Files;
-import com.google.inject.Module;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.usergrid.persistence.Entity;
-import org.apache.usergrid.persistence.EntityManager;
-import org.apache.usergrid.persistence.EntityManagerFactory;
-import org.apache.usergrid.utils.StringUtils;
-import org.jclouds.ContextBuilder;
-import org.jclouds.blobstore.BlobStore;
-import org.jclouds.blobstore.BlobStoreContext;
-import org.jclouds.blobstore.domain.Blob;
-import org.jclouds.blobstore.domain.BlobBuilder;
-import org.jclouds.blobstore.options.GetOptions;
-import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule;
-import org.jclouds.logging.log4j.config.Log4JLoggingModule;
-import org.jclouds.netty.config.NettyPayloadModule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.io.*;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-public class S3BinaryStore implements BinaryStore {
-
-    private static final Iterable<? extends Module> MODULES = ImmutableSet
-            .of( new JavaUrlHttpCommandExecutorServiceModule(), new 
Log4JLoggingModule(), new NettyPayloadModule() );
-
-    private static final Logger logger = LoggerFactory.getLogger( 
S3BinaryStore.class );
-    private static final long FIVE_MB = ( FileUtils.ONE_MB * 5 );
-    private static String WORKERS_PROP_NAME = "usergrid.binary.upload-workers";
-
-    private BlobStoreContext context;
-    private String accessId;
-    private String secretKey;
-    private String bucketName;
-    private ExecutorService executorService;
-
-    @Autowired
-    private Properties properties;
-
-    @Autowired
-    private EntityManagerFactory emf;
-
-
-    public S3BinaryStore( String accessId, String secretKey, String bucketName 
) {
-        this.accessId = accessId;
-        this.secretKey = secretKey;
-        this.bucketName = bucketName;
-    }
-
-
-    private BlobStoreContext getContext() {
-        if ( context == null ) {
-            context = ContextBuilder.newBuilder( "aws-s3" ).credentials( 
accessId, secretKey ).modules( MODULES )
-                                    .buildView( BlobStoreContext.class );
-
-            BlobStore blobStore = context.getBlobStore();
-            blobStore.createContainerInLocation( null, bucketName );
-        }
-
-        return context;
-    }
-
-
-    public void destroy() {
-        if ( context != null ) {
-            context.close();
-        }
-    }
-
-
-    @Override
-    public void write( final UUID appId, final Entity entity, InputStream 
inputStream ) throws IOException {
-
-        // write up to 5mb of data to an byte array
-
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        long written = IOUtils.copyLarge( inputStream, baos, 0, FIVE_MB );
-        byte[] data = baos.toByteArray();
-
-        if ( written < FIVE_MB ) { // total smaller than 5mb
-
-            final String uploadFileName = AssetUtils.buildAssetKey( appId, 
entity );
-            final String mimeType = AssetMimeHandler.get().getMimeType( 
entity, data );
-
-            final Map<String, Object> fileMetadata = 
AssetUtils.getFileMetadata( entity );
-            fileMetadata.put( AssetUtils.LAST_MODIFIED, 
System.currentTimeMillis() );
-
-            BlobStore blobStore = getContext().getBlobStore();
-
-            // need this for JClouds 1.7.x:
-//            BlobBuilder.PayloadBlobBuilder bb =  blobStore.blobBuilder( 
uploadFileName )
-//                .payload( data ).calculateMD5().contentType( mimeType );
-
-            // need this for JClouds 1.8.x:
-            BlobBuilder.PayloadBlobBuilder bb = 
blobStore.blobBuilder(uploadFileName)
-                .payload( data )
-                .contentMD5( Hashing.md5().newHasher().putBytes( data ).hash() 
)
-                .contentType( mimeType );
-
-            fileMetadata.put( AssetUtils.CONTENT_LENGTH, written );
-            if ( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ) != null ) {
-                bb.contentDisposition( fileMetadata.get( 
AssetUtils.CONTENT_DISPOSITION ).toString() );
-            }
-            final Blob blob = bb.build();
-
-            String md5sum = Hex.encodeHexString( 
blob.getMetadata().getContentMetadata().getContentMD5() );
-            fileMetadata.put( AssetUtils.CHECKSUM, md5sum );
-
-            String eTag = blobStore.putBlob( bucketName, blob );
-            fileMetadata.put( AssetUtils.E_TAG, eTag );
-        }
-        else { // bigger than 5mb... dump 5 mb tmp files and upload from them
-
-            ExecutorService executors = getExecutorService();
-
-            executors.submit( new UploadWorker( appId, entity, inputStream, 
data, written ) );
-        }
-    }
-
-    private ExecutorService getExecutorService() {
-
-        if ( executorService == null ) {
-            synchronized (this) {
-
-                int workers = 40;
-                String workersString = properties.getProperty( 
WORKERS_PROP_NAME, "40");
-
-                if ( StringUtils.isNumeric( workersString ) ) {
-                    workers = Integer.parseInt( workersString );
-                } else if ( !StringUtils.isEmpty( workersString )) {
-                    logger.error("Ignoring invalid setting for {}", 
WORKERS_PROP_NAME);
-                }
-                executorService = Executors.newFixedThreadPool( workers );
-            }
-        }
-
-        return executorService;
-    }
-
-
-    @Override
-    public InputStream read( UUID appId, Entity entity, long offset, long 
length ) throws IOException {
-        BlobStore blobStore = getContext().getBlobStore();
-        Blob blob;
-        if ( offset == 0 && length == FIVE_MB ) {
-            blob = blobStore.getBlob( bucketName, AssetUtils.buildAssetKey( 
appId, entity ) );
-        }
-        else {
-            GetOptions options = GetOptions.Builder.range( offset, length );
-            blob = blobStore.getBlob( bucketName, AssetUtils.buildAssetKey( 
appId, entity ), options );
-        }
-        if ( blob == null || blob.getPayload() == null ) {
-            return null;
-        }
-        return blob.getPayload().getInput();
-    }
-
-
-    @Override
-    public InputStream read( UUID appId, Entity entity ) throws IOException {
-        return read( appId, entity, 0, FIVE_MB );
-    }
-
-
-    @Override
-    public void delete( UUID appId, Entity entity ) {
-        BlobStore blobStore = getContext().getBlobStore();
-        blobStore.removeBlob( bucketName, AssetUtils.buildAssetKey( appId, 
entity ) );
-    }
-
-    class UploadWorker implements Callable<Void> {
-
-        private UUID appId;
-        private Entity entity;
-        private InputStream inputStream;
-        private byte[] data;
-        private long written;
-
-
-        public UploadWorker( UUID appId, Entity entity, InputStream is, byte[] 
data, long written ) {
-            this.appId = appId;
-            this.entity = entity;
-            this.inputStream = is;
-            this.data = data;
-            this.written = written;
-        }
-
-        @Override
-        public Void call() {
-
-            if (logger.isTraceEnabled()) {
-                logger.trace("Writing temp file for S3 upload");
-            }
-
-            // determine max size file allowed, default to 50mb
-            long maxSizeBytes = 50 * FileUtils.ONE_MB;
-            String maxSizeMbString = properties.getProperty( 
"usergrid.binary.max-size-mb", "50" );
-            if (StringUtils.isNumeric( maxSizeMbString )) {
-                maxSizeBytes = Long.parseLong( maxSizeMbString ) * 
FileUtils.ONE_MB;
-            }
-
-            // always allow files up to 5mb
-            if (maxSizeBytes < 5 * FileUtils.ONE_MB ) {
-                maxSizeBytes = 5 * FileUtils.ONE_MB;
-            }
-
-            // write temporary file, slightly larger than our size limit
-            OutputStream os = null;
-            File tempFile;
-            try {
-                tempFile = File.createTempFile( entity.getUuid().toString(), 
"tmp" );
-                tempFile.deleteOnExit();
-                os = new BufferedOutputStream( new FileOutputStream( 
tempFile.getAbsolutePath() ) );
-                os.write( data );
-                written += data.length;
-                written += IOUtils.copyLarge( inputStream, os, 0, maxSizeBytes 
+ 1 );
-
-                if (logger.isTraceEnabled()) {
-                    logger.trace("Write temp file {} length {}", 
tempFile.getName(), written);
-                }
-
-            } catch ( IOException e ) {
-                throw new RuntimeException( "Error creating temp file", e );
-
-            } finally {
-                if ( os != null ) {
-                    try {
-                        os.flush();
-                    } catch (IOException e) {
-                        logger.error( "Error flushing data to temporary upload 
file", e );
-                    }
-                    IOUtils.closeQuietly( os );
-                }
-            }
-
-            // if tempFile is too large, delete it, add error to entity file 
metadata and abort
-
-            Map<String, Object> fileMetadata = AssetUtils.getFileMetadata( 
entity );
-
-            if ( tempFile.length() > maxSizeBytes ) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("File too large. Temp file size (bytes) = {}, 
Max file size (bytes) = {}",
-                            tempFile.length(), maxSizeBytes);
-                }
-                try {
-                    EntityManager em = emf.getEntityManager( appId );
-                    fileMetadata.put( "error", "Asset size " + 
tempFile.length()
-                                    + " is larger than max size of " + 
maxSizeBytes );
-                    em.update( entity );
-                    tempFile.delete();
-
-                } catch ( Exception e ) {
-                    logger.error( "Error updating entity with error message", 
e);
-                }
-                return null;
-            }
-
-            String uploadFileName = AssetUtils.buildAssetKey( appId, entity );
-            String mimeType = AssetMimeHandler.get().getMimeType( entity, data 
);
-
-            try {  // start the upload
-
-                if (logger.isTraceEnabled()) {
-                    logger.trace("S3 upload thread started");
-                }
-
-                BlobStore blobStore = getContext().getBlobStore();
-
-                // need this for JClouds 1.7.x:
-//                BlobBuilder.PayloadBlobBuilder bb =  blobStore.blobBuilder( 
uploadFileName )
-//                    .payload( tempFile ).calculateMD5().contentType( 
mimeType );
-
-                // need this for JClouds 1.8.x:
-                BlobBuilder.PayloadBlobBuilder bb = blobStore.blobBuilder( 
uploadFileName )
-                    .payload( tempFile )
-                    .contentMD5( Files.hash( tempFile, Hashing.md5() ) )
-                    .contentType( mimeType );
-
-                if ( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ) != 
null ) {
-                    bb.contentDisposition( fileMetadata.get( 
AssetUtils.CONTENT_DISPOSITION ).toString() );
-                }
-                final Blob blob = bb.build();
-
-                String md5sum = Hex.encodeHexString( 
blob.getMetadata().getContentMetadata().getContentMD5() );
-                fileMetadata.put( AssetUtils.CHECKSUM, md5sum );
-
-                if (logger.isTraceEnabled()) {
-                    logger.trace("S3 upload starting");
-                }
-
-                String eTag = blobStore.putBlob( bucketName, blob );
-
-                if (logger.isTraceEnabled()) {
-                    logger.trace("S3 upload complete eTag=" + eTag);
-                }
-
-                // update entity with eTag
-                EntityManager em = emf.getEntityManager( appId );
-                fileMetadata.put( AssetUtils.LAST_MODIFIED, 
System.currentTimeMillis() );
-                fileMetadata.put( AssetUtils.CONTENT_LENGTH, written );
-                fileMetadata.put( AssetUtils.E_TAG, eTag );
-                em.update( entity );
-            }
-            catch ( Exception e ) {
-                logger.error( "error uploading", e );
-            }
-
-            if ( tempFile != null && tempFile.exists() ) {
-                tempFile.delete();
-            }
-
-            return null;
-        }
-    }
-}
-

Reply via email to