http://git-wip-us.apache.org/repos/asf/usergrid/blob/c915a316/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java deleted file mode 100644 index 6c1ffc0..0000000 --- a/stack/services/src/main/java/org/apache/usergrid/management/importer/ImportServiceImpl.java +++ /dev/null @@ -1,1352 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.usergrid.management.importer; - -import com.google.common.base.Preconditions; -import org.apache.usergrid.batch.JobExecution; -import org.apache.usergrid.batch.service.SchedulerService; -import org.apache.usergrid.corepersistence.util.CpNamingUtils; -import org.apache.usergrid.management.ManagementService; -import org.apache.usergrid.persistence.*; -import org.apache.usergrid.persistence.entities.FailedImportEntity; -import org.apache.usergrid.persistence.entities.FileImport; -import org.apache.usergrid.persistence.entities.Import; -import org.apache.usergrid.persistence.entities.JobData; -import org.apache.usergrid.persistence.exceptions.EntityNotFoundException; -import org.apache.usergrid.persistence.Query; -import org.apache.usergrid.persistence.Query.Level; -import org.apache.usergrid.utils.InflectionUtils; -import org.codehaus.jackson.JsonFactory; -import org.codehaus.jackson.JsonParser; -import org.codehaus.jackson.JsonToken; -import org.codehaus.jackson.map.ObjectMapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import rx.Observable; -import rx.Subscriber; -import rx.functions.Action1; -import rx.functions.Func2; -import rx.schedulers.Schedulers; - -import javax.annotation.Nullable; -import javax.annotation.PostConstruct; -import java.io.File; -import java.util.*; - - -public class ImportServiceImpl implements ImportService { - - public static final String IMPORT_ID = "importId"; - public static final String IMPORT_JOB_NAME = "importJob"; - public static final String FILE_IMPORT_ID = "fileImportId"; - public static final String FILE_IMPORT_JOB_NAME = "fileImportJob"; - public static final int HEARTBEAT_COUNT = 50; - - public static final String APP_IMPORT_CONNECTION = "imports"; - public static final String IMPORT_FILE_INCLUDES_CONNECTION = "files"; - - private static final Logger logger = LoggerFactory.getLogger(ImportServiceImpl.class); - - int MAX_FILE_IMPORTS = 1000; // max number of file import jobs / import job - - protected EntityManagerFactory emf; - - private SchedulerService sch; - - private ManagementService managementService; - - private 
JsonFactory jsonFactory = new JsonFactory(); - - - @PostConstruct - public void init() { - } - - - /** - * This schedules the main import Job. - * - * @param config configuration of the job to be scheduled - * @return it returns the UUID of the scheduled job - */ - @Override - public Import schedule(final UUID application, Map<String, Object> config) throws Exception { - - Preconditions.checkNotNull(config, "import information cannot be null"); - Preconditions.checkNotNull(application, "application cannot be null"); - - final EntityManager rootEM; - try { - rootEM = emf.getEntityManager(emf.getManagementAppId()); - } catch (Exception e) { - logger.error("application doesn't exist within the current context", e); - return null; - } - - Import importEntity = new Import(); - importEntity.setState(Import.State.CREATED); - - // create the import entity to store all metadata about the import job - try { - importEntity = rootEM.create(importEntity); - } catch (Exception e) { - logger.error("Import entity creation failed", e); - return null; - } - - // update state for import job to created - - // set data to be transferred to importInfo - JobData jobData = new JobData(); - jobData.setProperty("importInfo", config); - jobData.setProperty(IMPORT_ID, importEntity.getUuid()); - - long soonestPossible = System.currentTimeMillis() + 250; //sch grace period - - // schedule import job - sch.createJob(IMPORT_JOB_NAME, soonestPossible, jobData); - - // update state for import job to created - importEntity.setState(Import.State.SCHEDULED); - rootEM.update(importEntity); - - final EntityRef appInfo = getApplicationInfoEntity(rootEM, application); - - //now link it to the application - rootEM.createConnection(appInfo, APP_IMPORT_CONNECTION, importEntity); - - return importEntity; - } - - - @Override - public Results getImports(final UUID applicationId, @Nullable final String ql, @Nullable final String cursor) { - Preconditions.checkNotNull(applicationId, "applicationId must be specified"); - - try { - final EntityManager rootEm = emf.getEntityManager(emf.getManagementAppId()); - final Entity appInfo = getApplicationInfoEntity(rootEm, applicationId); - - Query query = Query.fromQLNullSafe(ql); - query.setCursor(cursor); - - //set our entity type - query.setEntityType(Schema.getDefaultSchema().getEntityType(Import.class)); - - return rootEm.searchCollection(appInfo, APP_IMPORT_CONNECTION, query); - } catch (Exception e) { - throw new RuntimeException("Unable to get import entity", e); - } - } - - - @Override - public Import getImport(final UUID applicationId, final UUID importId) { - Preconditions.checkNotNull(applicationId, "applicationId must be specified"); - Preconditions.checkNotNull(importId, "importId must be specified"); - - try { - final EntityManager rootEm = emf.getEntityManager(emf.getManagementAppId()); - - final Entity appInfo = getApplicationInfoEntity(rootEm, applicationId); - final Import importEntity = rootEm.get(importId, Import.class); - - // check if it's on the path - if (!rootEm.isConnectionMember(appInfo, APP_IMPORT_CONNECTION, importEntity)) { - return null; - } - - return importEntity; - } catch (Exception e) { - throw new RuntimeException("Unable to get import entity", e); - } - - } - - - private Entity getApplicationInfoEntity(final EntityManager rootEm, final UUID applicationId) throws Exception { - final Entity entity = rootEm.get(new SimpleEntityRef(CpNamingUtils.APPLICATION_INFO, applicationId)); - - if (entity == null) { - throw new EntityNotFoundException("Cound not find 
application with id " + applicationId); - } - - return entity; - } - - @Override - public Results getFileImports(final UUID applicationId, final UUID importId, - @Nullable final String ql, @Nullable final String cursor) { - - Preconditions.checkNotNull(applicationId, "applicationId must be specified"); - Preconditions.checkNotNull(importId, "importId must be specified"); - - try { - final EntityManager rootEm = emf.getEntityManager(emf.getManagementAppId()); - - - final Import importEntity = getImport(applicationId, importId); - - Query query = Query.fromQLNullSafe(ql); - query.setCursor(cursor); - query.setConnectionType(IMPORT_FILE_INCLUDES_CONNECTION); - query.setResultsLevel(Level.ALL_PROPERTIES); - - - //set our entity type - query.setEntityType(Schema.getDefaultSchema().getEntityType(FileImport.class)); - - return rootEm.searchTargetEntities(importEntity, query); - } catch (Exception e) { - throw new RuntimeException("Unable to get import entity", e); - } - - } - - - @Override - public FileImport getFileImport(final UUID applicationId, final UUID importId, final UUID fileImportId) { - try { - final EntityManager rootEm = emf.getEntityManager(emf.getManagementAppId()); - - final Import importEntity = getImport(applicationId, importId); - - if (importEntity == null) { - throw new EntityNotFoundException("Import not found with id " + importId); - } - - final FileImport fileImport = rootEm.get(importId, FileImport.class); - - - // check if it's on the path - if (!rootEm.isConnectionMember(importEntity, APP_IMPORT_CONNECTION, fileImport)) { - return null; - } - - return fileImport; - } catch (Exception e) { - throw new RuntimeException("Unable to load file import", e); - } - } - - - @Override - public Results getFailedImportEntities(final UUID applicationId, final UUID importId, final UUID fileImportId, - @Nullable final String ql, @Nullable final String cursor) { - - Preconditions.checkNotNull(applicationId, "applicationId must be specified"); - Preconditions.checkNotNull(importId, "importId must be specified"); - Preconditions.checkNotNull(fileImportId, "fileImportId must be specified"); - - try { - final EntityManager rootEm = emf.getEntityManager(emf.getManagementAppId()); - - - final FileImport importEntity = getFileImport(applicationId, importId, fileImportId); - - Query query = Query.fromQLNullSafe(ql); - query.setCursor(cursor); - query.setConnectionType(FileImportTracker.ERRORS_CONNECTION_NAME); - query.setResultsLevel(Level.ALL_PROPERTIES); - - - //set our entity type - query.setEntityType(Schema.getDefaultSchema().getEntityType(FailedImportEntity.class)); - - return rootEm.searchTargetEntities(importEntity, query); - } catch (Exception e) { - throw new RuntimeException("Unable to get import entity", e); - } - } - - - @Override - public FailedImportEntity getFailedImportEntity(final UUID applicationId, final UUID importId, - final UUID fileImportId, final UUID failedImportId) { - try { - final EntityManager rootEm = emf.getEntityManager(emf.getManagementAppId()); - - - final FileImport importEntity = getFileImport(applicationId, importId, fileImportId); - - if (importEntity == null) { - throw new EntityNotFoundException("Import not found with id " + importId); - } - - - final FailedImportEntity fileImport = rootEm.get(importId, FailedImportEntity.class); - - - // check if it's on the path - if (!rootEm.isConnectionMember(importEntity, FileImportTracker.ERRORS_CONNECTION_NAME, fileImport)) { - return null; - } - - return fileImport; - } catch (Exception e) { - throw new 
RuntimeException("Unable to load file import", e); - } - } - - - /** - * This schedules the sub FileImport Job - * - * @param file file to be scheduled - * @return it returns the UUID of the scheduled job - * @throws Exception - */ - private JobData createFileTask(Map<String, Object> config, String file, EntityRef importRef) throws Exception { - - if (logger.isTraceEnabled()) { - logger.trace("scheduleFile() for import {}:{} file {}", - importRef.getType(), importRef.getType(), file); - } - - EntityManager rootEM; - - try { - rootEM = emf.getEntityManager(emf.getManagementAppId()); - } catch (Exception e) { - logger.error("application doesn't exist within the current context"); - return null; - } - - // create a FileImport entity to store metadata about the fileImport job - UUID applicationId = (UUID) config.get("applicationId"); - FileImport fileImport = new FileImport(file, applicationId); - fileImport = rootEM.create(fileImport); - - Import importEntity = rootEM.get(importRef, Import.class); - - try { - // create a connection between the main import job and the sub FileImport Job - rootEM.createConnection(importEntity, IMPORT_FILE_INCLUDES_CONNECTION, fileImport); - - if (logger.isTraceEnabled()) { - logger.trace("Created connection from {}:{} to {}:{}", - importEntity.getType(), importEntity.getUuid(), - fileImport.getType(), fileImport.getUuid() - ); - } - - } catch (Exception e) { - logger.error("Exception creating file task connection", e.getMessage()); - return null; - } - - // mark the File Import Job as created - fileImport.setState(FileImport.State.CREATED); - rootEM.update(fileImport); - - // set data to be transferred to the FileImport Job - JobData jobData = new JobData(); - jobData.setProperty("File", file); - jobData.setProperty(FILE_IMPORT_ID, fileImport.getUuid()); - jobData.addProperties(config); - - // update state of the job to Scheduled - fileImport.setState(FileImport.State.SCHEDULED); - rootEM.update(fileImport); - - return jobData; - } - - - private int getConnectionCount(final Import importRoot) { - - try { - - EntityManager rootEM = emf.getEntityManager(emf.getManagementAppId()); - Query query = Query.fromQL("select *"); - query.setEntityType("file_import"); - query.setConnectionType(IMPORT_FILE_INCLUDES_CONNECTION); - query.setLimit(MAX_FILE_IMPORTS); - - // TODO, this won't work with more than 100 files - Results entities = rootEM.searchTargetEntities(importRoot, query); - return entities.size(); - - // see ImportConnectsTest() -// Results entities = rootEM.getTargetEntities( -// importRoot, "includes", null, Level.ALL_PROPERTIES ); -// PagingResultsIterator itr = new PagingResultsIterator( entities ); -// int count = 0; -// while ( itr.hasNext() ) { -// itr.next(); -// count++; -// } -// return count; - } catch (Exception e) { - logger.error("application doesn't exist within the current context"); - throw new RuntimeException(e); - } - } - - - /** - * Schedule the file tasks. This must happen in 2 phases. The first is linking the - * sub files to the master the second is scheduling them to run. - */ - private JobData scheduleFileTasks(final JobData jobData) { - - long soonestPossible = System.currentTimeMillis() + 250; //sch grace period - - // schedule file import job - return sch.createJob(FILE_IMPORT_JOB_NAME, soonestPossible, jobData); - } - - - /** - * Query Entity Manager for the state of the Import Entity. 
This corresponds to the GET /import - */ - @Override - public Import.State getState(UUID uuid) throws Exception { - - Preconditions.checkNotNull(uuid, "uuid cannot be null"); - - EntityManager rootEm = emf.getEntityManager(emf.getManagementAppId()); - - //retrieve the import entity. - Import importUG = rootEm.get(uuid, Import.class); - - if (importUG == null) { - throw new EntityNotFoundException("Could not find entity with uuid " + uuid); - } - - return importUG.getState(); - } - - - /** - * Query Entity Manager for the error message generated for an import job. - */ - @Override - public String getErrorMessage(final UUID uuid) throws Exception { - - //get application entity manager - - if (uuid == null) { - logger.error("getErrorMessage(): UUID passed in cannot be null."); - return "UUID passed in cannot be null"; - } - - EntityManager rootEm = emf.getEntityManager(emf.getManagementAppId()); - - //retrieve the import entity. - Import importUG = rootEm.get(uuid, Import.class); - - if (importUG == null) { - logger.error("getErrorMessage(): no entity with that uuid was found"); - return "No Such Element found"; - } - return importUG.getErrorMessage(); - } - - - /** - * Returns the Import Entity that stores all meta-data for the particular import Job - * - * @param jobExecution the import job details - * @return Import Entity - */ - @Override - public Import getImportEntity(final JobExecution jobExecution) throws Exception { - - UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID); - EntityManager importManager = emf.getEntityManager(emf.getManagementAppId()); - - return importManager.get(importId, Import.class); - } - - - /** - * Returns the File Import Entity that stores all meta-data for the particular sub File import Job - * - * @return File Import Entity - */ - @Override - public FileImport getFileImportEntity(final JobExecution jobExecution) throws Exception { - UUID fileImportId = (UUID) jobExecution.getJobData().getProperty(FILE_IMPORT_ID); - EntityManager em = emf.getEntityManager(emf.getManagementAppId()); - return em.get(fileImportId, FileImport.class); - } - - - public EntityManagerFactory getEmf() { - return emf; - } - - - public void setEmf(final EntityManagerFactory emf) { - this.emf = emf; - } - - - public ManagementService getManagementService() { - - return managementService; - } - - - public void setManagementService(final ManagementService managementService) { - this.managementService = managementService; - } - - - public void setSch(final SchedulerService sch) { - this.sch = sch; - } - - - /** - * This method creates sub-jobs for each file i.e. File Import Jobs. 
- * - * @param jobExecution the job created by the scheduler with all the required config data - */ - @Override - public void doImport(JobExecution jobExecution) throws Exception { - - if (logger.isTraceEnabled()) { - logger.trace("doImport()"); - } - - @SuppressWarnings("unchecked") - Map<String, Object> config = (Map<String, Object>) jobExecution.getJobData().getProperty("importInfo"); - if (config == null) { - logger.error("doImport(): Import Information passed through is null"); - return; - } - - @SuppressWarnings("unchecked") - Map<String, Object> properties = (Map<String, Object>) config.get("properties"); - @SuppressWarnings("unchecked") - Map<String, Object> storage_info = (Map<String, Object>) properties.get("storage_info"); - - String bucketName = (String) storage_info.get("bucket_location"); - String accessId = (String) storage_info.get("s3_access_id"); - String secretKey = (String) storage_info.get("s3_key"); - - // get Import Entity from the management app, update it to show that job has started - - final EntityManager rootEM = emf.getEntityManager(emf.getManagementAppId()); - UUID importId = (UUID) jobExecution.getJobData().getProperty(IMPORT_ID); - Import importEntity = rootEM.get(importId, Import.class); - - importEntity.setState(Import.State.STARTED); - importEntity.setStarted(System.currentTimeMillis()); - importEntity.setErrorMessage(" "); - rootEM.update(importEntity); - if (logger.isTraceEnabled()) { - logger.trace("doImport(): updated state"); - } - - // if no S3 importer was passed in then create one - - S3Import s3Import; - Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import"); - try { - if (s3PlaceHolder != null) { - s3Import = (S3Import) s3PlaceHolder; - } else { - s3Import = new S3ImportImpl(); - } - } catch (Exception e) { - logger.error("doImport(): Error creating S3Import", e); - importEntity.setErrorMessage(e.getMessage()); - importEntity.setState(Import.State.FAILED); - rootEM.update(importEntity); - return; - } - - // get list of all JSON files in S3 bucket - - final List<String> bucketFiles; - try { - - if (config.get("organizationId") == null) { - logger.error("doImport(): No organization could be found"); - importEntity.setErrorMessage("No organization could be found"); - importEntity.setState(Import.State.FAILED); - rootEM.update(importEntity); - return; - - } else { - - if (config.get("applicationId") == null) { - throw new UnsupportedOperationException("Import applications not supported"); - - } else { - bucketFiles = s3Import.getBucketFileNames(bucketName, ".json", accessId, secretKey); - } - } - - } catch (OrganizationNotFoundException | ApplicationNotFoundException e) { - importEntity.setErrorMessage(e.getMessage()); - importEntity.setState(Import.State.FAILED); - rootEM.update(importEntity); - return; - } - - - // schedule a FileImport job for each file found in the bucket - - if (bucketFiles.isEmpty()) { - importEntity.setState(Import.State.FINISHED); - importEntity.setErrorMessage("No files found in the bucket: " + bucketName); - rootEM.update(importEntity); - - } else { - - Map<String, Object> fileMetadata = new HashMap<>(); - ArrayList<Map<String, Object>> value = new ArrayList<>(); - final List<JobData> fileJobs = new ArrayList<>(bucketFiles.size()); - - // create the Entity Connection and set up metadata for each job - - for (String bucketFile : bucketFiles) { - final JobData jobData = createFileTask(config, bucketFile, importEntity); - fileJobs.add(jobData); - } - - int retries = 0; - int maxRetries = 60; - boolean done = 
false; - while (!done && retries++ < maxRetries) { - - final int count = getConnectionCount(importEntity); - - if (count == fileJobs.size()) { - if (logger.isTraceEnabled()) { - logger.trace("Got ALL {} of {} expected connections", count, fileJobs.size()); - } - done = true; - } else { - if (logger.isTraceEnabled()) { - logger.trace("Got {} of {} expected connections. Waiting...", count, fileJobs.size()); - } - Thread.sleep(1000); - } - } - if (retries >= maxRetries) { - throw new RuntimeException("Max retries was reached"); - } - - // schedule each job - - for (JobData jobData : fileJobs) { - - final JobData scheduled = scheduleFileTasks(jobData); - - Map<String, Object> fileJobID = new HashMap<>(); - fileJobID.put("FileName", scheduled.getProperty("File")); - fileJobID.put("JobID", scheduled.getUuid()); - value.add(fileJobID); - } - - fileMetadata.put("files", value); - importEntity.addProperties(fileMetadata); - importEntity.setFileCount(fileJobs.size()); - rootEM.update(importEntity); - } - } - - - @Override - public void downloadAndImportFile(JobExecution jobExecution) { - - // get values we need - - @SuppressWarnings("unchecked") - Map<String, Object> properties = (Map<String, Object>) jobExecution.getJobData().getProperty("properties"); - if (properties == null) { - logger.error("downloadAndImportFile(): Import Information passed through is null"); - return; - } - @SuppressWarnings("unchecked") - Map<String, Object> storage_info = (Map<String, Object>) properties.get("storage_info"); - - String bucketName = (String) storage_info.get("bucket_location"); - String accessId = (String) storage_info.get("s3_access_id"); - String secretKey = (String) storage_info.get("s3_key"); - - EntityManager rootEM = emf.getEntityManager(emf.getManagementAppId()); - - // get the file import entity - - FileImport fileImport; - try { - fileImport = getFileImportEntity(jobExecution); - } catch (Exception e) { - logger.error("Error updating fileImport to set state of file import", e); - return; - } - - // tracker flushes every 100 entities - final FileImportTracker tracker = new FileImportTracker(emf, fileImport, 100); - - String fileName = jobExecution.getJobData().getProperty("File").toString(); - UUID targetAppId = (UUID) jobExecution.getJobData().getProperty("applicationId"); - - // is job already done? 
- if (FileImport.State.FAILED.equals(fileImport.getState()) - || FileImport.State.FINISHED.equals(fileImport.getState())) { - return; - } - - // update FileImport Entity to indicate that we have started - - if (logger.isDebugEnabled()) { - logger.debug("downloadAndImportFile() for file {} ", fileName); - } - - try { - rootEM.update(fileImport); - fileImport.setState(FileImport.State.STARTED); - rootEM.update(fileImport); - - if (rootEM.get(targetAppId) == null) { - tracker.fatal("Application " + targetAppId + " does not exist"); - return; - } - - } catch (Exception e) { - tracker.fatal("Application " + targetAppId + " does not exist"); - checkIfComplete(rootEM, fileImport); - return; - } - - EntityManager targetEm = emf.getEntityManager(targetAppId); - - // download file from S3, if no S3 importer was passed in then create one - - File downloadedFile = null; - S3Import s3Import; - Object s3PlaceHolder = jobExecution.getJobData().getProperty("s3Import"); - try { - if (s3PlaceHolder != null) { - s3Import = (S3Import) s3PlaceHolder; - } else { - s3Import = new S3ImportImpl(); - } - } catch (Exception e) { - tracker.fatal("Error connecting to S3: " + e.getMessage()); - checkIfComplete(rootEM, fileImport); - return; - } - - try { - downloadedFile = s3Import.copyFileFromBucket( - fileName, bucketName, accessId, secretKey); - } catch (Exception e) { - tracker.fatal("Error downloading file: " + e.getMessage()); - checkIfComplete(rootEM, fileImport); - return; - } - - // parse JSON data, create Entities and Connections from import data - - try { - parseEntitiesAndConnectionsFromJson( - jobExecution, downloadedFile, targetEm, rootEM, fileImport, tracker); - - } catch (Exception e) { - tracker.fatal(e.getMessage()); - } - - checkIfComplete(rootEM, fileImport); - } - - - private Import getImportEntity(final EntityManager rootEm, final FileImport fileImport) { - try { - Results importJobResults = - rootEm.getSourceEntities(fileImport, IMPORT_FILE_INCLUDES_CONNECTION, - null, Level.ALL_PROPERTIES); - - List<Entity> importEntities = importJobResults.getEntities(); - final Import importEntity = (Import) importEntities.get(0).toTypedEntity(); - return importEntity; - } catch (Exception e) { - throw new RuntimeException("Unable to import entity"); - } - } - - /** - * Check if we're the last job on failure - */ - private void checkIfComplete(final EntityManager rootEM, final FileImport fileImport) { - int failCount = 0; - int successCount = 0; - - final Import importEntity = getImportEntity(rootEM, fileImport); - - try { - - // wait for query index to catch up - - // TODO: better way to wait for indexes to catch up - try { - Thread.sleep(5000); - } catch (Exception intentionallyIgnored) { - // intentionally ignored - } - - // get file import entities for this import job - - Query query = new Query(); - query.setEntityType(Schema.getDefaultSchema().getEntityType(FileImport.class)); - query.setConnectionType(IMPORT_FILE_INCLUDES_CONNECTION); - query.setLimit(MAX_FILE_IMPORTS); - - Results entities = rootEM.searchTargetEntities(importEntity, query); - PagingResultsIterator itr = new PagingResultsIterator(entities); - - if (!itr.hasNext()) { - logger.warn("Found no FileImport entities for import {}, unable to check if complete", - importEntity.getUuid()); - return; - } - - if (logger.isTraceEnabled()) { - logger.trace("Checking {} file import jobs to see if we are done for file {}", - new Object[]{entities.size(), fileImport.getFileName()}); - } - - // loop through entities, count different types of status - - 
while (itr.hasNext()) { - FileImport fi = (FileImport) itr.next(); - - switch (fi.getState()) { - case FAILED: // failed, but we may not be complete so continue checking - failCount++; - break; - case FINISHED: // finished, we can continue checking - successCount++; - continue; - default: // not something we recognize as complete, short circuit - if (logger.isDebugEnabled()) { - logger.debug("not done yet, bail out... {}", fi.getState().toString()); - } - return; - } - } - } catch (Exception e) { - failCount++; - if (importEntity != null) { - importEntity.setErrorMessage("Error determining status of file import jobs"); - } - if (logger.isDebugEnabled()) { - logger.debug("Error determining status of file import jobs", e); - } - } - - if (logger.isTraceEnabled()) { - logger.trace("successCount = {} failCount = {}", successCount, failCount); - } - - if (importEntity != null) { - if (logger.isTraceEnabled()) { - logger.trace("FINISHED"); - } - - if (failCount == 0) { - importEntity.setState(Import.State.FINISHED); - } else { - // we had failures, set it to failed - importEntity.setState(Import.State.FAILED); - } - - try { - rootEM.update(importEntity); - if (logger.isTraceEnabled()) { - logger.trace("Updated import entity {}:{} with state {}", - importEntity.getType(), importEntity.getUuid(), importEntity.getState()); - } - } catch (Exception e) { - logger.error("Error updating import entity", e); - } - } - - - } - - - /** - * Gets the JSON parser for given file - * - * @param collectionFile the file for which JSON parser is required - */ - private JsonParser getJsonParserForFile(File collectionFile) throws Exception { - JsonParser jp = jsonFactory.createJsonParser(collectionFile); - jp.setCodec(new ObjectMapper()); - return jp; - } - - - /** - * Imports the entity's connecting references (collections, connections and dictionaries) - * - * @param execution The job jobExecution currently running - * @param file The file to be imported - * @param em Entity Manager for the application being imported - * @param rootEm Entity manager for the root applicaition - * @param fileImport The file import entity - */ - private void parseEntitiesAndConnectionsFromJson( - final JobExecution execution, - final File file, - final EntityManager em, - final EntityManager rootEm, - final FileImport fileImport, - final FileImportTracker tracker) throws Exception { - - - // tracker flushes every 100 entities - //final FileImportTracker tracker = new FileImportTracker( emf, fileImport, 100 ); - - // function to execute for each write event - final Action1<WriteEvent> doWork = new Action1<WriteEvent>() { - @Override - public void call(WriteEvent writeEvent) { - writeEvent.doWrite(em, fileImport, tracker); - } - }; - - // invokes the heartbeat every HEARTBEAT_COUNT operations - final Func2<Integer, WriteEvent, Integer> heartbeatReducer = new Func2<Integer, WriteEvent, Integer>() { - @Override - public Integer call(final Integer integer, final WriteEvent writeEvent) { - final int next = integer.intValue() + 1; - if (next % HEARTBEAT_COUNT == 0) { - execution.heartbeat(); - } - return next; - } - }; - - - // FIRST PASS: import all entities in the file - - - // observable that parses JSON and emits write events - JsonParser jp = getJsonParserForFile(file); - - // TODO: move JSON parser into observable creation so open/close happens within the stream - // entitiesOnly = true - final JsonEntityParserObservable jsonObservableEntities = - new JsonEntityParserObservable(jp, em, rootEm, fileImport, tracker, true); - - final 
Observable<WriteEvent> entityEventObservable = Observable.create(jsonObservableEntities); - - // only take while our stats tell us we should continue processing - // potentially skip the first n if this is a resume operation - final int entityNumSkip = (int) tracker.getTotalEntityCount(); - - - entityEventObservable.takeWhile(writeEvent -> !tracker.shouldStopProcessingEntities()).skip(entityNumSkip) - .flatMap(writeEvent -> { - return Observable.just(writeEvent).doOnNext(doWork); - }, 10).reduce(0, heartbeatReducer).toBlocking().last(); - - - jp.close(); - - if (FileImport.State.FAILED.equals(fileImport.getState())) { - if (logger.isTraceEnabled()) { - logger.trace("\n\nFailed to completely write entities, skipping second phase. File: {}\n", - fileImport.getFileName()); - } - return; - } - if (logger.isTraceEnabled()) { - logger.trace("\n\nWrote entities. File: {}\n", fileImport.getFileName()); - } - - - // SECOND PASS: import all connections and dictionaries - - - // observable that parses JSON and emits write events - jp = getJsonParserForFile(file); - - // TODO: move JSON parser into observable creation so open/close happens within the stream - // entitiesOnly = false - final JsonEntityParserObservable jsonObservableOther = - new JsonEntityParserObservable(jp, em, rootEm, fileImport, tracker, false); - - final Observable<WriteEvent> otherEventObservable = Observable.create(jsonObservableOther); - - // only take while our stats tell us we should continue processing - // potentially skip the first n if this is a resume operation - final int connectionNumSkip = (int) tracker.getTotalConnectionCount(); - - // with this code we get asynchronous behavior and testImportWithMultipleFiles will fail - final int connectionCount = otherEventObservable.takeWhile( - writeEvent -> !tracker.shouldStopProcessingConnections()).skip(connectionNumSkip).flatMap(entityWrapper -> { - return Observable.just(entityWrapper).doOnNext(doWork).subscribeOn(Schedulers.io()); - - }, 10).reduce(0, heartbeatReducer).toBlocking().last(); - - jp.close(); - - if (logger.isTraceEnabled()) { - logger.trace("\n\nparseEntitiesAndConnectionsFromJson(): Wrote others for file {}\n", - fileImport.getFileName()); - } - - if (FileImport.State.FAILED.equals(fileImport.getState())) { - if (logger.isDebugEnabled()) { - logger.debug("\n\nparseEntitiesAndConnectionsFromJson(): failed to completely write entities\n"); - } - return; - } - - // flush the job statistics - tracker.complete(); - - if (FileImport.State.FAILED.equals(fileImport.getState())) { - if (logger.isDebugEnabled()) { - logger.debug("\n\nFailed to completely write connections and dictionaries. File: {}\n", - fileImport.getFileName()); - } - return; - } - - if (logger.isTraceEnabled()) { - logger.trace("\n\nWrote connections and dictionaries. 
File: {}\n", fileImport.getFileName()); - } - } - - - private interface WriteEvent { - public void doWrite(EntityManager em, FileImport fileImport, FileImportTracker tracker); - } - - - private final class EntityEvent implements WriteEvent { - UUID entityUuid; - String entityType; - Map<String, Object> properties; - - EntityEvent(UUID entityUuid, String entityType, Map<String, Object> properties) { - this.entityUuid = entityUuid; - this.entityType = entityType; - this.properties = properties; - } - - - // Creates entities - @Override - public void doWrite(EntityManager em, FileImport fileImport, FileImportTracker tracker) { - try { - if (logger.isTraceEnabled()) { - logger.trace("Writing imported entity {}:{} into app {}", - entityType, entityUuid, em.getApplication().getUuid()); - } - - em.create(entityUuid, entityType, properties); - - tracker.entityWritten(); - - } catch (Exception e) { - logger.error("Error writing entity. From file:{}", fileImport.getFileName(), e); - - tracker.entityFailed(e.getMessage() + " From file: " + fileImport.getFileName()); - } - } - } - - - private final class ConnectionEvent implements WriteEvent { - EntityRef ownerEntityRef; - String connectionType; - EntityRef entityRef; - - ConnectionEvent(EntityRef ownerEntityRef, String connectionType, EntityRef entryRef) { - this.ownerEntityRef = ownerEntityRef; - this.connectionType = connectionType; - this.entityRef = entryRef; - } - - // creates connections between entities - @Override - public void doWrite(EntityManager em, FileImport fileImport, FileImportTracker tracker) { - - try { - // TODO: do we need to ensure that all Entity events happen first? - // TODO: what happens if ConnectionEvents happen before all entities are saved? - - // Connections are specified as UUIDs with no type - if (entityRef.getType() == null) { - entityRef = em.get(ownerEntityRef.getUuid()); - } - - if (logger.isTraceEnabled()) { - logger.trace("Creating connection from {}:{} to {}:{}", - ownerEntityRef.getType(), ownerEntityRef.getUuid(), - entityRef.getType(), entityRef.getUuid()); - } - - em.createConnection(ownerEntityRef, connectionType, entityRef); - - tracker.connectionWritten(); - - } catch (Exception e) { - logger.error("Error writing connection. From file: {}", fileImport.getFileName(), e); - - tracker.connectionFailed(e.getMessage() + " From file: " + fileImport.getFileName()); - } - } - } - - - private final class DictionaryEvent implements WriteEvent { - - EntityRef ownerEntityRef; - String dictionaryName; - Map<String, Object> dictionary; - - DictionaryEvent(EntityRef ownerEntityRef, String dictionaryName, Map<String, Object> dictionary) { - this.ownerEntityRef = ownerEntityRef; - this.dictionaryName = dictionaryName; - this.dictionary = dictionary; - } - - // adds map to the dictionary - @Override - public void doWrite(EntityManager em, FileImport fileImport, FileImportTracker stats) { - try { - - if (logger.isTraceEnabled()) { - logger.trace("Adding map to {}:{} dictionary {}", - ownerEntityRef.getType(), ownerEntityRef.getType(), dictionaryName); - } - - em.addMapToDictionary(ownerEntityRef, dictionaryName, dictionary); - - } catch (Exception e) { - logger.error("Error writing dictionary. 
From file: " + fileImport.getFileName(), e); - - // TODO add statistics for dictionary writes and failures - } - } - } - - - private final class JsonEntityParserObservable implements Observable.OnSubscribe<WriteEvent> { - public static final String COLLECTION_OBJECT_NAME = "collections"; - private final JsonParser jp; - EntityManager em; - EntityManager rootEm; - FileImport fileImport; - FileImportTracker tracker; - boolean entitiesOnly; - - - JsonEntityParserObservable( - JsonParser parser, - EntityManager em, - EntityManager rootEm, - FileImport fileImport, - FileImportTracker tracker, - boolean entitiesOnly) { - - this.jp = parser; - this.em = em; - this.rootEm = rootEm; - this.fileImport = fileImport; - this.tracker = tracker; - this.entitiesOnly = entitiesOnly; - } - - - @Override - public void call(final Subscriber<? super WriteEvent> subscriber) { - process(subscriber); - } - - - private void process(final Subscriber<? super WriteEvent> subscriber) { - - try { - - // we ignore imported entity type information, entities get the type of the collection - Stack<JsonToken> objectStartStack = new Stack(); - Stack<String> objectNameStack = new Stack(); - EntityRef lastEntity = null; - - String entityType = null; - - while (true) { - - JsonToken token = jp.nextToken(); - - // nothing left to do. - if (token == null) { - break; - } - - String name = jp.getCurrentName(); - - - // start of an object with a field name - - if (token.equals(JsonToken.START_OBJECT)) { - - objectStartStack.push(token); - - // nothing to do - if (name == null) { - continue; - } - - - if ("Metadata".equals(name)) { - - Map<String, Object> entityMap = jp.readValueAs(HashMap.class); - - UUID uuid = null; - if (entityMap.get("uuid") != null) { - uuid = UUID.fromString((String) entityMap.get("uuid")); - lastEntity = new SimpleEntityRef(entityType, uuid); - } - - if (entitiesOnly) { - //logger.debug("{}Got entity with uuid {}", indent, lastEntity); - - WriteEvent event = new EntityEvent(uuid, entityType, entityMap); - processWriteEvent(subscriber, event); - } - objectStartStack.pop(); - } else if ("connections".equals(name)) { - - Map<String, Object> connectionMap = jp.readValueAs(HashMap.class); - - for (String type : connectionMap.keySet()) { - List targets = (List) connectionMap.get(type); - - for (Object targetObject : targets) { - UUID target = UUID.fromString((String) targetObject); - - if (!entitiesOnly) { - //logger.debug("{}Got connection {} to {}", - //new Object[]{indent, type, target.toString()}); - - EntityRef entryRef = new SimpleEntityRef(target); - WriteEvent event = new ConnectionEvent(lastEntity, type, entryRef); - processWriteEvent(subscriber, event); - } - } - } - - objectStartStack.pop(); - - } else if ("dictionaries".equals(name)) { - - Map<String, Object> dictionariesMap = jp.readValueAs(HashMap.class); - for (String dname : dictionariesMap.keySet()) { - Map dmap = (Map) dictionariesMap.get(dname); - - if (!entitiesOnly) { - //logger.debug("{}Got dictionary {} size {}", - //new Object[] {indent, dname, dmap.size() }); - - WriteEvent event = new DictionaryEvent(lastEntity, dname, dmap); - processWriteEvent(subscriber, event); - } - } - - objectStartStack.pop(); - - } else { - // push onto object names we don't immediately understand. 
Used for parent detection - objectNameStack.push(name); - } - - } else if (token.equals(JsonToken.START_ARRAY)) { - if (objectNameStack.size() == 1 - && COLLECTION_OBJECT_NAME.equals(objectNameStack.peek())) { - entityType = InflectionUtils.singularize(name); - } - - } else if (token.equals(JsonToken.END_OBJECT)) { - objectStartStack.pop(); - } - } - - if (subscriber != null) { - subscriber.onCompleted(); - } - - if (logger.isTraceEnabled()) { - logger.trace("process(): done parsing JSON"); - } - - } catch (Exception e) { - - tracker.fatal(e.getMessage()); - - if (subscriber != null) { - - // don't need to blow up here, we handled the problem - // if we blow up we may prevent in-flight entities from being written - // subscriber.onError(e); - - // but we are done reading entities - subscriber.onCompleted(); - } - } - } - - private void processWriteEvent(final Subscriber<? super WriteEvent> subscriber, WriteEvent writeEvent) { - - if (subscriber == null) { - - // this logic makes it easy to remove Rx for debugging purposes - // no Rx, just do it - writeEvent.doWrite(em, fileImport, tracker); - - } else { - subscriber.onNext(writeEvent); - } - } - - } -} - - -/** - * Custom Exception class for Organization Not Found - */ -class OrganizationNotFoundException extends Exception { - OrganizationNotFoundException(String s) { - super(s); - } -} - - -/** - * Custom Exception class for Application Not Found - */ -class ApplicationNotFoundException extends Exception { - ApplicationNotFoundException(String s) { - super(s); - } -}
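Note on the removed scheduling flow above: doImport() created one FileImport entity plus one entity connection per S3 file, then polled getConnectionCount() until the query index reported all expected connections before scheduling any file job, so no sub-job could run before its parent linkage was indexed. As written, the loop appears to have an edge case: when the count matches only on the final attempt, retries still ends up >= maxRetries and "Max retries was reached" is thrown despite success. (Separately, the removed getFileImport() and getFailedImportEntity() look their entities up by importId where fileImportId and failedImportId were presumably intended.) A minimal illustrative sketch of the poll-then-proceed step; the AwaitIndex name and IntSupplier shape are ours, not from this commit:

import java.util.function.IntSupplier;

public final class AwaitIndex {

    /**
     * Polls currentCount until it equals expected, sleeping pollMillis between
     * attempts, and fails only when every retry is exhausted. The deleted code
     * used maxRetries = 60 and pollMillis = 1000.
     */
    public static void awaitCount(IntSupplier currentCount, int expected,
                                  int maxRetries, long pollMillis)
            throws InterruptedException {
        for (int retries = 0; retries < maxRetries; retries++) {
            if (currentCount.getAsInt() == expected) {
                return; // index caught up; safe to schedule the file jobs
            }
            Thread.sleep(pollMillis);
        }
        throw new RuntimeException("Max retries was reached");
    }
}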
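The removed parseEntitiesAndConnectionsFromJson() made two passes over each downloaded file with RxJava 1.x: a first pass writing only entities, then a second writing connections and dictionaries, since connections can only reference entities that already exist. Each pass used skip(n) to resume past work already recorded by the tracker, flatMap with a concurrency of 10 to fan writes out, and a reduce that invoked the job heartbeat every HEARTBEAT_COUNT (50) events so the scheduler would not time the job out. A condensed sketch of one such pass; ImportPipelineSketch and the String event type are illustrative stand-ins for the WriteEvent plumbing:

import java.util.concurrent.atomic.AtomicBoolean;

import rx.Observable;
import rx.functions.Action1;
import rx.schedulers.Schedulers;

final class ImportPipelineSketch {

    private static final int HEARTBEAT_COUNT = 50;

    /**
     * Drains one pass of write events: stops early when a fatal error flips
     * the stop flag, skips events already processed (resume), fans work out
     * ten wide on the IO scheduler, and heartbeats every HEARTBEAT_COUNT events.
     */
    static int drain(Observable<String> events, AtomicBoolean stop, int alreadyProcessed,
                     Action1<String> work, Runnable heartbeat) {
        return events
            .takeWhile(e -> !stop.get())
            .skip(alreadyProcessed)
            .flatMap(e -> Observable.just(e).doOnNext(work).subscribeOn(Schedulers.io()), 10)
            .reduce(0, (count, e) -> {
                int next = count + 1;
                if (next % HEARTBEAT_COUNT == 0) {
                    heartbeat.run();
                }
                return next;
            })
            .toBlocking().last();
    }
}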
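checkIfComplete(), also removed above, decided the parent Import's terminal state by scanning every FileImport connected to it: any file still in a non-terminal state short-circuits the check, all-FINISHED marks the import FINISHED, and one or more FAILED marks it FAILED. The decision logic, distilled (CompletionCheck and the trimmed FileState enum are illustrative, not from this commit):

import java.util.List;

final class CompletionCheck {

    enum FileState { CREATED, SCHEDULED, STARTED, FAILED, FINISHED }

    /** Returns FINISHED or FAILED, or null while any file job is still running. */
    static FileState aggregate(List<FileState> states) {
        int failed = 0;
        for (FileState s : states) {
            switch (s) {
                case FAILED:
                    failed++;    // terminal, but keep counting the rest
                    break;
                case FINISHED:
                    break;       // terminal success
                default:
                    return null; // in flight; check again on the next callback
            }
        }
        return failed == 0 ? FileState.FINISHED : FileState.FAILED;
    }
}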
http://git-wip-us.apache.org/repos/asf/usergrid/blob/c915a316/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java deleted file mode 100644 index 7e09051..0000000 --- a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3Import.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.usergrid.management.importer; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - - -/** - * Plug-able S3Import interface. - */ -public interface S3Import { - - List<String> getBucketFileNames( - String bucketName, String endsWith, String accessId, String secretKey ) throws Exception; - - public File copyFileFromBucket( - String blobFileName, String bucketName, String accessId, String secretKey ) throws Exception; -} http://git-wip-us.apache.org/repos/asf/usergrid/blob/c915a316/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java b/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java deleted file mode 100644 index 921de64..0000000 --- a/stack/services/src/main/java/org/apache/usergrid/management/importer/S3ImportImpl.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.usergrid.management.importer; - -import com.google.common.collect.ImmutableSet; -import com.google.inject.Module; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.RandomStringUtils; -import org.jclouds.ContextBuilder; -import org.jclouds.blobstore.BlobStore; -import org.jclouds.blobstore.BlobStoreContext; -import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.domain.MutableBlobMetadata; -import org.jclouds.blobstore.domain.PageSet; -import org.jclouds.blobstore.domain.StorageMetadata; -import org.jclouds.blobstore.options.ListContainerOptions; -import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule; -import org.jclouds.logging.log4j.config.Log4JLoggingModule; -import org.jclouds.netty.config.NettyPayloadModule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; -import java.nio.file.Files; -import java.util.*; - - -public class S3ImportImpl implements S3Import { - private static final Logger logger = LoggerFactory.getLogger(S3ImportImpl.class); - - - public File copyFileFromBucket( - String blobFileName, String bucketName, String accessId, String secretKey ) throws Exception { - - // setup to use JCloud BlobStore interface to AWS S3 - - Properties overrides = new Properties(); - overrides.setProperty("s3" + ".identity", accessId); - overrides.setProperty("s3" + ".credential", secretKey); - - final Iterable<? extends Module> MODULES = ImmutableSet.of( - new JavaUrlHttpCommandExecutorServiceModule(), - new Log4JLoggingModule(), - new NettyPayloadModule()); - - BlobStoreContext context = ContextBuilder.newBuilder("s3") - .credentials(accessId, secretKey) - .modules(MODULES) - .overrides(overrides) - .buildView(BlobStoreContext.class); - BlobStore blobStore = context.getBlobStore(); - - // get file from configured bucket, copy it to local temp file - - Blob blob = blobStore.getBlob(bucketName, blobFileName); - if ( blob == null) { - throw new RuntimeException( - "Blob file name " + blobFileName + " not found in bucket " + bucketName ); - } - - FileOutputStream fop = null; - File tempFile; - try { - tempFile = File.createTempFile(bucketName, RandomStringUtils.randomAlphabetic(10)); - tempFile.deleteOnExit(); - fop = new FileOutputStream(tempFile); - InputStream is = blob.getPayload().openStream(); - IOUtils.copyLarge(is, fop); - return tempFile; - - } finally { - if ( fop != null ) { - fop.close(); - } - } - } - - - @Override - public List<String> getBucketFileNames( - String bucketName, String endsWith, String accessId, String secretKey ) { - - // get setup to use JCloud BlobStore interface to AWS S3 - - Properties overrides = new Properties(); - overrides.setProperty("s3" + ".identity", accessId); - overrides.setProperty("s3" + ".credential", secretKey); - - final Iterable<? extends Module> MODULES = ImmutableSet.of( - new JavaUrlHttpCommandExecutorServiceModule(), - new Log4JLoggingModule(), - new NettyPayloadModule()); - - BlobStoreContext context = ContextBuilder.newBuilder("s3") - .credentials(accessId, secretKey) - .modules(MODULES) - .overrides(overrides) - .buildView(BlobStoreContext.class); - BlobStore blobStore = context.getBlobStore(); - - // gets all the files in the configured bucket recursively - - PageSet<? 
extends StorageMetadata> pageSets = - blobStore.list(bucketName, new ListContainerOptions().recursive()); - - if (logger.isTraceEnabled()) { - logger.trace(" Found {} files in bucket {}", pageSets.size(), bucketName); - } - - List<String> blobFileNames = new ArrayList<>(); - for ( Object pageSet : pageSets ) { - String blobFileName = ((MutableBlobMetadata)pageSet).getName(); - if ( blobFileName.endsWith( endsWith )) { - blobFileNames.add(blobFileName); - } - } - - return blobFileNames; - } - - -} - http://git-wip-us.apache.org/repos/asf/usergrid/blob/c915a316/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java ---------------------------------------------------------------------- diff --git a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java b/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java deleted file mode 100644 index 2929d93..0000000 --- a/stack/services/src/main/java/org/apache/usergrid/services/assets/data/S3BinaryStore.java +++ /dev/null @@ -1,346 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.usergrid.services.assets.data; - - -import com.google.common.collect.ImmutableSet; -import com.google.common.hash.Hashing; -import com.google.common.io.Files; -import com.google.inject.Module; -import org.apache.commons.codec.binary.Hex; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.apache.usergrid.persistence.Entity; -import org.apache.usergrid.persistence.EntityManager; -import org.apache.usergrid.persistence.EntityManagerFactory; -import org.apache.usergrid.utils.StringUtils; -import org.jclouds.ContextBuilder; -import org.jclouds.blobstore.BlobStore; -import org.jclouds.blobstore.BlobStoreContext; -import org.jclouds.blobstore.domain.Blob; -import org.jclouds.blobstore.domain.BlobBuilder; -import org.jclouds.blobstore.options.GetOptions; -import org.jclouds.http.config.JavaUrlHttpCommandExecutorServiceModule; -import org.jclouds.logging.log4j.config.Log4JLoggingModule; -import org.jclouds.netty.config.NettyPayloadModule; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; - -import java.io.*; -import java.util.Map; -import java.util.Properties; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -public class S3BinaryStore implements BinaryStore { - - private static final Iterable<? 
extends Module> MODULES = ImmutableSet - .of( new JavaUrlHttpCommandExecutorServiceModule(), new Log4JLoggingModule(), new NettyPayloadModule() ); - - private static final Logger logger = LoggerFactory.getLogger( S3BinaryStore.class ); - private static final long FIVE_MB = ( FileUtils.ONE_MB * 5 ); - private static String WORKERS_PROP_NAME = "usergrid.binary.upload-workers"; - - private BlobStoreContext context; - private String accessId; - private String secretKey; - private String bucketName; - private ExecutorService executorService; - - @Autowired - private Properties properties; - - @Autowired - private EntityManagerFactory emf; - - - public S3BinaryStore( String accessId, String secretKey, String bucketName ) { - this.accessId = accessId; - this.secretKey = secretKey; - this.bucketName = bucketName; - } - - - private BlobStoreContext getContext() { - if ( context == null ) { - context = ContextBuilder.newBuilder( "aws-s3" ).credentials( accessId, secretKey ).modules( MODULES ) - .buildView( BlobStoreContext.class ); - - BlobStore blobStore = context.getBlobStore(); - blobStore.createContainerInLocation( null, bucketName ); - } - - return context; - } - - - public void destroy() { - if ( context != null ) { - context.close(); - } - } - - - @Override - public void write( final UUID appId, final Entity entity, InputStream inputStream ) throws IOException { - - // write up to 5mb of data to an byte array - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - long written = IOUtils.copyLarge( inputStream, baos, 0, FIVE_MB ); - byte[] data = baos.toByteArray(); - - if ( written < FIVE_MB ) { // total smaller than 5mb - - final String uploadFileName = AssetUtils.buildAssetKey( appId, entity ); - final String mimeType = AssetMimeHandler.get().getMimeType( entity, data ); - - final Map<String, Object> fileMetadata = AssetUtils.getFileMetadata( entity ); - fileMetadata.put( AssetUtils.LAST_MODIFIED, System.currentTimeMillis() ); - - BlobStore blobStore = getContext().getBlobStore(); - - // need this for JClouds 1.7.x: -// BlobBuilder.PayloadBlobBuilder bb = blobStore.blobBuilder( uploadFileName ) -// .payload( data ).calculateMD5().contentType( mimeType ); - - // need this for JClouds 1.8.x: - BlobBuilder.PayloadBlobBuilder bb = blobStore.blobBuilder(uploadFileName) - .payload( data ) - .contentMD5( Hashing.md5().newHasher().putBytes( data ).hash() ) - .contentType( mimeType ); - - fileMetadata.put( AssetUtils.CONTENT_LENGTH, written ); - if ( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ) != null ) { - bb.contentDisposition( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ).toString() ); - } - final Blob blob = bb.build(); - - String md5sum = Hex.encodeHexString( blob.getMetadata().getContentMetadata().getContentMD5() ); - fileMetadata.put( AssetUtils.CHECKSUM, md5sum ); - - String eTag = blobStore.putBlob( bucketName, blob ); - fileMetadata.put( AssetUtils.E_TAG, eTag ); - } - else { // bigger than 5mb... 
dump 5 mb tmp files and upload from them - - ExecutorService executors = getExecutorService(); - - executors.submit( new UploadWorker( appId, entity, inputStream, data, written ) ); - } - } - - private ExecutorService getExecutorService() { - - if ( executorService == null ) { - synchronized (this) { - - int workers = 40; - String workersString = properties.getProperty( WORKERS_PROP_NAME, "40"); - - if ( StringUtils.isNumeric( workersString ) ) { - workers = Integer.parseInt( workersString ); - } else if ( !StringUtils.isEmpty( workersString )) { - logger.error("Ignoring invalid setting for {}", WORKERS_PROP_NAME); - } - executorService = Executors.newFixedThreadPool( workers ); - } - } - - return executorService; - } - - - @Override - public InputStream read( UUID appId, Entity entity, long offset, long length ) throws IOException { - BlobStore blobStore = getContext().getBlobStore(); - Blob blob; - if ( offset == 0 && length == FIVE_MB ) { - blob = blobStore.getBlob( bucketName, AssetUtils.buildAssetKey( appId, entity ) ); - } - else { - GetOptions options = GetOptions.Builder.range( offset, length ); - blob = blobStore.getBlob( bucketName, AssetUtils.buildAssetKey( appId, entity ), options ); - } - if ( blob == null || blob.getPayload() == null ) { - return null; - } - return blob.getPayload().getInput(); - } - - - @Override - public InputStream read( UUID appId, Entity entity ) throws IOException { - return read( appId, entity, 0, FIVE_MB ); - } - - - @Override - public void delete( UUID appId, Entity entity ) { - BlobStore blobStore = getContext().getBlobStore(); - blobStore.removeBlob( bucketName, AssetUtils.buildAssetKey( appId, entity ) ); - } - - class UploadWorker implements Callable<Void> { - - private UUID appId; - private Entity entity; - private InputStream inputStream; - private byte[] data; - private long written; - - - public UploadWorker( UUID appId, Entity entity, InputStream is, byte[] data, long written ) { - this.appId = appId; - this.entity = entity; - this.inputStream = is; - this.data = data; - this.written = written; - } - - @Override - public Void call() { - - if (logger.isTraceEnabled()) { - logger.trace("Writing temp file for S3 upload"); - } - - // determine max size file allowed, default to 50mb - long maxSizeBytes = 50 * FileUtils.ONE_MB; - String maxSizeMbString = properties.getProperty( "usergrid.binary.max-size-mb", "50" ); - if (StringUtils.isNumeric( maxSizeMbString )) { - maxSizeBytes = Long.parseLong( maxSizeMbString ) * FileUtils.ONE_MB; - } - - // always allow files up to 5mb - if (maxSizeBytes < 5 * FileUtils.ONE_MB ) { - maxSizeBytes = 5 * FileUtils.ONE_MB; - } - - // write temporary file, slightly larger than our size limit - OutputStream os = null; - File tempFile; - try { - tempFile = File.createTempFile( entity.getUuid().toString(), "tmp" ); - tempFile.deleteOnExit(); - os = new BufferedOutputStream( new FileOutputStream( tempFile.getAbsolutePath() ) ); - os.write( data ); - written += data.length; - written += IOUtils.copyLarge( inputStream, os, 0, maxSizeBytes + 1 ); - - if (logger.isTraceEnabled()) { - logger.trace("Write temp file {} length {}", tempFile.getName(), written); - } - - } catch ( IOException e ) { - throw new RuntimeException( "Error creating temp file", e ); - - } finally { - if ( os != null ) { - try { - os.flush(); - } catch (IOException e) { - logger.error( "Error flushing data to temporary upload file", e ); - } - IOUtils.closeQuietly( os ); - } - } - - // if tempFile is too large, delete it, add error to entity file 
metadata and abort - - Map<String, Object> fileMetadata = AssetUtils.getFileMetadata( entity ); - - if ( tempFile.length() > maxSizeBytes ) { - if (logger.isDebugEnabled()) { - logger.debug("File too large. Temp file size (bytes) = {}, Max file size (bytes) = {}", - tempFile.length(), maxSizeBytes); - } - try { - EntityManager em = emf.getEntityManager( appId ); - fileMetadata.put( "error", "Asset size " + tempFile.length() - + " is larger than max size of " + maxSizeBytes ); - em.update( entity ); - tempFile.delete(); - - } catch ( Exception e ) { - logger.error( "Error updating entity with error message", e); - } - return null; - } - - String uploadFileName = AssetUtils.buildAssetKey( appId, entity ); - String mimeType = AssetMimeHandler.get().getMimeType( entity, data ); - - try { // start the upload - - if (logger.isTraceEnabled()) { - logger.trace("S3 upload thread started"); - } - - BlobStore blobStore = getContext().getBlobStore(); - - // need this for JClouds 1.7.x: -// BlobBuilder.PayloadBlobBuilder bb = blobStore.blobBuilder( uploadFileName ) -// .payload( tempFile ).calculateMD5().contentType( mimeType ); - - // need this for JClouds 1.8.x: - BlobBuilder.PayloadBlobBuilder bb = blobStore.blobBuilder( uploadFileName ) - .payload( tempFile ) - .contentMD5( Files.hash( tempFile, Hashing.md5() ) ) - .contentType( mimeType ); - - if ( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ) != null ) { - bb.contentDisposition( fileMetadata.get( AssetUtils.CONTENT_DISPOSITION ).toString() ); - } - final Blob blob = bb.build(); - - String md5sum = Hex.encodeHexString( blob.getMetadata().getContentMetadata().getContentMD5() ); - fileMetadata.put( AssetUtils.CHECKSUM, md5sum ); - - if (logger.isTraceEnabled()) { - logger.trace("S3 upload starting"); - } - - String eTag = blobStore.putBlob( bucketName, blob ); - - if (logger.isTraceEnabled()) { - logger.trace("S3 upload complete eTag=" + eTag); - } - - // update entity with eTag - EntityManager em = emf.getEntityManager( appId ); - fileMetadata.put( AssetUtils.LAST_MODIFIED, System.currentTimeMillis() ); - fileMetadata.put( AssetUtils.CONTENT_LENGTH, written ); - fileMetadata.put( AssetUtils.E_TAG, eTag ); - em.update( entity ); - } - catch ( Exception e ) { - logger.error( "error uploading", e ); - } - - if ( tempFile != null && tempFile.exists() ) { - tempFile.delete(); - } - - return null; - } - } -} -
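For reference, both removed S3 classes built their jclouds views the same way: ContextBuilder.newBuilder("s3") (or "aws-s3" in S3BinaryStore) with credentials and property overrides, then buildView(BlobStoreContext.class). Below is a trimmed sketch of the deleted getBucketFileNames() that iterates StorageMetadata directly instead of casting each element to MutableBlobMetadata, and that closes the context, which the deleted implementation never did. S3ListSketch is our name, the module wiring (JavaUrl, Log4J, Netty) is omitted for brevity, and like the original this reads only a single result page:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.jclouds.ContextBuilder;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.StorageMetadata;
import org.jclouds.blobstore.options.ListContainerOptions;

public final class S3ListSketch {

    /** Lists blob names in bucket ending with the given suffix (first page only). */
    public static List<String> listBucketFiles(String bucket, String endsWith,
                                               String accessId, String secretKey) {
        Properties overrides = new Properties();
        overrides.setProperty("s3.identity", accessId);
        overrides.setProperty("s3.credential", secretKey);

        BlobStoreContext context = ContextBuilder.newBuilder("s3")
                .credentials(accessId, secretKey)
                .overrides(overrides)
                .buildView(BlobStoreContext.class);
        try {
            BlobStore blobStore = context.getBlobStore();
            List<String> names = new ArrayList<>();
            for (StorageMetadata meta
                    : blobStore.list(bucket, new ListContainerOptions().recursive())) {
                if (meta.getName().endsWith(endsWith)) {
                    names.add(meta.getName());
                }
            }
            return names;
        } finally {
            context.close(); // not done by the deleted implementation
        }
    }
}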
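Finally, the removed S3BinaryStore.write() buffered at most 5 MB of the incoming stream into memory: if the payload ended inside that window it was uploaded inline with putBlob(), otherwise the buffered head plus the remaining stream were handed to an UploadWorker that spooled them to a size-capped temp file and uploaded in the background. The branch, reduced to its shape (UploadThresholdSketch and the two callback interfaces are illustrative, not from this commit):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;

public final class UploadThresholdSketch {

    private static final long FIVE_MB = FileUtils.ONE_MB * 5;

    interface SmallUpload { void upload(byte[] data) throws IOException; }

    interface LargeUpload { void upload(byte[] head, InputStream rest) throws IOException; }

    /** Buffers up to 5 MB; small payloads upload inline, larger ones spill. */
    static void write(InputStream in, SmallUpload small, LargeUpload large)
            throws IOException {
        ByteArrayOutputStream head = new ByteArrayOutputStream();
        long copied = IOUtils.copyLarge(in, head, 0, FIVE_MB);
        byte[] data = head.toByteArray();
        if (copied < FIVE_MB) {
            small.upload(data);     // whole payload fit: single putBlob()
        } else {
            large.upload(data, in); // hand head + remainder to a background worker
        }
    }
}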